from datetime import datetime, date
from dataclasses import dataclass, field
from typing import Any, Literal, Optional, List
import pickle
import base64
from hashlib import sha256
import json
from pyspark.sql import functions as F, DataFrame, Column
from pyspark.sql.connect.column import Column as ConnectColumn
from pyspark.sql import SparkSession
from yipit_databricks_client import get_spark_session
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import expressions as E
from etl_toolkit.expressions.core import is_column_type
from etl_toolkit.exceptions import BaseETLToolkitException, InvalidInputException
def _pickle_b64(column: Column) -> str:
# Serialize an arbitrary Python object with pickle,
# then base64-encode the binary payload into a string
pickled = pickle.dumps(column)
encoded = base64.b64encode(pickled).decode("utf-8")
return encoded
def _unpickle_b64(data: Optional[str]) -> Any:
# Deserialize a base64-encoded pickle string
# back into its original Python object
if data is None:
return None
decode = base64.b64decode(data)
unpickle = pickle.loads(decode)
return unpickle
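# Illustrative round trip (a sketch; assumes Spark Connect Column objects, which
# are plain Python expressions and therefore picklable, unlike classic py4j-backed
# Columns):
#
#   encoded = _pickle_b64(F.col("color") == "red")   # base64 string, safe to store in a table
#   restored = _unpickle_b64(encoded)                 # equivalent Column expression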
class ParserConfigurationException(BaseETLToolkitException):
pass
@dataclass
class ParserConfig:
"""
Use this function to define parsing configurations that are applied in the ``A.parse_records`` function.
These parsers can control inclusion and exclusion filtering logic via ``include_any`` and ``exclude_any`` and
enrichment columns through ``with_columns``.
.. tip:: Encapsulating logic in parsers like this makes transformation logic modular and more easily understood.
For example, if covering multiple companies, each company's filtering logic can be defined as a distinct parser.
.. note:: This function should not be used on its own. It's not a standard analysis function that returns a dataframe.
Instead, it defines a configuration that can be supplied to ``A.parse_records``.
:param parser_id: Unique ID of the parser object that is used in the ``parser_id`` column included in the ``A.parse_records`` function.
This ID must be unique within the list of configurations used in ``A.parse_records``
:param include_any: A list or dict of boolean Column expressions. If any of these column expressions evaluates to True for a record,
the parser is considered a match (assuming ``exclude_any`` does not also have a match). If a dict is provided instead of a list,
the key of each key/value pair is a unique ID for the condition and the value is the boolean expression.
:param exclude_any: A list or dict of boolean Column expressions. If any of these column expressions evaluates to True for a record,
the record is not included in the output of ``A.parse_records``. If a dict is provided instead of a list,
the key of each key/value pair is a unique ID for the condition and the value is the boolean expression.
:param with_columns: A dict of key/value pairs where the key is a column name to add and the value is a Column expression.
These columns are added to a record in the output of ``A.parse_records`` when the parser matches.
:param metadata: Generally only used by library developers. This dict can include arbitrary key/value information about the parser.
These values are **not added** to the outputs of ``A.parse_records``.
Examples
^^^^^^^^^^^^^
.. code-block:: python
:caption: Simple parser example with ``A.parse_records``.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("color") == "red"
],
),
A.parser(
parser_id="blue_parser",
include_any=[
F.col("color") == "blue"
],
),
],
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
| blue| 2024-01-02| 50| blue_parser|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Parser example with ``A.parse_records`` when ``with_columns`` is used. Notice how the columns are added to the output dataframe.
Notice that if the set of column names added is not consistent between parsers, the function resolves the missing columns as nulls.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("color") == "red"
],
with_columns={
"enriched_red": F.lit(True),
"enriched_red_2": F.lit(1),
},
),
A.parser(
parser_id="blue_parser",
include_any=[
F.col("color") == "blue"
],
with_columns={
"enriched_blue": F.lit(True),
"enriched_blue_2": F.lit(1),
},
),
],
)
)
+--------------+--------------+--------------+----------------+----------------+----------------+----------------+----------------+
|color |date |value |parser_id |enriched_red |enriched_red_2 |enriched_blue |enriched_blue_2 |
+--------------+--------------+--------------+----------------+----------------+----------------+----------------+----------------+
| red| 2024-01-01| 100| red_parser|true | 1|null |null |
+--------------+--------------+--------------+----------------+----------------+----------------+----------------+----------------+
| blue| 2024-01-02| 50| blue_parser|null |null |true | 1|
+--------------+--------------+--------------+----------------+----------------+----------------+----------------+----------------+
.. code-block:: python
:caption: Parser example with ``include_any`` and ``exclude_any`` expressed as dicts. The keys are present in the output dataframe when QA mode is used for ``A.parse_records``
and the condition is satisfied for the record.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any={
"color_match": F.col("color") == "red",
},
),
A.parser(
parser_id="blue_parser",
include_any={
"color_match": F.col("color") == "blue",
},
exclude_any={
"date_stale": F.col("date") < date(2024, 1, 3),
},
),
],
qa=True,
)
)
+--------------+--------------+--------------+----------------+-------------------------+-------------------------+
|color |date |value |parser_id |parser_exclusion_case |parser_inclusion_case |
+--------------+--------------+--------------+----------------+-------------------------+-------------------------+
| red| 2024-01-01| 100| red_parser| null| color_match|
+--------------+--------------+--------------+----------------+-------------------------+-------------------------+
| blue| 2024-01-02| 50| blue_parser| date_stale| color_match|
+--------------+--------------+--------------+----------------+-------------------------+-------------------------+
"""
parser_id: str
include_any: list[Column] | dict[str, Column] = field(default_factory=list)
exclude_any: list[Column] | dict[str, Column] = field(default_factory=list)
with_columns: dict[str, str | Column] = field(default_factory=dict)
metadata: dict[str, str | int | date | datetime | bool | float] = field(
default_factory=dict
)
def __post_init__(self):
self._validate_include_any()
self._validate_exclude_any()
self._validate_with_columns()
@property
def full_filter(self) -> Column:
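"""
Combined filter for the parser: a row matches only if it satisfies the
inclusion filter and does not satisfy the exclusion filter (when both are defined).
"""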
if self.exclusion_filter is None:
full_filter = self.inclusion_filter
elif self.inclusion_filter is None:
full_filter = ~self.exclusion_filter
else:
full_filter = E.all(self.inclusion_filter, ~self.exclusion_filter)
return full_filter
@property
def inclusion_filter(self) -> Optional[Column]:
if isinstance(self.include_any, dict):
include_any = list(self.include_any.values())
elif isinstance(self.include_any, list):
include_any = self.include_any
else:
return None
return E.any(include_any)
@property
def exclusion_filter(self) -> Optional[Column]:
if isinstance(self.exclude_any, dict):
exclude_any = list(self.exclude_any.values())
elif isinstance(self.exclude_any, list):
exclude_any = self.exclude_any
else:
return None
return E.any([F.coalesce(condition, F.lit(False)) for condition in exclude_any])
@property
def exclusion_enrichment(self) -> Column:
"""
Rather than a filter, apply the exclusion conditions as a case statement
that reveals which case matched a row. If the cases are supplied as a list,
the positional index of the matching case is returned; if supplied as a dict,
the case label (key) is returned.
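For example (illustrative): with ``exclude_any={"stale": F.col("date") < date(2024, 1, 3)}``,
this evaluates to ``"stale"`` for rows matching that condition and NULL otherwise.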
"""
if isinstance(self.exclude_any, dict):
return E.chain_cases(
[
E.case(F.coalesce(condition, F.lit(False)), value)
for value, condition in self.exclude_any.items()
]
)
if isinstance(self.exclude_any, list):
return E.chain_cases(
[
E.case(F.coalesce(condition, F.lit(False)), idx)
for idx, condition in enumerate(self.exclude_any)
]
)
return F.lit(None).cast("string")
@property
def inclusion_enrichment(self) -> Column:
"""
Rather than a filter, apply the inclusion conditions as a case statement
that reveals which case matched a row. If the cases are supplied as a list,
the positional index of the matching case is returned; if supplied as a dict,
the case label (key) is returned.
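For example (illustrative): with ``include_any=[F.col("color") == "red"]``,
this evaluates to ``0`` (the position of the matching case) for red rows and NULL otherwise.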
"""
if isinstance(self.include_any, dict):
return E.chain_cases(
[
E.case(condition, value)
for value, condition in self.include_any.items()
]
)
if isinstance(self.include_any, list):
return E.chain_cases(
[
E.case(condition, idx)
for idx, condition in enumerate(self.include_any)
]
)
return F.lit(None).cast("string")
@property
def checksum(self) -> str:
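"""
Deterministic fingerprint of the parser's logic: a sha256 hash of the JSON
representation of the pickled filter and enrichment expressions.
"""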
return sha256(
json.dumps(self._json_representation, sort_keys=True).encode("utf-8")
).hexdigest()
@property
def _json_representation(self) -> dict:
return {
"include_any": self._pickled_filter(self.include_any),
"exclude_any": self._pickled_filter(self.exclude_any),
"with_columns": {
key: _pickle_b64(E.normalize_literal(value))
for key, value in self.with_columns.items()
},
}
def _pickled_filter(
self, parser_filter: list[Column] | dict[str, Column]
) -> dict[str, str] | list[str]:
"""
Pickle a list or dict of columns, since that preserves the PySpark implementation of each column
without relying on __str__, which is not consistent. Pickled values are binary, so base64 encoding
is used to capture each value as a string for ease of use when generating a checksum.
"""
serialized_filter = None
if isinstance(parser_filter, dict):
serialized_filter = {
case_id: _pickle_b64(column)
for case_id, column in parser_filter.items()
}
elif isinstance(parser_filter, list):
serialized_filter = [_pickle_b64(column) for column in parser_filter]
return serialized_filter
def _validate_include_any(self):
if not (
isinstance(self.include_any, dict) or isinstance(self.include_any, list)
):
raise ParserConfigurationException(
"include_any is invalid type, must be a dict or list, fix the value and re-run"
)
if isinstance(self.include_any, dict):
for case_id, column in self.include_any.items():
if not is_column_type(column):
raise ParserConfigurationException(
f"include_any case_id: {case_id} must be a Pyspark Column, fix the value and re-run"
)
if isinstance(self.include_any, list):
for case_position, column in enumerate(self.include_any):
if not is_column_type(column):
raise ParserConfigurationException(
f"include_any case item {case_position} must be a Pyspark Column, fix the value and re-run"
)
def _validate_exclude_any(self):
if not (
isinstance(self.exclude_any, dict) or isinstance(self.exclude_any, list)
):
raise ParserConfigurationException(
"exclude_any is invalid type, must be a dict or list, fix the value and re-run"
)
if isinstance(self.exclude_any, dict):
for case_id, column in self.exclude_any.items():
if not is_column_type(column):
raise ParserConfigurationException(
f"exclude_any case_id: {case_id} must be a Pyspark Column, fix the value and re-run"
)
if isinstance(self.exclude_any, list):
for case_position, column in enumerate(self.exclude_any):
if not is_column_type(column):
raise ParserConfigurationException(
f"exclude_any case item {case_position} must be a Pyspark Column, fix the value and re-run"
)
def _validate_with_columns(self):
if not isinstance(self.with_columns, dict):
raise ParserConfigurationException(
"with_columns is invalid type, must be a dict or list"
)
for column_name, value in self.with_columns.items():
if not (
isinstance(value, (str, int, float, datetime, date, bool))
or (value is None)
or is_column_type(value)
):
raise ParserConfigurationException(
f"with_column field {column_name} must have a value of type string or Pyspark Column, fix the value and re-run"
)
@track_usage
def fetch_parsers_for_dataset(
dataset: Literal["yoda", "leia", "skywalker", "edison", "spendhound", "mando"],
parser_ids: Optional[List[str]] = None,
source_table: str = "yd_production.etl_gold.collaborative_tags",
spark: Optional[SparkSession] = None,
) -> List[ParserConfig]:
"""
Returns a list of parser configs for the given dataset from the collaborative tags table.
:param dataset: The dataset to fetch parsers for. Must be one of: "yoda", "leia", "skywalker", "edison", "spendhound", "mando"
:param parser_ids: Optional list of specific parser IDs to fetch. If not provided, all parsers for the dataset are returned.
:param source_table: The source table containing parser configurations. Defaults to "yd_production.etl_gold.collaborative_tags"
:param spark: Spark session to use. Generally, this is **not needed** as the session is automatically generated in databricks. It is used by library developers.
:return: List of ParserConfig objects that can be used with parse_records()
:raises: InvalidInputException if dataset or parser_ids are invalid
Examples
^^^^^^^^
.. code-block:: python
:caption: Fetch all parsers for the Yoda dataset
from etl_toolkit import A
# Get all Yoda parsers
parsers = A.fetch_parsers_for_dataset("yoda")
# Use parsers with parse_records
df = A.parse_records(
input_df,
configs=parsers
)
.. code-block:: python
:caption: Fetch specific parsers by ID
from etl_toolkit import A
# Get specific parsers by ID
parsers = A.fetch_parsers_for_dataset(
"skywalker",
parser_ids=["parser_1", "parser_2"]
)
# Use parsers with parse_records
df = A.parse_records(
input_df,
configs=parsers
)
"""
VALID_DATASETS = ["yoda", "leia", "skywalker", "edison", "spendhound", "mando"]
# Validate inputs
if dataset not in VALID_DATASETS:
raise InvalidInputException(
f"Invalid dataset '{dataset}'. Must be one of: {', '.join(VALID_DATASETS)}"
)
spark = spark or get_spark_session()
df = (
spark.table(source_table)
.where(F.col("is_current") == True)
.select("parser_id", "dataset", "_pickle_parsers")
)
parser_df = df.where(F.col("dataset") == dataset)
if parser_ids is not None:
if not isinstance(parser_ids, list):
raise InvalidInputException("parser_ids must be a list of strings")
parser_df = parser_df.where(F.col("parser_id").isin(parser_ids))
# Verify all requested parser_ids exist
found_parser_ids = {
row["parser_id"] for row in parser_df.select("parser_id").collect()
}
missing_parser_ids = set(parser_ids) - found_parser_ids
if missing_parser_ids:
raise InvalidInputException(
f"The following parser_ids were not found for dataset '{dataset}': {missing_parser_ids}"
)
# Unpickle parser configs
parsers = [
_unpickle_b64(row["_pickle_parsers"])
for row in parser_df.select("_pickle_parsers").collect()
]
return parsers
@track_usage
def parse_records(
df: DataFrame,
configs: list[ParserConfig],
qa: bool = False,
include_non_matches: bool = False,
) -> DataFrame:
"""
:bdg-primary:`QA Mode support`
.. role:: python(code)
:language: python
Takes the input ``df`` and returns an output dataframe
filtered by the provided configs, as each config is mapped to a ``parser_id`` column
that is added to the output dataframe. Rows are kept only if:
* they match at least one inclusion case of a parser, and
* they do not match any exclusion case of that matching parser.
If :python:`qa=True`, then the ``exclude_any`` argument for each parser will not drop records.
Instead, those records are kept and a ``parser_exclusion_case`` column is added to flag which
exclusion criterion matched each record.
Additionally, if :python:`qa=True`, the ``include_any`` argument for each parser is used to
populate a ``parser_inclusion_case`` column added to the output dataframe; the ``include_any`` conditions still filter records as usual.
If :python:`include_non_matches=True`, the parser will not filter out any records that were not able to match.
If :python:`qa=True` and :python:`include_non_matches=True`, the function allows multiple parser matches per record. In this case:
* The ``parser_id`` and ``tag`` columns become arrays, potentially containing multiple parser IDs.
* The ``parser_inclusion_case`` and ``parser_exclusion_case`` columns become arrays of structs, each containing the ``parser_id`` and the corresponding inclusion or exclusion case.
:param df: Input dataframe that is filtered and enriched by the specified parser ``configs``.
:param configs: List of ``A.parser`` configs to use in parsing the ``df``. Parsers are evaluated in order and the first matching parser is used for a given record.
:param qa: Boolean flag to control QA mode for this function. When ``True``, rows that match an exclusion case are preserved, and ``parser_inclusion_case`` + ``parser_exclusion_case`` columns are added to the dataframe to indicate the specific case that caused the parser to match. When ``False``, any rows that don't match any of the inclusion cases, or that match any of the exclusion cases, are removed.
:param include_non_matches: Boolean flag to control if unmatched records are excluded from the output dataframe. Unmatched records are included when this flag is ``True``.
Examples
^^^^^^^^^^^^
.. code-block:: python
:caption: Example of using ``A.parse_records`` to filter data based on a list of ``A.parsers`` and their conditions.
Notice how only matched records are included and the ``parser_id`` column indicates which parser was matched.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("color") == "red",
],
)
]
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Example of using ``A.parse_records`` to filter data on both inclusion and exclusion criteria.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("date") >= date(2024, 1, 1),
],
exclude_any=[
F.col("color") == "blue"
]
)
]
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Example of how multiple parsers can be used.
Notice how a record is included if it satisfies any of the provided parsers,
and the associated ``parser_id`` is present in the output row.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("color") == "red"
],
),
A.parser(
parser_id="blue_parser",
include_any=[
F.col("color") == "blue"
],
),
],
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
| blue| 2024-01-02| 50| blue_parser|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Example of how QA mode works. Notice how the inclusion and exclusion cases apply to each matched ``parser_id``.
The values of these additional columns match the positional index of the ``include_any`` or ``exclude_any`` arguments of
the matching parser if they are lists. If dicts are used for these arguments, then the matching key is used as the value.
These columns pinpoint the logic responsible for a record being included in or excluded from the dataframe.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("date") >= date(2024, 1, 1),
],
exclude_any=[
F.col("color") == "blue"
]
)
],
qa=True,
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Example of how the ``include_non_matches`` flag works.
Notice how unmatched records are preserved in the dataframe with a ``parser_id`` column value of NULL.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])
display(
A.parse_records(
df,
configs=[
A.parser(
parser_id="red_parser",
include_any=[
F.col("date") >= date(2024, 1, 1),
],
exclude_any=[
F.col("color") == "blue"
]
)
],
include_non_matches=True,
)
)
+--------------+--------------+--------------+----------------+
|color |date |value |parser_id |
+--------------+--------------+--------------+----------------+
| red| 2024-01-01| 100| red_parser|
+--------------+--------------+--------------+----------------+
| blue| 2024-01-02| 50| null|
+--------------+--------------+--------------+----------------+
.. code-block:: python
:caption: Example of how QA mode works with ``include_non_matches``.
Notice how records can match multiple parsers and show multiple inclusion/exclusion cases.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"id": 1, "shape": "square", "color": "red"},
{"id": 2, "shape": "square", "color": "blue"},
{"id": 3, "shape": "triangle", "color": "blue"},
{"id": 4, "shape": "circle", "color": "red"},
])
result_df = A.parse_records(
df,
configs=[
A.parser(
parser_id="red_square_parser",
include_any=[F.col("shape") == "square"],
exclude_any=[F.col("color").isin(["blue", "green"])],
),
A.parser(
parser_id="blue_parser",
include_any=[F.col("color") == "blue"],
),
],
qa=True,
include_non_matches=True,
)
display(result_df)
+----+--------+-----+-----------------------------------+----------------------------------------------------+----------------------------------------------------+-----------------------------------+-------------+
| id | shape |color|parser_id |parser_inclusion_case |parser_exclusion_case |matched_parser_ids |primary_match|
+----+--------+-----+-----------------------------------+----------------------------------------------------+----------------------------------------------------+-----------------------------------+-------------+
| 1 | square | red |["red_square_parser"] |[{"parser_id":"red_square_parser","inclusion_case":0}]|[{"parser_id":"red_square_parser","exclusion_case":null}]|["red_square_parser"] |red_square_parser|
| 2 | square |blue |["red_square_parser", "blue_parser"]|[{"parser_id":"red_square_parser","inclusion_case":0}, {"parser_id":"blue_parser","inclusion_case":0}]|[{"parser_id":"red_square_parser","exclusion_case":0}] |["blue_parser"] |blue_parser |
| 3 |triangle|blue |["blue_parser"] |[{"parser_id":"blue_parser","inclusion_case":0}] |[] |["blue_parser"] |blue_parser |
| 4 | circle | red |[] |[] |[] |[] |null |
+----+--------+-----+-----------------------------------+----------------------------------------------------+----------------------------------------------------+-----------------------------------+-------------+
In this example:
- The red square (id=1) matches only the "red_square_parser" and is listed as the primary match since it is the only match.
- The blue square (id=2) initially matches both parsers but, after exclusions, only matches "blue_parser" (shown in ``matched_parser_ids``).
The ``parser_id`` column shows all initial matches before exclusions.
- The blue triangle (id=3) matches only the "blue_parser" and thus has it as ``primary_match``.
- The red circle (id=4) doesn't match any parser, but is included because ``include_non_matches=True``.
The columns:
- ``parser_id``: Array showing all initial parser matches before exclusions
- ``matched_parser_ids``: Array showing final parser matches after applying exclusions
- ``primary_match``: Single parser ID when there is exactly one match after exclusions, otherwise null
- ``parser_inclusion_case`` and ``parser_exclusion_case``: Arrays of structs containing the parser_id and the corresponding inclusion or exclusion case that matched
"""
parsing_cases = []
for config in configs:
if qa and len(config.include_any):
condition = config.inclusion_filter
elif qa and not len(config.include_any):
condition = F.lit(True)
else:
condition = config.full_filter
parsing_cases.append(E.case(condition, E.normalize_literal(config.parser_id)))
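# Collect enrichment columns across all parsers so that each output column becomes
# a chain of cases keyed on the matched parser_id (array membership when QA mode
# and include_non_matches allow multiple matches per record).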
with_columns = {}
for config in configs:
for column in config.with_columns:
if column not in with_columns:
with_columns[column] = []
for config in configs:
for key, value in config.with_columns.items():
if qa and include_non_matches:
with_columns[key].append(
E.case(
F.array_contains(F.col("parser_id"), config.parser_id),
E.normalize_literal(value),
)
)
else:
with_columns[key].append(
E.case(
(F.col("parser_id") == config.parser_id),
E.normalize_literal(value),
)
)
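# Resolve the parser_id column: an array of all matching parsers when both qa and
# include_non_matches are set, otherwise the first matching parser (optionally
# dropping records that match no parser).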
if qa and include_non_matches:
parsed_df = df.withColumn("parser_id", E.chain_cases_multiple(parsing_cases))
elif include_non_matches:
parsed_df = df.withColumn(
"parser_id", E.chain_cases(parsing_cases, otherwise=None)
)
else:
parsed_df = df.withColumn(
"parser_id", E.chain_cases(parsing_cases, otherwise=None)
).filter(F.col("parser_id").isNotNull())
if len(with_columns):
parsed_df = parsed_df.withColumns(
{
column: E.chain_cases(conditions, otherwise=None)
for column, conditions in with_columns.items()
}
)
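# In QA mode, add the inclusion/exclusion case columns (and, with
# include_non_matches, the matched_parser_ids and primary_match columns).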
if qa:
total_include_count = sum(len(config.include_any) for config in configs)
total_exclude_count = sum(len(config.exclude_any) for config in configs)
if include_non_matches:
matched_ids_expr = F.filter(
F.col("matched_parser_ids_before_exclusions"),
lambda tag: ~F.array_contains(
F.transform(
F.filter(
F.col("parser_exclusion_case"),
lambda x: x.exclusion_case.isNotNull(),
),
lambda x: x.parser_id,
),
tag,
),
)
primary_match_expr = F.when(
F.size(F.col("matched_parser_ids")) == 1, F.col("matched_parser_ids")[0]
).otherwise(F.lit(None))
enriched_columns = _get_qa_enriched_columns(configs)
parsed_df = parsed_df.withColumns(enriched_columns)
parsed_df = (
parsed_df.withColumn("matched_parser_ids", matched_ids_expr)
.withColumn("primary_match", primary_match_expr)
.drop("tag")
)
else:
parsed_df = parsed_df.withColumns(
{
"parser_exclusion_case": E.chain_cases(
[
E.case(
(F.col("parser_id") == config.parser_id),
config.exclusion_enrichment,
)
for config in configs
if len(config.exclude_any)
]
)
if total_exclude_count
else F.lit(None),
"parser_inclusion_case": E.chain_cases(
[
E.case(
(F.col("parser_id") == config.parser_id),
config.inclusion_enrichment,
)
for config in configs
if len(config.include_any)
]
)
if total_include_count
else F.lit(None),
"tag": F.col("parser_id"),
}
)
return parsed_df
def _get_qa_enriched_columns(configs):
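"""
Build the QA enrichment expressions used when ``qa=True`` and ``include_non_matches=True``:
per-parser inclusion/exclusion case structs and the array of parser IDs matched before
exclusions are applied.
"""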
total_include_count = sum(len(config.include_any) for config in configs)
total_exclude_count = sum(len(config.exclude_any) for config in configs)
exclusion_case_expr = (
E.chain_cases_multiple(
[
E.case(
F.array_contains(F.col("parser_id"), config.parser_id),
F.struct(
F.lit(config.parser_id).alias("parser_id"),
config.exclusion_enrichment.alias("exclusion_case"),
),
)
for config in configs
if len(config.exclude_any)
]
)
if total_exclude_count
else F.lit(None)
)
inclusion_case_expr = (
E.chain_cases_multiple(
[
E.case(
F.array_contains(F.col("parser_id"), config.parser_id),
F.struct(
F.lit(config.parser_id).alias("parser_id"),
config.inclusion_enrichment.alias("inclusion_case"),
),
)
for config in configs
if len(config.include_any)
]
)
if total_include_count
else F.lit(None)
)
matched_ids_before_exclusions_expr = E.chain_cases_multiple(
[
E.case(
F.array_contains(F.col("parser_id"), config.parser_id),
F.lit(config.parser_id),
)
for config in configs
]
)
return {
"parser_exclusion_case": exclusion_case_expr,
"parser_inclusion_case": inclusion_case_expr,
"matched_parser_ids_before_exclusions": matched_ids_before_exclusions_expr,
}