# Source code for etl_toolkit.analyses.standard_metrics.quarter_month_pivot

from pyspark.sql import DataFrame, functions as F

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    _get_metric_configs,
)


def _multi_year_growth_case(growth_rate_type, label_suffix):
    """Build the case for a multi-year growth rate of a single growth-rate type.

    The case applies to GROWTH_RATE rows of the given ``growth_rate_type`` whose
    ``duration`` is greater than 1; its label is the duration immediately
    followed by ``label_suffix``.
    """
    applies = E.all(
        F.col("calculation_type") == "GROWTH_RATE",
        F.col("growth_rate_type") == growth_rate_type,
        F.col("duration") > 1,
    )
    return E.case(applies, F.concat(F.col("duration"), F.lit(label_suffix)))


# Cases behind the ``analysis_label`` column, in the order they are handed to
# ``E.chain_cases`` (presumably the first matching case wins -- confirm
# against the ``E.chain_cases`` documentation).
_analysis_label_cases = [
    # Simple aggregates are the nominal values.
    E.case(F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("Nominal")),
    # Any remaining row with a duration of 1 is labeled as year-over-year growth.
    E.case(F.col("duration") == 1, F.lit("Y/Y Growth")),
    # Multi-year growth rates are labeled by duration and growth-rate type.
    _multi_year_growth_case("SIMPLE", "Y Growth"),
    _multi_year_growth_case("CAGR", "Y CAGR"),
]

analysis_label = E.chain_cases(_analysis_label_cases)

def _period_to_date_label(prefix):
    """Return ``prefix`` followed by the month and day of ``period_end``.

    The month and day are separated by a slash, so a prefix of ``"MTD "``
    produces a label of the form ``MTD <month>/<day>``.
    """
    period_end = F.col("period_end")
    return F.concat(
        F.lit(prefix),
        F.month(period_end),
        F.lit("/"),
        F.day(period_end),
    )


def _month_year_label():
    """Return the month name of ``period_end``, a dash and its two-digit year (e.g. Mar-25)."""
    period_end = F.col("period_end")
    return F.concat(
        F.monthname(period_end),
        F.lit("-"),
        F.date_format(period_end, "yy"),
    )


# Cases behind the ``fiscal_period_label`` column, in the order they are handed
# to ``E.chain_cases``. The current period is the one with period_index == 0.
_fiscal_period_label_cases = [
    # Current month of a daily-grain source: labeled MTD unless the max date is
    # the last day of the month (data_through == max_month).
    E.case(
        E.all(
            F.col("source_table_granularity") == "DAY",
            F.col("periodicity") == "MONTH",
            F.col("period_index") == 0,
            F.col("data_through") != F.col("max_month"),
        ),
        _period_to_date_label("MTD "),
    ),
    # Current month of a monthly-grain source: always the monthly label.
    E.case(
        E.all(
            F.col("source_table_granularity") == "MONTH",
            F.col("periodicity") == "MONTH",
            F.col("period_index") == 0,
        ),
        _month_year_label(),
    ),
    # Current quarter: labeled QTD unless the max date is the last day of the
    # quarter (data_through == max_quarter).
    E.case(
        E.all(
            F.col("periodicity") == "QUARTER",
            F.col("period_index") == 0,
            F.col("data_through") != F.col("max_quarter"),
        ),
        _period_to_date_label("QTD "),
    ),
    # Current half year: always labeled PTD.
    E.case(
        E.all(
            F.col("periodicity") == "HALF_YEAR",
            F.col("period_index") == 0,
        ),
        _period_to_date_label("PTD "),
    ),
    # Months that are not the current period (period_index > 0), or whose max
    # date is the last day of the month, use the monthly label instead of MTD.
    E.case(
        E.all(
            F.col("periodicity") == "MONTH",
            E.any(
                F.col("period_index") > 0,
                F.col("data_through") == F.col("max_month"),
            ),
        ),
        _month_year_label(),
    ),
    # Quarters that are not the current period (period_index > 0), or whose max
    # date is the last day of the quarter, use the quarterly label (e.g. 1Q25)
    # instead of QTD.
    E.case(
        E.all(
            F.col("periodicity") == "QUARTER",
            E.any(
                F.col("period_index") > 0,
                F.col("data_through") == F.col("max_quarter"),
            ),
        ),
        F.col("quarter_label_period_end"),
    ),
    # Earlier half years use the precomputed half-year label column.
    E.case(
        E.all(
            F.col("periodicity") == "HALF_YEAR",
            F.col("period_index") > 0,
        ),
        F.col("half_year_label"),
    ),
]

