# Source code for etl_toolkit.analyses.standard_metrics.daily_ptd_progress

from pyspark.sql import functions as F, DataFrame, Window as W

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    _get_metric_configs,
    _get_latest_growth_rates,
    _get_date_bounds,
    _get_filtered_calendar_df,
    _get_previous_year_columns,
)


def _get_base_df(
    df: DataFrame,
    metric_aggregation_type: str = None,
) -> DataFrame:
    """Keep only the daily simple-aggregate rows built with one aggregation type.

    :param df: A metric's unified KPI dataframe.
    :param metric_aggregation_type: Value compared against the
        ``aggregation_type`` column (for example ``"SUM"`` or ``"AVG"``).
        With the default ``None`` the comparison evaluates to NULL in Spark
        SQL, so no rows are kept.
    :return: Rows whose ``internal_dashboard_analysis_name`` is
        ``"day_simple_aggregate"`` and whose ``aggregation_type`` matches.
    """
    is_daily_aggregate = (
        F.col("internal_dashboard_analysis_name") == "day_simple_aggregate"
    )
    has_requested_aggregation = F.col("aggregation_type") == metric_aggregation_type
    return df.where(is_daily_aggregate & has_requested_aggregation)


def _get_txd_base_df(df: DataFrame) -> DataFrame:
    """Extract the daily trailing-day aggregate values keyed by ``period_start``.

    Only rows whose analysis is ``"day_simple_aggregate_trailing_day"`` and
    whose ``aggregation_type`` is ``"AVG"`` are kept. The daily comparable
    analysis is always averaged, whatever the metric's own aggregation type.

    :param df: A metric's unified KPI dataframe.
    :return: Two columns: ``period_start`` and ``txd_value`` (the row's ``value``).
    """
    trailing_day_filter = (
        F.col("internal_dashboard_analysis_name")
        == "day_simple_aggregate_trailing_day"
    ) & (F.col("aggregation_type") == "AVG")

    trailing_rows = df.where(trailing_day_filter)
    return trailing_rows.select("period_start", F.col("value").alias("txd_value"))


def _get_ptd_sales_expression(metric_aggregation_type: str, granularity: str):
    """Build the running period-to-date (PTD) aggregate of ``value``.

    The running aggregate is computed over all rows from the start of the
    period partition up to the current row, ordered by ``period_start``.

    :param metric_aggregation_type: ``"AVG"`` selects a running average; any
        other value selects a running sum.
    :param granularity: ``"HALF_YEAR"`` partitions by ``report_period_start``
        and names the column ``ptd_sales``. Any other value partitions by
        ``quarter_start_period_start``/``quarter_end_period_start`` and names
        it ``qtd_sales``.
    :return: An aliased window Column expression.
    """
    if granularity == "HALF_YEAR":
        alias_prefix = "ptd"
        partition_columns = ["report_period_start"]
    else:
        alias_prefix = "qtd"
        partition_columns = [
            "quarter_start_period_start",
            "quarter_end_period_start",
        ]

    running_window = (
        W.partitionBy(*partition_columns)
        .orderBy("period_start")
        .rowsBetween(W.unboundedPreceding, W.currentRow)
    )
    running_aggregate = F.avg if metric_aggregation_type == "AVG" else F.sum

    running_total = running_aggregate(F.col("value")).over(running_window)
    return running_total.alias(f"{alias_prefix}_sales")


