# Source code for etl_toolkit.analyses.qa

from typing import Optional, List, Tuple
from pyspark.sql import functions as F, DataFrame
from etl_toolkit import E, A
from etl_toolkit.exceptions import InvalidInputException

from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session
from yipit_databricks_utils.helpers.telemetry import track_usage


def validate_dataframes(df1: DataFrame, df2: DataFrame) -> None:
    """
    Check that both DataFrame arguments were actually supplied.

    :param df1: The first DataFrame to check.
    :param df2: The second DataFrame to check.
    :raises InvalidInputException: If either argument is ``None``.
    """
    if any(frame is None for frame in (df1, df2)):
        raise InvalidInputException("Both DataFrames must be provided and not None")


@track_usage
def get_schema_comparison(
    df1: DataFrame, df2: DataFrame, diff_only: bool = False
) -> DataFrame:
    """
    Compare the schemas of two DataFrames and return a DataFrame describing the differences.

    Every column that exists in either DataFrame yields one output row with the
    column name, the string form of its data type in each DataFrame (``NULL``
    when the column is absent from that DataFrame) and a flag telling whether
    the two sides differ. Rows are built in sorted column-name order.

    :param df1: The first DataFrame to compare.
    :param df2: The second DataFrame to compare.
    :param diff_only: If True, only columns that are different will be returned in schema comparison dataframe.
    :return: DataFrame with the schema comparison, with columns ``column_name``,
        ``df1_data_type``, ``df2_data_type`` and ``is_diff``.
    :raises InvalidInputException: If ``df1`` or ``df2`` is ``None``.

    Examples
    ^^^^^^^^^^^^^
    .. code-block:: python

        # Schema comparison example with ``A.get_schema_comparison``.
        from pyspark.sql import functions as F, Row, DataFrame, types as T
        from etl_toolkit import analyses as A
        from pyspark.sql.types import (
            StructType,
            StructField,
            IntegerType,
            StringType,
            DateType,
            TimestampType,
            BooleanType,
        )
        from datetime import date, datetime

        df1 = spark.createDataFrame(
            [
                (1, "John Doe", date(2024, 1, 1), datetime(2024, 1, 1), True),
                (2, "Jane Doe", date(2024, 1, 2), datetime(2024, 1, 2), False),
            ],
            StructType(
                [
                    StructField("id", IntegerType()),
                    StructField("name", StringType()),
                    StructField("date", DateType()),
                    StructField("datetime", TimestampType()),
                    StructField("is_active", BooleanType()),
                ]
            )
        )

        df2 = spark.createDataFrame(
            [
                (1, "John Doe", datetime(2024, 1, 1), True, datetime(2024, 12, 16)),
                (2, "Jane Doe", datetime(2024, 1, 2), False, datetime(2024, 12, 16)),
            ],
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("name", StringType()),
                    StructField("datetime", StringType()),
                    StructField("is_active", StringType()),
                    StructField("_timestamp_ingested", TimestampType()),
                ]
            )
        )

        compare_df = A.get_schema_comparison(df1, df2)
        compare_df.show()

        +-------------------+---------------+---------------+-------+
        |        column_name|  df1_data_type|  df2_data_type|is_diff|
        +-------------------+---------------+---------------+-------+
        |_timestamp_ingested|           NULL|TimestampType()|   true|
        |               date|     DateType()|           NULL|   true|
        |           datetime|TimestampType()|   StringType()|   true|
        |                 id|  IntegerType()|   StringType()|   true|
        |          is_active|  BooleanType()|   StringType()|   true|
        |               name|   StringType()|   StringType()|  false|
        +-------------------+---------------+---------------+-------+
    """
    validate_dataframes(df1, df2)
    spark = get_spark_session()

    # Map column name -> string form of its data type. Iterating in reverse
    # keeps the FIRST field when a name is duplicated, which matches what a
    # ``schema[name]`` lookup returns.
    df1_types = {f.name: str(f.dataType) for f in reversed(df1.schema.fields)}
    df2_types = {f.name: str(f.dataType) for f in reversed(df2.schema.fields)}

    # One row per column name found on either side; a missing column is None.
    schema_data = []
    for name in sorted(df1_types.keys() | df2_types.keys()):
        df1_type = df1_types.get(name)
        df2_type = df2_types.get(name)
        schema_data.append((name, df1_type, df2_type, df1_type != df2_type))

    # The schema is spelled out rather than inferred: inference raises when
    # ``schema_data`` is empty and when a type column holds only None values
    # (i.e. one of the inputs has no columns at all).
    compare_schema_df = spark.createDataFrame(
        schema_data,
        "column_name string, df1_data_type string, "
        "df2_data_type string, is_diff boolean",
    )

    if diff_only:
        # Keep only the columns whose types differ or that exist on one side only
        compare_schema_df = compare_schema_df.filter(F.col("is_diff"))

    return compare_schema_df