fiscal_period_label = E.chain_cases(_fiscal_period_label_cases)

# (periodicity, period_index, sort position) for the cases that match exactly
# one periodicity, listed in ascending sort position.
_single_periodicity_sort_order = (
    ("MONTH", 2, 2),
    ("MONTH", 1, 3),
    ("MONTH", 0, 4),
    ("QUARTER", 0, 5),
    ("HALF_YEAR", 0, 6),
)

# Sort position of each fiscal period column: the previous quarter / half year
# comes first, then the months from period_index 2 down to 0, then the current
# quarter and finally the current half year.
fiscal_period_sort = E.chain_cases(
    [
        E.case(
            E.all(
                F.col("periodicity").isin(["QUARTER", "HALF_YEAR"]),
                F.col("period_index") == 1,
            ),
            1,
        ),
        *(
            E.case(
                E.all(
                    F.col("periodicity") == periodicity,
                    F.col("period_index") == period_index,
                ),
                sort_position,
            )
            for periodicity, period_index, sort_position in _single_periodicity_sort_order
        ),
    ]
)

# Use PTD analyses for the current period (period_index=0) and non-PTD analyses for everything else
# A row is kept when it satisfies at least one of the branches below; each
# branch pairs a period position with the kind of analysis (period-to-date or
# not, judged by internal_dashboard_analysis_name) that should represent it.
combine_ptd_and_full_period_filter = E.any(
    # Current month / half year: keep analyses whose name ends with "period_to_date".
    E.all(
        F.col("period_index") == 0,
        F.col("periodicity").isin(["MONTH", "HALF_YEAR"]),
        F.col("internal_dashboard_analysis_name").endswith("period_to_date"),
    ),
    # Current period, quarter period-to-date analyses (name matches
    # "quarter%period_to_date"): keep them only while data_through is still
    # before max_quarter.
    E.all(
        F.col("period_index") == 0,
        F.col("internal_dashboard_analysis_name").like("quarter%period_to_date"),
        F.col("data_through")
        < F.col("max_quarter"),  # Needed to handle 90/97 fiscal quarter issue
    ),
    # Current quarter once data_through equals max_quarter: keep the analyses
    # whose name does not contain "period_to_date" instead.
    E.all(
        F.col("period_index") == 0,
        F.col("periodicity") == "QUARTER",
        ~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
        F.col("data_through")
        == F.col("max_quarter"),  # Needed to handle 90/97 fiscal quarter issue
    ),
    # Any earlier period (period_index > 0): keep only analyses whose name does
    # not contain "period_to_date".
    E.all(
        F.col("period_index") > 0,
        ~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
    ),
    # Current month of a monthly-grain source: also keep the analyses whose name
    # does not contain "period_to_date".
    E.all(
        F.col("period_index") == 0,
        F.col("source_table_granularity") == "MONTH",
        F.col("periodicity") == "MONTH",
        ~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
    ),
)


def _get_data_through(df: DataFrame) -> dict:
    """Return the latest ``period_end`` per periodicity plus the overall data-through date.

    Only rows whose ``internal_dashboard_analysis_name`` ends with
    ``simple_aggregate`` are considered. This is needed to account for 90/97
    fiscal quarters.

    :param df: A dataframe with ``internal_dashboard_analysis_name``,
        ``periodicity`` and ``period_end`` columns
    :return: The first row of the pivoted result as a dict. It holds one
        ``max_<periodicity>`` entry per periodicity found in ``df``; ``max_day``,
        ``max_month`` and ``max_quarter`` are always present (``None`` when the
        periodicity is missing). ``data_through`` is the first non-null value
        among ``max_day``, ``max_month`` and ``max_quarter``, in that order.
    """
    simple_aggregate_rows = df.where(
        F.col("internal_dashboard_analysis_name").like("%simple_aggregate")
    )

    # One row per periodicity holding its latest period_end ...
    latest_by_periodicity = simple_aggregate_rows.groupBy("periodicity").agg(
        F.max("period_end").alias("max_period_end")
    )

    # ... pivoted into a single row with one "max_<periodicity>" column each.
    keyed_by_column_name = latest_by_periodicity.withColumn(
        "temp_periodicity", F.concat(F.lit("max_"), F.lower(F.col("periodicity")))
    )
    wide_df = (
        keyed_by_column_name.groupBy()
        .pivot("temp_periodicity")
        .agg(F.first("max_period_end"))
    )

    # Periodicities absent from the input still need a (null) column so that the
    # coalesce below and the callers' key lookups do not fail.
    for required_column in ("max_day", "max_month", "max_quarter"):
        if required_column not in wide_df.columns:
            wide_df = wide_df.withColumn(required_column, F.lit(None))

    with_data_through = wide_df.withColumn(
        "data_through",
        F.coalesce(F.col("max_day"), F.col("max_month"), F.col("max_quarter")),
    )

    return with_data_through.first().asDict()


