from pyspark.sql import functions as F, DataFrame, Window as W, Column
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import expressions as E
@track_usage
def dedupe_by_condition(
df: DataFrame,
condition: str | Column,
qa: bool = False,
) -> DataFrame:
"""
:bdg-primary:`QA Mode support`
.. role:: python(code)
:language: python
Function that applies a generic deduplication transformation where rows are excluded
if the supplied condition does not evaluate to the expected value of 1.
The expected value is 1 because a common use case is to dedupe based on a row number
over a custom window expression.
If :python:`qa=True`, then all rows will be preserved, and a ``dedupe_index`` column
matching the condition will be added to the returned dataframe. This can be useful
when investigating the behavior of deduplication for QA purposes.
:param df: DataFrame to deduplicate
:param condition: A pyspark Column or string that should return a 1 if the row is meant to be preserved. If a string is provided, it is resolved as a Column.
:param qa: Boolean flag to control QA mode for this function. When ``True``, all rows are preserved and a ``dedupe_index`` column is added to the dataframe that stores the value of the ``condition``. If ``False``, rows whose condition value does not equal 1 are dropped.
Examples
-----------
.. code-block:: python
:caption: Dedupe by taking the first value of rows for each color based on the date column.
Note that we use a Window expression to define the condition.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy("date")
display(
A.dedupe_by_condition(
df,
condition=F.row_number().over(window),
)
)
+-----+----------+-----+
|color|      date|value|
+-----+----------+-----+
| blue|2023-01-01|   50|
|  red|2023-01-01| 1000|
+-----+----------+-----+
.. code-block:: python
:caption: When using QA mode, the function will include all rows and a ``dedupe_index`` indicating the value of the condition.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy("date")
display(
A.dedupe_by_condition(
df,
condition=F.row_number().over(window),
qa=True,
)
)
+-----+----------+-----+------------+
|color|      date|value|dedupe_index|
+-----+----------+-----+------------+
| blue|2023-01-01|   50|           1|
|  red|2023-01-01| 1000|           1|
|  red|2024-01-01|  100|           2|
+-----+----------+-----+------------+
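.. code-block:: python
:caption: The condition can also be a string. A minimal sketch (hypothetical setup, assuming the
string resolves to a simple column reference): precompute the row number as a column
and pass its name.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy("date")
# "rn" is a hypothetical helper column; its name is resolved as a Column by the function.
indexed_df = df.withColumn("rn", F.row_number().over(window))
display(
A.dedupe_by_condition(
indexed_df,
condition="rn",
)
)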
"""
base_df = df.withColumns({"dedupe_index": E.normalize_column(condition)})
if qa:
return base_df
return base_df.where(F.col("dedupe_index") == 1).drop("dedupe_index")
@track_usage
def dedupe_by_row_number(
df: DataFrame,
dedupe_columns: list[str | Column],
order_columns: list[str | Column],
qa: bool = False,
) -> DataFrame:
"""
:bdg-primary:`QA Mode support`
.. role:: python(code)
:language: python
Function that dedupes rows from the supplied ``df`` by dropping every row whose row number != 1.
Rows are partitioned by the ``dedupe_columns`` and sorted within each partition by the ``order_columns``
(i.e. the columns produce the window expression :python:`F.row_number().over(W.partitionBy(*dedupe_columns).orderBy(*order_columns))`).
If :python:`qa=True`, then all rows will be preserved, and a ``dedupe_index`` column
indicating the row number will be added to the returned dataframe. This can be useful
when investigating the behavior of deduplication for QA purposes.
.. caution:: The deduplication logic in this function uses ``F.row_number`` from pyspark.
If a different ranking function is preferred, for ex. ``F.rank`` or ``F.dense_rank``, use
``A.dedupe_by_condition`` with a custom window expression (see the sketch at the end of the examples below).
:param df: DataFrame to deduplicate
:param dedupe_columns: A list of pyspark columns or strings that define the window partitions of the dataframe to sort the rows by. In most cases, these will be the columns that define a row's uniqueness. If strings are provided, they will be resolved as Columns.
:param order_columns: A list of pyspark columns or strings that sorts rows within each window partition of the dataframe. This sorting will determine the row_number. Be explicit in sorting ascending or descending when specifying these columns. If strings are passed they are resolved as Columns.
:param qa: Boolean flag to control QA mode for this function. When ``True``, then all rows are preserved and a ``dedupe_index`` column is added to the dataframe that stores the row_number. If ``False``, rows with row numbers that do not equal 1 are dropped.
Examples
-----------
.. code-block:: python
:caption: Dedupe by taking the first value of rows for each color based on the date column
from etl_toolkit import E, F, A
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
A.dedupe_by_row_number(
df,
dedupe_columns=["color"],
order_columns=["date"],
)
)
+-----+----------+-----+
|color|      date|value|
+-----+----------+-----+
| blue|2023-01-01|   50|
|  red|2023-01-01| 1000|
+-----+----------+-----+
.. code-block:: python
:caption: When using the QA mode all rows are included and a ``dedupe_index`` column is added that indicates the row number.
from etl_toolkit import E, F, A
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
A.dedupe_by_row_number(
df,
dedupe_columns=["color"],
order_columns=["date"],
qa=True,
)
)
+-----+----------+-----+------------+
|color|      date|value|dedupe_index|
+-----+----------+-----+------------+
| blue|2023-01-01|   50|           1|
|  red|2023-01-01| 1000|           1|
|  red|2024-01-01|  100|           2|
+-----+----------+-----+------------+
.. code-block:: python
:caption: Dedupe by taking the last value of rows for each color based on the date column.
Note that implementing descending order is via the ``F.desc`` function in pyspark.
from etl_toolkit import E, F, A
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
A.dedupe_by_row_number(
df,
dedupe_columns=["color"],
order_columns=[F.desc("date")],
)
)
+-----+----------+-----+
|color|      date|value|
+-----+----------+-----+
| blue|2023-01-01|   50|
|  red|2024-01-01|  100|
+-----+----------+-----+
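.. code-block:: python
:caption: As noted in the caution above, a different ranking function requires
``A.dedupe_by_condition`` with a custom window expression. A minimal sketch using
``F.dense_rank`` (illustrative only): rows tied on ``date`` all receive rank 1
and are therefore all preserved, unlike with ``F.row_number``.
from etl_toolkit import E, F, A, W
from datetime import date
df = spark.createDataFrame([
{"value": 100, "color": "red", "date": date(2024, 1, 1)},
{"value": 1000, "color": "red", "date": date(2023, 1, 1)},
{"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy(F.desc("date"))
display(
A.dedupe_by_condition(
df,
condition=F.dense_rank().over(window),
)
)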
"""
dedupe_columns = [E.normalize_column(col) for col in dedupe_columns]
order_columns = [E.normalize_column(col) for col in order_columns]
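    # Rank rows within each dedupe partition; only row number 1 survives deduplication.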
condition = F.row_number().over(
W.partitionBy(*dedupe_columns).orderBy(*order_columns)
)
return dedupe_by_condition(
df,
condition=condition,
qa=qa,
)