def _join_ptd_sales_df(
    df: DataFrame,
    metric_calendar_type: str,
    metric_max_relevant_years: int,
    granularity: str,
) -> DataFrame:
    """
    Iterate through relevant years and join previous years' data to enable year-over-year
    comparisons of period-to-date performance. Handles different calendar types and
    leap year adjustments by determining if PTD should be calculated using:
    - date-to-date comparison (exact calendar dates)
    - period day-to-period day comparison (relative position in period)

    For each ``n`` in ``1..metric_max_relevant_years`` this adds
    ``value_previous_{n}_year``, ``txd_value_previous_{n}_year`` and
    ``{prefix}_sales_previous_{n}_year`` (``prefix`` is ``qtd``, or ``ptd`` for
    ``HALF_YEAR``). When only one year is relevant,
    ``{prefix}_sales_previous_2_year`` is also added, because the previous
    year's y/y growth needs it.

    :param df: Daily PTD frame with ``period_start``, ``year``, ``quarter``,
        ``value``, ``txd_value``, ``{prefix}_sales`` and the ``days_in_*``
        column. For quarter granularity it also needs ``end_of_quarter`` and
        ``total_quarter_sales``.
    :param metric_calendar_type: ``"52_WEEK"`` joins on (year, quarter,
        day-in-period). Any other value joins on the calendar date shifted
        back by N years.
    :param metric_max_relevant_years: Number of prior years to attach.
    :param granularity: ``"HALF_YEAR"`` or a quarter-based granularity.
    :return: ``df`` with the prior-year columns left-joined on.
    """
    # Set up variables based on granularity (quarter or half-year)
    period_prefix = "qtd"  # Quarter-to-date by default
    days_in_column = "days_in_quarter"

    # For quarter-end dates, use the total quarter sales instead of QTD sales
    ptd_sales_previous_year_expression = F.when(
        F.col("end_of_quarter") == 1, F.col("total_quarter_sales")
    ).otherwise(F.col(f"{period_prefix}_sales"))

    # For half-year reporting, use different column names and expressions
    if granularity == "HALF_YEAR":
        period_prefix = "ptd"
        days_in_column = "days_in_report"
        ptd_sales_previous_year_expression = F.col(f"{period_prefix}_sales")

    # Determine join columns based on calendar type
    # 52_WEEK calendar joins on year, quarter, and days in period
    # EXACT_N_YEARS calendar joins on exact calendar date from previous year
    if metric_calendar_type == "52_WEEK":
        join_columns = ["join_year", "quarter", days_in_column]
    else:
        join_columns = ["join_date"]

    # Process each previous year up to the maximum relevant years
    for year_previous in range(1, metric_max_relevant_years + 1):
        # Left-side join keys: this row's year, and this row's date shifted back
        df = df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": F.col("period_start")
                - F.expr(f"INTERVAL {year_previous} YEAR"),
            }
        )

        # Right side: the same rows re-keyed so year Y-N lines up with year Y
        join_df = df.select(
            days_in_column,
            F.col("period_start").alias("join_date"),
            (F.col("year") + year_previous).alias("join_year"),
            "quarter",
            F.col("value").alias(f"value_previous_{year_previous}_year"),
            F.col("txd_value").alias(f"txd_value_previous_{year_previous}_year"),
            ptd_sales_previous_year_expression.alias(
                f"{period_prefix}_sales_previous_{year_previous}_year"
            ),
        ).select(
            # Keep only the keys in use plus the new values, so non-key columns
            # (quarter, days_in_*) are not duplicated by the join.
            *join_columns,
            f"value_previous_{year_previous}_year",
            f"txd_value_previous_{year_previous}_year",
            f"{period_prefix}_sales_previous_{year_previous}_year",
        )

        df = df.join(join_df, join_columns, "left").drop("join_date", "join_year")

    if metric_max_relevant_years == 1:
        # If there is only 1 previous year, automatically add the second year
        # for the daily progress previous y/y calculation.
        previous_2_year_column = f"{period_prefix}_sales_previous_2_year"

        # Both left-side keys are required. Setting only join_date left the
        # 52_WEEK join without a left-side ``join_year`` column (it is dropped
        # after every join above), so the join failed.
        df = df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": F.col("period_start") - F.expr("INTERVAL 2 YEAR"),
            }
        )

        # NOTE(review): unlike the loop, this uses plain {prefix}_sales rather
        # than ptd_sales_previous_year_expression (the quarter-end substitution).
        # Kept as-is; confirm whether that difference is intended.
        join_df = df.select(
            F.col("period_start").alias("join_date"),
            (F.col("year") + 2).alias("join_year"),
            "quarter",
            days_in_column,
            F.col(f"{period_prefix}_sales").alias(previous_2_year_column),
        ).select(
            # Narrow to keys + value, as in the loop. Otherwise an EXACT_N_YEARS
            # (join_date) join duplicates quarter/days_in_* and later
            # references to them become ambiguous.
            *join_columns,
            previous_2_year_column,
        )

        df = df.join(join_df, join_columns, "left").drop("join_date", "join_year")

    return df