def _build_checksum_column(compare_columns: List[str], checksum_algorithm: str):
    """
    Build the column expression that fingerprints a row over ``compare_columns``.

    :param compare_columns: Names of the columns that take part in the checksum, in order.
    :param checksum_algorithm: Either ``'uuid5'`` or ``'hash'``.
    :return: A column expression that can be attached to any DataFrame holding ``compare_columns``.
    :raises InvalidInputException: If ``checksum_algorithm`` is not a supported value.
    """
    # Every value is compared through its string form; NULLs are replaced by
    # the literal "NULL" so that they still contribute to the checksum.
    # NOTE: a real NULL and the string "NULL" therefore produce the same checksum.
    normalized = [
        F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in compare_columns
    ]

    if checksum_algorithm == "uuid5":
        return E.uuid5(normalized)
    if checksum_algorithm == "hash":
        # NOTE: values are joined with "|", so values that themselves contain
        # "|" can make two different rows produce the same checksum.
        return F.sha2(F.concat_ws("|", *normalized), 256)

    raise InvalidInputException(
        f"Invalid checksum algorithm: {checksum_algorithm}. Must be 'uuid5' or 'hash'"
    )


@track_usage
def get_rows_comparison(
    df1: DataFrame,
    df2: DataFrame,
    columns: Optional[List[str]] = None,
    sample_size: Optional[int] = None,
    checksum_algorithm: str = "hash",
) -> Tuple[DataFrame, DataFrame]:
    """
    Compare rows from both DataFrames and return two DataFrames with the differences.

    It compares rows by computing a checksum for both dataframes using either the
    columns specified in the ``columns`` parameter or, when none are given, the
    columns the two dataframes have in common (taken in ``df1``'s column order so
    that checksums are reproducible between runs). Both dataframes are then
    left-anti joined against each other on the checksum column to identify
    differences.

    For large datasets, consider using the ``sample_size`` parameter to limit the
    comparison to a subset of rows. The limit is applied to each DataFrame
    without any ordering, so the sampled rows of ``df1`` and ``df2`` are not
    guaranteed to correspond to each other.

    :param df1: The first DataFrame to compare.
    :param df2: The second DataFrame to compare.
    :param columns: A list of columns to compare. If not specified, intersection of the columns from both dataframes will be used.
    :param sample_size: Optional limit on the number of rows to include in the comparison. Useful for comparing large datasets.
    :param checksum_algorithm: Algorithm to use for checksum calculation, options are 'uuid5' or 'hash' (default).
        'hash' is the SHA-256 of the compared values joined with ``|``.
        For large datasets, 'hash' is generally more efficient.
    :return: A tuple with two DataFrames, the first one with the rows that are in df1 but not in df2 and
        the second one with the rows that are in df2 but not in df1. Both carry an extra ``__checksum`` column.
    :raises InvalidInputException: If a DataFrame is ``None``, a requested column is missing from either
        DataFrame, the DataFrames share no columns, ``sample_size`` is not positive, or
        ``checksum_algorithm`` is not supported.

    Examples
    ^^^^^^^^^^^^^
    .. code-block:: python

        # Rows comparison example with ``A.get_rows_comparison``.
        from pyspark.sql import functions as F, Row, DataFrame, types as T
        from etl_toolkit import analyses as A
        from pyspark.sql.types import (
            StructType,
            StructField,
            IntegerType,
            StringType,
            DateType,
            TimestampType,
            BooleanType,
        )
        from datetime import date, datetime

        df_schema = StructType(
            [
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("date", DateType()),
                StructField("datetime", TimestampType()),
                StructField("is_active", BooleanType()),
            ]
        )

        df1 = spark.createDataFrame(
            [
                (1, "John Doe", date(2024, 1, 1), datetime(2024, 1, 1), True),
                (2, "Jane Doe", date(2024, 1, 2), datetime(2024, 1, 2), False),
                (3, "Test Name 1", date(2024, 1, 3), datetime(2024, 1, 3), True),
                (4, "Test Name 2", date(2024, 1, 4), datetime(2024, 1, 4), False),
            ],
            df_schema,
        )

        df2 = spark.createDataFrame(
            [
                (3, "Test Name 1", date(2024, 1, 3), datetime(2024, 1, 3), True),
                (5, "Test Name 3", date(2024, 1, 5), datetime(2024, 1, 5), True),
            ],
            df_schema,
        )

        # The checksum values shown below are illustrative.
        leftanti_df1, leftanti_df2 = A.get_rows_comparison(
            df1, df2, checksum_algorithm="uuid5"
        )
        leftanti_df1.show()
        leftanti_df2.show()

        +--------------------+---+-----------+----------+-------------------+---------+
        |          __checksum| id|       name|      date|           datetime|is_active|
        +--------------------+---+-----------+----------+-------------------+---------+
        |564e143e-0756-534...|  1|   John Doe|2024-01-01|2024-01-01 00:00:00|     true|
        |20d6aa5c-f281-593...|  2|   Jane Doe|2024-01-02|2024-01-02 00:00:00|    false|
        |9ccbc92c-3bd6-5d3...|  4|Test Name 2|2024-01-04|2024-01-04 00:00:00|    false|
        +--------------------+---+-----------+----------+-------------------+---------+

        +--------------------+---+-----------+----------+-------------------+---------+
        |          __checksum| id|       name|      date|           datetime|is_active|
        +--------------------+---+-----------+----------+-------------------+---------+
        |48f38d19-d5e2-52c...|  5|Test Name 3|2024-01-05|2024-01-05 00:00:00|     true|
        +--------------------+---+-----------+----------+-------------------+---------+
    """
    validate_dataframes(df1, df2)

    # Get df1 and df2 columns
    df1_cols = [f.name for f in df1.schema.fields]
    df2_cols = [f.name for f in df2.schema.fields]
    df1_col_set = set(df1_cols)
    df2_col_set = set(df2_cols)

    if columns:
        # Validate that the columns exist in both dataframes
        missing_in_df1 = [c for c in columns if c not in df1_col_set]
        missing_in_df2 = [c for c in columns if c not in df2_col_set]
        if missing_in_df1:
            raise InvalidInputException(f"Columns {missing_in_df1} not found in df1")
        if missing_in_df2:
            raise InvalidInputException(f"Columns {missing_in_df2} not found in df2")
        compare_columns = list(columns)
    else:
        # No columns given: use the columns common to both DataFrames. They are
        # taken in df1's order (de-duplicated) instead of set order, because the
        # checksum depends on column order and set order of strings changes
        # from one Python process to the next.
        compare_columns = list(
            dict.fromkeys(c for c in df1_cols if c in df2_col_set)
        )

    if not compare_columns:
        raise InvalidInputException(
            "No common columns found between the two DataFrames"
        )

    # Apply sampling if requested - helps with large dataframes
    if sample_size is not None:
        if sample_size <= 0:
            raise InvalidInputException("Sample size must be a positive integer")
        # NOTE(review): limit() is applied without an ordering, so which rows
        # survive on each side is left to Spark.
        df1 = df1.limit(sample_size)
        df2 = df2.limit(sample_size)

    # Add a checksum column for comparison; the same expression is attached to
    # both DataFrames so equal rows always get equal checksums.
    checksum_col = "__checksum"
    checksum_expr = _build_checksum_column(compare_columns, checksum_algorithm)
    df1_with_check = df1.withColumn(checksum_col, checksum_expr)
    df2_with_check = df2.withColumn(checksum_col, checksum_expr)

    # Give the checksum column priority in the column order of both sides
    df1_optimized = A.shift_columns(df1_with_check, priority_columns=[checksum_col])
    df2_optimized = A.shift_columns(df2_with_check, priority_columns=[checksum_col])

    # Only the checksum column is needed on the right-hand side of each join.
    # NOTE(review): the broadcast hint is applied regardless of input size;
    # for very large inputs consider ``sample_size`` to keep it small.
    df2_checksums = df2_optimized.select(checksum_col)
    df1_checksums = df1_optimized.select(checksum_col)

    # Rows that are in df1 but not in df2
    leftanti_df1 = df1_optimized.join(
        F.broadcast(df2_checksums), on=[checksum_col], how="leftanti"
    )

    # Rows that are in df2 but not in df1
    leftanti_df2 = df2_optimized.join(
        F.broadcast(df1_checksums), on=[checksum_col], how="leftanti"
    )

    return leftanti_df1, leftanti_df2