Source code for etl_toolkit.analyses.dedupe

from pyspark.sql import functions as F, DataFrame, Window as W, Column
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit import expressions as E


@track_usage
def dedupe_by_condition(
    df: DataFrame,
    condition: str | Column,
    qa: bool = False,
) -> DataFrame:
    """
    :bdg-primary:`QA Mode support`

    .. role:: python(code)
       :language: python

    Applies a generic deduplication transformation in which rows are excluded
    unless the supplied ``condition`` equals the expected value. The expected
    value is 1, as a common use case is to dedupe based on a row number over a
    custom window expression.

    If :python:`qa=True`, all rows are preserved and a ``dedupe_index`` column
    holding the value of the condition is added to the returned dataframe. This
    can be useful when investigating the behavior of deduplication for QA
    purposes (a sketch of this workflow follows the function).

    :param df: DataFrame to deduplicate
    :param condition: A pyspark Column or string that should return 1 if the
        row is meant to be preserved. If a string is provided, it is resolved
        as a Column.
    :param qa: Boolean flag to control QA mode for this function. When ``True``,
        all rows are preserved and a ``dedupe_index`` column storing the value
        of the ``condition`` is added to the dataframe. When ``False``, rows
        whose condition value does not equal 1 are dropped.

    .. code-block:: python
        :caption: Dedupe by taking the first value of rows for each color based on the date column. Note that we use a Window expression to define the condition.

        from etl_toolkit import E, F, A, W
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 1)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])
        window = W.partitionBy("color").orderBy("date")
        display(
            A.dedupe_by_condition(
                df,
                condition=F.row_number().over(window),
            )
        )

        +--------------+--------------+--------------+
        |color         |date          |value         |
        +--------------+--------------+--------------+
        |blue          |    2023-01-01|            50|
        +--------------+--------------+--------------+
        |red           |    2023-01-01|          1000|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: When using QA mode, the function will include all rows and a ``dedupe_index`` indicating the value of the condition.

        from etl_toolkit import E, F, A, W
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 1)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])
        window = W.partitionBy("color").orderBy("date")
        display(
            A.dedupe_by_condition(
                df,
                condition=F.row_number().over(window),
                qa=True,
            )
        )

        +--------------+--------------+--------------+--------------+
        |color         |date          |value         |dedupe_index  |
        +--------------+--------------+--------------+--------------+
        |blue          |    2023-01-01|            50|             1|
        +--------------+--------------+--------------+--------------+
        |red           |    2023-01-01|          1000|             1|
        +--------------+--------------+--------------+--------------+
        |red           |    2024-01-01|           100|             2|
        +--------------+--------------+--------------+--------------+
    """
    base_df = df.withColumns({"dedupe_index": E.normalize_column(condition)})
    if qa:
        return base_df
    return base_df.where(F.col("dedupe_index") == 1).drop("dedupe_index")
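
# Illustrative QA-mode sketch (assumes a DataFrame ``df`` with "color" and
# "date" columns, as in the docstring examples): rows that a non-QA run would
# drop can be inspected before deduplicating.
#
#     window = W.partitionBy("color").orderBy("date")
#     qa_df = dedupe_by_condition(df, condition=F.row_number().over(window), qa=True)
#     dropped_rows = qa_df.where(F.col("dedupe_index") != 1)
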
@track_usage
def dedupe_by_row_number(
    df: DataFrame,
    dedupe_columns: list[str | Column],
    order_columns: list[str | Column],
    qa: bool = False,
) -> DataFrame:
    """
    :bdg-primary:`QA Mode support`

    .. role:: python(code)
       :language: python

    Dedupes rows from the supplied ``df`` by dropping every row whose row
    number is not 1. Rows are partitioned by ``dedupe_columns`` and sorted
    within each partition by ``order_columns`` (i.e. the columns produce the
    window expression
    :python:`F.row_number().over(W.partitionBy(*dedupe_columns).orderBy(*order_columns))`).

    If :python:`qa=True`, all rows are preserved and a ``dedupe_index`` column
    indicating the row number is added to the returned dataframe. This can be
    useful when investigating the behavior of deduplication for QA purposes.

    .. caution::
        The deduplication logic in this function uses ``F.row_number`` from
        pyspark. If a different ranking function is preferred, for example
        ``F.rank`` or ``F.dense_rank``, use ``A.dedupe_by_condition`` with a
        custom window expression (a sketch follows this function).

    :param df: DataFrame to deduplicate
    :param dedupe_columns: A list of pyspark columns or strings that define the
        window partitions used to sort the rows. In most cases, these will be
        the columns that define a row's uniqueness. If strings are provided,
        they are resolved as Columns.
    :param order_columns: A list of pyspark columns or strings that sorts rows
        within each window partition of the dataframe. This sorting determines
        the row number. Be explicit about ascending or descending order when
        specifying these columns. If strings are provided, they are resolved as
        Columns.
    :param qa: Boolean flag to control QA mode for this function. When ``True``,
        all rows are preserved and a ``dedupe_index`` column storing the row
        number is added to the dataframe. When ``False``, rows with row numbers
        that do not equal 1 are dropped.

    Examples
    -----------

    .. code-block:: python
        :caption: Dedupe by taking the first value of rows for each color based on the date column

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 1)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])
        display(
            A.dedupe_by_row_number(
                df,
                dedupe_columns=["color"],
                order_columns=["date"],
            )
        )

        +--------------+--------------+--------------+
        |color         |date          |value         |
        +--------------+--------------+--------------+
        |blue          |    2023-01-01|            50|
        +--------------+--------------+--------------+
        |red           |    2023-01-01|          1000|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: When using the QA mode, all rows are included and a ``dedupe_index`` column is added that indicates the row number.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 1)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])
        display(
            A.dedupe_by_row_number(
                df,
                dedupe_columns=["color"],
                order_columns=["date"],
                qa=True,
            )
        )

        +--------------+--------------+--------------+--------------+
        |color         |date          |value         |dedupe_index  |
        +--------------+--------------+--------------+--------------+
        |blue          |    2023-01-01|            50|             1|
        +--------------+--------------+--------------+--------------+
        |red           |    2023-01-01|          1000|             1|
        +--------------+--------------+--------------+--------------+
        |red           |    2024-01-01|           100|             2|
        +--------------+--------------+--------------+--------------+

    .. code-block:: python
        :caption: Dedupe by taking the last value of rows for each color based on the date column. Note that descending order is implemented via the ``F.desc`` function in pyspark.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 1)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])
        display(
            A.dedupe_by_row_number(
                df,
                dedupe_columns=["color"],
                order_columns=[F.desc("date")],
            )
        )

        +--------------+--------------+--------------+
        |color         |date          |value         |
        +--------------+--------------+--------------+
        |blue          |    2023-01-01|            50|
        +--------------+--------------+--------------+
        |red           |    2024-01-01|           100|
        +--------------+--------------+--------------+
    """
    dedupe_columns = [E.normalize_column(col) for col in dedupe_columns]
    order_columns = [E.normalize_column(col) for col in order_columns]
    condition = F.row_number().over(
        W.partitionBy(*dedupe_columns).orderBy(*order_columns)
    )
    return dedupe_by_condition(
        df,
        condition=condition,
        qa=qa,
    )
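
# Illustrative sketch for the caution above (assumes a DataFrame ``df`` with
# "color" and "date" columns, as in the docstring examples): to keep all rows
# that tie for first place within a partition, pair ``dedupe_by_condition``
# with ``F.rank`` instead of ``F.row_number``. Tied rows all receive rank 1
# and are therefore all preserved.
#
#     window = W.partitionBy("color").orderBy("date")
#     deduped_with_ties = dedupe_by_condition(df, condition=F.rank().over(window))
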