def _get_previous_week_df(
    df: DataFrame,
    metric_calendar_type: str,
    metric_max_relevant_years: int,
    granularity: str,
) -> DataFrame:
    """Attach prior-year PTD sales and derive the PTD y/y growth columns.

    Steps:

    1. Join prior-year values via ``_join_ptd_sales_df``.
    2. Forward-fill ``{prefix}_sales_previous_1_year`` and
       ``{prefix}_sales_previous_2_year`` in ``period_start`` order. Each null
       takes the latest earlier non-null value.
    3. Add a period index: ``dense_rank`` over the period start, descending,
       minus 1, so the most recent period is 0.
    4. Add ``{prefix}_y_y_growth_current`` (current / prior-1 - 1) and
       ``{prefix}_y_y_growth_previous_1_year`` (prior-1 / prior-2 - 1). The
       ``try_*`` functions return NULL instead of failing, e.g. on division
       by zero.
    5. Add ``{prefix}_y_y_growth_previous_1_week``: current growth lagged by
       7 rows in ``period_start`` order. That is 7 days only if the input has
       one row per day — presumably guaranteed by the calendar join upstream.

    ``prefix`` is ``ptd`` for ``HALF_YEAR`` granularity and ``qtd`` otherwise.
    """
    if granularity == "HALF_YEAR":
        prefix = "ptd"
        index_column = "report_period_index"
        index_order_column = "report_period_start"
    else:
        prefix = "qtd"
        index_column = "quarter_period_index"
        index_order_column = "quarter_start_period_start"
    newest_period_first = W.orderBy(F.col(index_order_column).desc())

    joined_df = _join_ptd_sales_df(
        df, metric_calendar_type, metric_max_relevant_years, granularity
    )

    def _forward_fill(column_name):
        # A running count of non-null values puts each non-null value and the
        # nulls that follow it in the same group. first_value copies the
        # group's leading value across the group.
        fill_group = F.count(F.col(column_name)).over(W.orderBy("period_start"))
        return F.first_value(F.col(column_name)).over(
            W.partitionBy(fill_group).orderBy("period_start")
        )

    def _growth(numerator_column, denominator_column):
        ratio = F.try_divide(F.col(numerator_column), F.col(denominator_column))
        return F.try_subtract(ratio, F.lit(1))

    current_sales = f"{prefix}_sales"
    prior_1_sales = f"{prefix}_sales_previous_1_year"
    prior_2_sales = f"{prefix}_sales_previous_2_year"
    current_growth = f"{prefix}_y_y_growth_current"

    # Prior-year PTD values can be missing, e.g. around the extra retail week
    # that some calendars add; only the 2 prior years used by the PTD chart
    # are filled.
    filled_df = joined_df.withColumns(
        {
            prior_1_sales: _forward_fill(prior_1_sales),
            prior_2_sales: _forward_fill(prior_2_sales),
        }
    )

    growth_df = filled_df.withColumns(
        {
            # Period index used by callers for the final period filter
            index_column: F.dense_rank().over(newest_period_first) - 1,
            current_growth: _growth(current_sales, prior_1_sales),
            f"{prefix}_y_y_growth_previous_1_year": _growth(
                prior_1_sales, prior_2_sales
            ),
        }
    )

    week_ago_growth = F.lag(F.col(current_growth), 7).over(W.orderBy("period_start"))
    return growth_df.withColumn(
        f"{prefix}_y_y_growth_previous_1_week", week_ago_growth
    )