@track_usage
def standard_metric_quarter_month_pivot(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing MTD and QTD analyses.

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of MTD and QTD analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating MTD and QTD analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        df = A.standard_metric_quarter_month_pivot(unified_kpi_df)
        display(df)

        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |...|periodicity |analysis_label |...|value       |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |Nominal        |...|3252.308829 |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |Y/Y Growth     |...|0.152283    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |2Y CAGR        |...|0.094586    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |3Y CAGR        |...|0.106234    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |4Y CAGR        |...|0.123719    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
    """
    _validate_unified_kpi_input_df(df)
    data_through_dict = _get_data_through(df)
    metric_configurations = _get_metric_configs(df)

    # NOTE(review): several entries in the withColumns mapping below (the label
    # and sort expressions, analysis_label_sort) read columns introduced by the
    # same call (duration, growth_rate_type, source_table_granularity,
    # data_through, max_month, max_quarter). This presumably relies on Spark
    # resolving such references within a single projection -- confirm on the
    # target runtime before splitting or reordering the mapping.
    filtered_df = (
        df.where(F.col("periodicity") != "DAY")  # Filter to quarterly and monthly data
        .where(F.col("slice_name_1").isNull())  # Exclude sliced analyses
        .withColumns(
            {
                # Metric-level configuration, repeated on every row as literals.
                "source_table_granularity": F.lit(
                    metric_configurations["source_table_granularity"]
                ),
                "metric_config_aggregate_function": F.lit(
                    metric_configurations["metric_config_aggregation_type"]
                ),
                "growth_rate_type": F.lit(metric_configurations["growth_rate_type"]),
                # Keep only the digits of the analysis name and read them as an
                # integer; names without digits yield null.
                "duration": F.regexp_replace(
                    F.col("internal_dashboard_analysis_name"), r"(\D)", ""
                ).try_cast("int"),
                "data_through": F.lit(data_through_dict["data_through"]),
                "max_quarter": F.lit(data_through_dict["max_quarter"]),
                "max_month": F.lit(data_through_dict["max_month"]),
                "analysis_label": analysis_label,
                # Rows without a duration sort first (1); the rest sort by duration + 1.
                "analysis_label_sort": F.coalesce(F.col("duration") + 1, F.lit(1)),
                "fiscal_period_label": fiscal_period_label,
                "fiscal_period_sort": fiscal_period_sort,
                "format_label": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("NUMBER")
                ).otherwise(F.lit("PERCENT")),
                # Only simple aggregates are scaled by the value divisor; every
                # other calculation type keeps its value as is.
                "final_value": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE",
                    F.col("value") / F.col("value_divisor"),
                ).otherwise(F.col("value")),
            }
        )
        # Keep only rows aggregated with the metric's configured aggregation type.
        .where(F.col("metric_config_aggregate_function") == F.col("aggregation_type"))
        .where(combine_ptd_and_full_period_filter)
    )

    pivot_df = (
        filtered_df
        # Filter on the most recent full quarter and QTD (period_index=0,1,2)
        # When applicable, the pivot table looks at the most recent full two months and MTD
        .where(F.col("period_index").isin([0, 1, 2])).select(
            "top_level_entity_ticker",
            "top_level_entity_name",
            "period_start",
            "period_end",
            "metric_name",
            F.col("visible_alpha_id").alias("metric_va_id"),
            "analysis_name",
            F.lit("Key Metrics").alias("dashboard_analysis_name"),
            "periodicity",
            "analysis_label",
            "analysis_label_sort",
            "fiscal_period_label",
            "fiscal_period_sort",
            "format_label",
            F.col("final_value").alias("value"),
            "currency",
            "value_divisor",
            "quarter_label_period_end",
            F.col("period_index").alias("sequential_index"),
            F.current_timestamp().alias("publication_timestamp"),
            F.col("source_table_granularity").alias("data_source_table_grain"),
            "internal_metric_id",
        )
    )

    return pivot_df