Source code for etl_toolkit.analyses.standard_metrics.trailing_day_pivot

from pyspark.sql import DataFrame, functions as F, Window as W

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    _get_metric_configs,
)


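# Keep only the analysis rows relevant to this pivot: for daily source tables,
# the trailing day simple aggregate (matched on the metric's configured
# aggregate function) and the trailing day growth rates (matched on the
# trailing period aggregate function); for monthly source tables, the
# end-of-period simple aggregate and growth rate analyses.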
analysis_filter = E.chain_cases(
    [
        E.case(
            F.col("source_table_granularity") == "DAY",
            E.any(
                E.all(
                    F.col("internal_dashboard_analysis_name")
                    == "day_simple_aggregate_trailing_day",
                    F.col("aggregation_type")
                    == F.col("metric_config_aggregate_function"),
                ),
                E.all(
                    F.col("internal_dashboard_analysis_name").like(
                        "day%y_growth_rate_trailing_day"
                    ),
                    F.col("aggregation_type")
                    == F.col("trailing_period_aggregate_function"),
                ),
            ),
        ),
        E.case(
            F.col("source_table_granularity") == "MONTH",
            E.any(
                F.col("internal_dashboard_analysis_name") == "month_simple_aggregate",
                F.col("internal_dashboard_analysis_name").like("month_%y_growth_rate"),
            ),
        ),
    ]
)


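# Display label for each analysis row: "Nominal" for simple aggregates,
# "Y/Y Growth" for one-year growth, and "<N>Y Growth" / "<N>Y CAGR" for
# multi-year simple growth rates and CAGRs.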
analysis_label = E.chain_cases(
    [
        E.case(F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("Nominal")),
        E.case(F.col("duration") == 1, F.lit("Y/Y Growth")),
        E.case(
            E.all(F.col("growth_rate_type") == "SIMPLE", F.col("duration") > 1),
            F.concat(F.col("duration"), F.lit("Y Growth")),
        ),
        E.case(
            E.all(F.col("growth_rate_type") == "CAGR", F.col("duration") > 1),
            F.concat(F.col("duration"), F.lit("Y CAGR")),
        ),
    ],
)
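
# For reference, the chained cases above behave like a chained F.when
# expression (a sketch, assuming E.chain_cases evaluates its cases in order
# and yields null when no case matches):
#
#     F.when(F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("Nominal"))
#      .when(F.col("duration") == 1, F.lit("Y/Y Growth"))
#      .when(
#          (F.col("growth_rate_type") == "SIMPLE") & (F.col("duration") > 1),
#          F.concat(F.col("duration"), F.lit("Y Growth")),
#      )
#      .when(
#          (F.col("growth_rate_type") == "CAGR") & (F.col("duration") > 1),
#          F.concat(F.col("duration"), F.lit("Y CAGR")),
#      )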


@track_usage
def standard_metric_trailing_day_pivot(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing
    trailing day analyses.

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of trailing day analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating trailing day analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df,
        )
        df = A.standard_metric_trailing_day_pivot(unified_kpi_df)
        display(df)
        +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
        |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |...|analysis_label |...|value       |previous_week_txd_value |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
        |CHWY:XNYS               |Chewy                 |2025-01-28   |2025-02-03 |Net Sales - Order Date |...|Nominal        |...|233.494012  |230.214723              |...|
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|Y/Y Growth     |...|0.040221    |0.075195                |...|
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|2Y CAGR        |...|0.024634    |0.041169                |...|
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|3Y CAGR        |...|0.116819    |0.043802                |...|
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|4Y CAGR        |...|0.105718    |0.085727                |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    """
    _validate_unified_kpi_input_df(df)
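
    # Window specs used below: prev_week_window looks back 7 rows (days) within
    # each analysis ordered by period_start, giving the prior week's comparable
    # trailing value for daily data; sequential_index_window ranks distinct
    # period_index values; prev_period_window walks each analysis in descending
    # sequential_index order to pick up the adjacent period's value.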
    prev_week_window = W.partitionBy(
        F.col("aggregation_type"), F.col("internal_dashboard_analysis_name")
    ).orderBy(F.col("period_start"))
    sequential_index_window = W.orderBy(F.col("period_index"))
    prev_period_window = W.partitionBy(
        F.col("aggregation_type"), F.col("internal_dashboard_analysis_name")
    ).orderBy(F.col("sequential_index").desc())

    metric_configurations = _get_metric_configs(df)

    filtered_df = (
        df.withColumns(
            {
                "source_table_granularity": F.lit(
                    metric_configurations["source_table_granularity"]
                ),
                "trailing_period_aggregate_function": F.lit(
                    metric_configurations["trailing_period_aggregate_function"]
                ).cast("string"),
                "metric_config_aggregate_function": F.lit(
                    metric_configurations["metric_config_aggregation_type"]
                ),
                "growth_rate_type": F.lit(metric_configurations["growth_rate_type"]),
                "duration": F.regexp_replace(
                    F.col("internal_dashboard_analysis_name"), r"(\D)", ""
                ).try_cast("int"),
                "adjusted_period_start": F.when(
                    F.col("source_table_granularity") == "DAY",
                    F.dateadd(F.col("period_end"), -(F.col("trailing_period") - 1)),
                ).otherwise(F.col("period_start")),
                "analysis_label": analysis_label,
                "analysis_label_sort": F.coalesce(F.col("duration") + 1, F.lit(1)),
                "format_label": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("NUMBER")
                ).otherwise(F.lit("PERCENT")),
                "final_value": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE",
                    F.col("value") / F.col("value_divisor"),
                ).otherwise(F.col("value")),
                "dashboard_analysis_name": F.when(
                    F.col("source_table_granularity") == "DAY",
                    F.lit("Trailing Day - Key Metrics"),
                ).otherwise(F.lit("End of Period - Key Metrics")),
                # Using max_sequential_index, always include the last 5 weeks of
                # trailing data for daily data or the last 6 months of trailing
                # data for monthly data
                "max_sequential_index": F.when(
                    F.col("source_table_granularity") == "DAY", 4
                ).otherwise(5),
            }
        )
        .where(analysis_filter)
        .withColumn(
            "previous_week_txd_value",
            F.when(
                F.col("source_table_granularity") == "DAY",
                F.lag(F.col("final_value"), 7).over(prev_week_window),
            ),
        )
        .where(
            E.any(
                F.col("source_table_granularity") == "MONTH",
                (F.col("period_index") % F.col("trailing_period")) == 0,
            )
        )
        .withColumn(
            "sequential_index", (F.dense_rank().over(sequential_index_window)) - 1
        )
        .withColumn(
            "previous_period_value",
            F.lag(F.col("final_value"), 1).over(prev_period_window),
        )
    )

    txd_pivot_df = filtered_df.where(
        F.col("sequential_index") <= F.col("max_sequential_index")
    ).select(
        "top_level_entity_ticker",
        "top_level_entity_name",
        F.col("adjusted_period_start").alias("period_start"),
        "period_end",
        "metric_name",
        "analysis_name",
        "dashboard_analysis_name",
        "analysis_label",
        "analysis_label_sort",
        "format_label",
        F.col("final_value").alias("value"),
        "previous_week_txd_value",
        F.col("trailing_period_aggregate_function").alias(
            "trailing_period_aggregation_type"
        ),
        "currency",
        "value_divisor",
        "trailing_period",
        "sequential_index",
        F.current_timestamp().alias("publication_timestamp"),
        F.col("metric_config_aggregate_function").alias("qtd_agg_type"),
        F.col("aggregation_type").alias("analysis_agg_type"),
        "previous_period_value",
        F.col("source_table_granularity").alias("data_source_table_grain"),
        "internal_metric_id",
    )
    return txd_pivot_df