def _get_current_values(df: DataFrame, granularity: str) -> dict:
    """Isolate the current fiscal year for use in extrapolation.

    Reads the first row where ``period_index == 0``. Only one row is
    collected; if several rows match, which one is returned is not
    specified.

    :param df: Frame containing ``period_index``, ``year`` and the
        ``days_in_report`` (``HALF_YEAR``) or ``days_in_quarter`` (otherwise)
        column.
    :param granularity: ``"HALF_YEAR"`` or a quarter-based granularity.
    :return: ``{"current_fiscal_year": ..., "current_days_in_period": ...}``
        taken from that row's ``year`` and ``days_in_*`` values.
    :raises ValueError: If ``df`` has no row with ``period_index == 0``.
        Previously this surfaced as an opaque ``AttributeError`` from
        calling ``.asDict()`` on ``None``.
    """
    days_in_column = (
        "days_in_report" if granularity == "HALF_YEAR" else "days_in_quarter"
    )

    current_row = (
        df.where(F.col("period_index") == 0)
        .select(
            F.col("year").alias("current_fiscal_year"),
            F.col(days_in_column).alias("current_days_in_period"),
        )
        .first()
    )

    # DataFrame.first() returns None on an empty result
    if current_row is None:
        raise ValueError(
            "Cannot determine the current period: no row with period_index == 0 "
            "was found in the input DataFrame."
        )

    return current_row.asDict()


def _get_ptd_progress_df(
    df: DataFrame,
    metric_max_relevant_years: int,
    granularity: str,
    metric_configs: dict,
) -> DataFrame:
    """Assemble the final, published PTD progress DataFrame.

    Adds the metric's configuration values (from ``metric_configs``) and the
    current fiscal year / current ``days_in_*`` value (from
    ``_get_current_values``) as literal columns. Then projects the final
    ordered column list, including a computed ``hide_first_week`` flag, a
    ``sequential_index`` (copied from ``period_index``) and a
    ``publication_timestamp``.

    :param df: Output of ``_get_previous_week_df``, already filtered to the
        periods to publish.
    :param metric_max_relevant_years: Passed to ``_get_previous_year_columns``
        to choose which prior-year columns are selected.
    :param granularity: ``"HALF_YEAR"`` selects ``ptd_``/report-period
        columns; any other value selects ``qtd_``/quarter columns.
    :param metric_configs: Metric configuration mapping; must contain every
        key read below.
    :return: The projected progress DataFrame.
    """
    if granularity == "HALF_YEAR":
        period_prefix = "ptd"
        period_index_column = "report_period_index"
        days_in_column = "days_in_report"
        period_columns = [
            "report_period_start",
            "report_period_end",
            period_index_column,
            "report_label",
        ]
    else:
        period_prefix = "qtd"
        period_index_column = "quarter_period_index"
        days_in_column = "days_in_quarter"
        period_columns = [
            F.col("quarter_start_period_start").alias("quarter_start"),
            F.col("quarter_end_period_start").alias("quarter_end"),
            period_index_column,
        ]
    current_days_column = f"current_{days_in_column}"

    previous_year_columns = _get_previous_year_columns(
        metric_max_relevant_years, granularity
    )
    current_values = _get_current_values(df, granularity)

    # The comparison list is built from the configured value rather than the
    # argument above; both entry points pass the same number for both.
    configured_max_years = int(metric_configs["max_relevant_years"])
    relative_year_comparisons = list(range(1, configured_max_years + 1))

    literal_columns = {
        "value_divisor": F.lit(metric_configs["value_divisor"]),
        "aggregation_type": F.lit(metric_configs["aggregation_type"]),
        "metric_name": F.lit(metric_configs["metric_name"]),
        "top_level_entity_ticker": F.lit(
            metric_configs["top_level_entity_ticker"]
        ).cast("string"),
        "top_level_entity_name": F.lit(metric_configs["top_level_entity_name"]),
        "metric_aggregate_sql_operator": F.lit(
            metric_configs["metric_config_aggregation_type"]
        ),
        "metric_trailing_period_aggregate_sql_operator": F.lit(
            metric_configs["trailing_period_aggregate_function"]
        ).cast("string"),
        "metric_trailing_period_length": F.lit(
            metric_configs["trailing_period_length"]
        ).cast("int"),
        "metric_calendar_type": F.lit(metric_configs["calendar_type"]),
        "metric_growth_rate_type": F.lit(metric_configs["growth_rate_type"]),
        "metric_va_id": F.lit(metric_configs["visible_alpha_id"]).cast("int"),
        "metric_relative_year_comparisons": F.lit(relative_year_comparisons),
        "current_fiscal_year": F.lit(current_values["current_fiscal_year"]),
        current_days_column: F.lit(current_values["current_days_in_period"]),
        "internal_metric_id": F.lit(metric_configs["internal_metric_id"]).cast(
            "int"
        ),
    }
    configs_df = df.withColumns(literal_columns)

    # 0 = hide, 1 = show. A row in the first 5 days of its period is hidden
    # when the current period is past day 5, or when the row belongs to an
    # older period (index > 0).
    within_first_five_days = F.col(days_in_column) <= 5
    hide_first_week = (
        F.when(
            (within_first_five_days & (F.col(current_days_column) >= 6))
            | (within_first_five_days & (F.col(period_index_column) > 0)),
            F.lit(0),
        )
        .otherwise(F.lit(1))
        .alias("hide_first_week")
    )

    return configs_df.select(
        "period_start",
        "period_end",
        *period_columns,
        F.concat(F.col("quarter"), F.col("year")).alias("quarter_year_label"),
        F.col("quarter").alias("quarter_label"),
        F.col("year").alias("fiscal_year"),
        "analysis_name",
        F.lit("Daily Progress and Comparable").alias("dashboard_analysis_name"),
        f"{period_prefix}_y_y_growth_current",
        f"{period_prefix}_y_y_growth_previous_1_week",
        f"{period_prefix}_y_y_growth_previous_1_year",
        "value",
        "txd_value",
        f"{period_prefix}_sales",
        *previous_year_columns,
        "value_divisor",
        "aggregation_type",
        "metric_name",
        "top_level_entity_ticker",
        "top_level_entity_name",
        "metric_aggregate_sql_operator",
        "metric_trailing_period_aggregate_sql_operator",
        "metric_trailing_period_length",
        "metric_calendar_type",
        "metric_growth_rate_type",
        "metric_va_id",
        "metric_relative_year_comparisons",
        "current_fiscal_year",
        current_days_column,
        hide_first_week,
        F.col("period_index").alias("sequential_index"),
        F.current_timestamp().alias("publication_timestamp"),
        "internal_metric_id",
    )


@track_usage
def standard_metric_daily_qtd_progress(
    df: DataFrame, calendar_df: DataFrame
) -> DataFrame:
    """
    Build a metric's daily quarter-to-date (QTD) progress from its unified KPI
    table, together with the comparable daily analyses from prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :return: DataFrame of daily QTD analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating daily quarter-to-date progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )

        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )

        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )

        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        df = A.standard_metric_daily_qtd_progress(unified_kpi_df, calendar_df)
        display(df)

        +-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
        |period_start |period_end |quarter_start |quarter_end |...|qtd_y_y_growth_current |qtd_y_y_growth_previous_1_week |qtd_y_y_growth_previous_1_year |value           |txd_value        |qtd_sales         |value_previous_1_year |...|
        +-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
        |2025-01-01   |2025-01-01 |2024-10-28    |2025-02-02  |...|0.075278               |0.093199                       |0.023156                       |4555616.740619  |25919798.957255  |2168150987.259101 |35547740.244841       |...|
        +-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
        |2025-01-02   |2025-01-02 |2024-10-28    |2025-02-02  |...|0.070494               |0.081441                       |0.025001                       |30841420.191023 |28067818.239408  |2198992407.450124 |37821651.385287       |...|
        +-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
        |2025-01-03   |2025-01-03 |2024-10-28    |2025-02-02  |...|0.069268               |0.078758                       |0.026635                       |37438227.272619 |28563479.426870  |2236430634.722743 |37368904.542899       |...|
        +-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
    """
    # Fail fast if the input is not a well-formed unified KPI table
    _validate_unified_kpi_input_df(df)

    granularity = "DAY"

    # Metric settings: aggregation (SUM/AVG), calendar type, prior-year depth
    metric_configs = _get_metric_configs(df)
    aggregation_type = metric_configs["metric_config_aggregation_type"]
    calendar_type = metric_configs["calendar_type"]
    max_relevant_years = int(metric_configs["max_relevant_years"])

    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)

    # Daily aggregates for this metric's aggregation type, plus trailing-day values
    daily_df = _get_base_df(df, aggregation_type)
    trailing_day_df = _get_txd_base_df(df)

    # One calendar row per day, carrying the full quarter for extrapolation
    calendar_columns = [
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        "quarter_period_start",
        "quarter_period_end",
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        "days_in_quarter",
        F.year(
            F.coalesce(F.col("custom_year_of_quarter"), F.col("quarter_period_start"))
        ).alias("year"),
    ]
    base_calendar_df = calendar_df.select(*calendar_columns)

    date_bounds = _get_date_bounds(daily_df, granularity)
    bounded_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)

    # Calendar drives the row set; metric values are left-joined onto it
    calendar_with_values_df = bounded_calendar_df.join(
        daily_df.drop("period_end"), "period_start", "left"
    )

    ptd_sales_expression = _get_ptd_sales_expression(aggregation_type, granularity)
    quarter_aggregate = F.avg if aggregation_type == "AVG" else F.sum

    # Calendar-only rows have no quarter_start_period_start from the KPI table;
    # fall back to the calendar's quarter start.
    # NOTE(review): quarter_end_period_start gets no such fallback — confirm.
    quarter_start_column = F.coalesce(
        F.col("quarter_start_period_start"), F.col("quarter_period_start")
    ).alias("quarter_start_period_start")
    end_of_quarter_column = (
        F.when(F.col("period_end") == F.col("quarter_end_period_start"), F.lit(1))
        .otherwise(F.lit(0))
        .alias("end_of_quarter")
    )
    whole_quarter_window = W.partitionBy(
        F.col("quarter_start_period_start"),
        F.col("quarter_end_period_start"),
    )

    ptd_sales_df = (
        calendar_with_values_df.join(trailing_day_df, "period_start", "left")
        .select(
            "period_start",
            "period_end",
            "analysis_name",
            "quarter",
            "year",
            "days_in_quarter",
            quarter_start_column,
            "quarter_end_period_start",
            "quarter_label_period_end",
            "period_index",
            "value",
            "txd_value",
            ptd_sales_expression,
            end_of_quarter_column,
        )
        .withColumns(latest_growth_rate_col)
        .withColumn(
            "total_quarter_sales",
            quarter_aggregate(F.col("value")).over(whole_quarter_window),
        )
    )

    previous_week_df = _get_previous_week_df(
        ptd_sales_df, calendar_type, max_relevant_years, granularity
    )

    # Publish only the current (0) and previous (1) quarter
    recent_quarters_df = previous_week_df.where(F.col("quarter_period_index") <= 1)

    return _get_ptd_progress_df(
        recent_quarters_df, max_relevant_years, granularity, metric_configs
    )
@track_usage
def standard_metric_half_year_progress(
    df: DataFrame, calendar_df: DataFrame
) -> DataFrame:
    """
    Build a metric's daily half-year progress from its unified KPI table,
    together with the comparable daily analyses from prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :return: DataFrame of daily half year progress analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating daily half year progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.itx_reported.itx_gmv_es")
        calendar_df = spark.table("yd_fp_investor_audit.itx_xmad_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Inditex",
            top_level_entity_ticker="ITX:XMAD",
            exchange=None,
            entity_name=None,
            figi=None,
        )

        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Spain Net Sales",
            company_comparable_kpi=True,
            uses_va_for_actuals=False,
            display_period_granularity="DAY",
            report_period_granularity="HALF_YEAR",
            currency="EUR",
            value_divisor=1000000,
            visible_alpha_id=None,
        )

        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="value",
            source_input_date_column="date",
            source_table_granularity="DAY",
            aggregate_function="SUM",
            max_relevant_years=2,
            growth_rate_type="CAGR",
            calendar_type="EXACT_N_YEARS",
            source_table_filter_conditions=None,
            slice_columns=None,
            trailing_period_length=7,
            trailing_period_aggregate_function="AVG",
        )

        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        df = A.standard_metric_half_year_progress(unified_kpi_df, calendar_df)
        display(df)

        +-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
        |period_start |period_end |report_period_start |report_period_end |...|ptd_y_y_growth_current |ptd_y_y_growth_previous_1_week |ptd_y_y_growth_previous_1_year |value           |txd_value       |ptd_sales         |value_previous_1_year |...|
        +-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
        |2025-01-01   |2025-01-01 |2024-08-01          |2025-01-31        |...|0.090465               |0.093223                       |0.126094                       |2425970.385558  |19608148.865366 |2517065314.305471 |3136880.539406        |...|
        +-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
        |2025-01-02   |2025-01-02 |2024-08-01          |2025-01-31        |...|0.090634               |0.091252                       |0.126692                       |30053304.417443 |20729009.931905 |2547118618.722914 |27198458.331024       |...|
        +-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
        |2025-01-03   |2025-01-03 |2024-08-01          |2025-01-31        |...|0.091695               |0.091673                       |0.126264                       |31495293.875604 |21798167.567119 |2578613912.598518 |26579190.577483       |...|
        +-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
    """
    # Fail fast if the input is not a well-formed unified KPI table
    _validate_unified_kpi_input_df(df)

    granularity = "HALF_YEAR"

    # Metric settings: aggregation (SUM/AVG), calendar type, prior-year depth
    metric_configs = _get_metric_configs(df)
    aggregation_type = metric_configs["metric_config_aggregation_type"]
    calendar_type = metric_configs["calendar_type"]
    max_relevant_years = int(metric_configs["max_relevant_years"])

    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)

    # Daily aggregates for this metric's aggregation type, plus trailing-day values
    daily_df = _get_base_df(df, aggregation_type)
    trailing_day_df = _get_txd_base_df(df)

    # One calendar row per day, with the half year exposed as the "report" period.
    # ``year`` is the right-most 4 characters of ``year_label`` (a string column).
    calendar_columns = [
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.right(F.col("year_label"), F.lit(4)).alias("year"),
        F.col("half_year_period_start").alias("report_period_start"),
        F.col("half_year_period_end").alias("report_period_end"),
        F.col("days_in_half_year").alias("days_in_report"),
        F.col("half_year_label").alias("report_label"),
    ]
    base_calendar_df = calendar_df.select(*calendar_columns)

    date_bounds = _get_date_bounds(daily_df, granularity)
    bounded_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)

    # Calendar drives the row set; metric values are left-joined onto it
    calendar_with_values_df = bounded_calendar_df.join(
        daily_df.drop("period_end"), "period_start", "left"
    )

    ptd_sales_expression = _get_ptd_sales_expression(aggregation_type, granularity)

    ptd_sales_df = (
        calendar_with_values_df.join(trailing_day_df, "period_start", "left")
        .select(
            "period_start",
            "period_end",
            "analysis_name",
            "quarter",
            "year",
            "days_in_report",
            "report_period_start",
            "report_period_end",
            "period_index",
            "report_label",
            "value",
            "txd_value",
            ptd_sales_expression,
        )
        .withColumns(latest_growth_rate_col)
    )

    previous_week_df = _get_previous_week_df(
        ptd_sales_df,
        calendar_type,
        max_relevant_years,
        granularity,
    )

    # Publish only the current (0) and previous (1) half year
    recent_periods_df = previous_week_df.where(F.col("report_period_index") <= 1)

    # The half-year progress template never included internal_metric_id
    return _get_ptd_progress_df(
        recent_periods_df, max_relevant_years, granularity, metric_configs
    ).drop("internal_metric_id")