Source code for etl_toolkit.analyses.standard_metrics.metric_feed

from pyspark.sql import SparkSession
from pyspark.sql import functions as F, DataFrame

from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_client import get_spark_session

from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.config import entity_configuration
from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    FEED_SLICE_COLUMN_COUNT,
    FEED_SLICE_NAME_VALUE_COLUMNS,
    METRIC_FEED_ID_MAPPING_TABLE,
    _get_data_through,
    _get_relevant_periodicities,
)


# Create human-readable periodicity labels for different analysis types
# This maps technical analysis names to user-friendly descriptions:
# - Daily values (no trailing period)
# - Weekly values (7-day trailing sum)
# - MTD (month-to-date) with year offset
# - QTD (quarter-to-date) with year offset
# - Monthly and Quarterly values
periodicity_root = E.chain_cases(
    [
        E.case(
            E.all(F.col("periodicity") == "DAY", F.col("trailing_period").isNull()),
            F.lit("daily"),
        ),
        E.case(
            E.all(F.col("trailing_period") == 7, F.col("aggregation_type") == "SUM"),
            F.lit("weekly"),
        ),
        E.case(
            F.col("internal_dashboard_analysis_name").like("month_%_period_to_date%"),
            F.lit("MTD"),
        ),
        E.case(
            F.col("internal_dashboard_analysis_name").like("quarter_%_period_to_date%"),
            F.lit("QTD"),
        ),
        E.case(
            E.all(
                F.col("periodicity").isin(["MONTH", "QUARTER"]),
                ~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
            ),
            F.lower(F.concat(F.col("periodicity"), F.lit("ly"))),
        ),
    ],
)
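# Illustrative mapping of the cases above (a sketch; actual values depend on the
# unified KPI rows):
#   periodicity="DAY" with no trailing period                      -> "daily"
#   trailing_period=7 with aggregation_type="SUM"                  -> "weekly"
#   analysis name matching "month_%_period_to_date%"               -> "MTD"
#   analysis name matching "quarter_%_period_to_date%"             -> "QTD"
#   periodicity="MONTH"/"QUARTER", not period-to-date              -> "monthly"/"quarterly"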

year_label = E.chain_cases(
    [
        E.case(
            F.col("calculation_type") == "GROWTH_RATE",
            F.upper(F.split(F.col("internal_dashboard_analysis_name"), "_")[1]),
        ),
        E.case(
            E.all(
                F.col("calculation_type") == "SIMPLE_AGGREGATE",
                F.col("periodicity_root") == "MTD",
            ),
            F.concat(F.lit("Y-"), (F.col("period_index") / 12).cast("int")),
        ),
        E.case(
            E.all(
                F.col("calculation_type") == "SIMPLE_AGGREGATE",
                F.col("periodicity_root") == "QTD",
            ),
            F.concat(F.lit("Y-"), (F.col("period_index") / 4).cast("int")),
        ),
    ]
)
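# Illustrative outputs of the cases above (hedged): for a GROWTH_RATE analysis such as
# "quarter_1y_growth_rate_period_to_date_minus_7_days", the second underscore-delimited
# token is upper-cased to "1Y"; for a SIMPLE_AGGREGATE MTD row with period_index=24,
# 24 / 12 = 2 yields the label "Y-2".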

growth_rate_label = E.chain_cases(
    [E.case(F.col("growth_rate_type") == "CAGR", F.lit("CAGR"))],
    otherwise=F.lit("Growth"),
)

periodicity_label = E.chain_cases(
    [
        E.case(
            E.all(
                F.col("calculation_type") == "SIMPLE_AGGREGATE",
                F.col("periodicity_root").like("%TD"),
            ),
            F.concat_ws(" ", F.col("periodicity_root"), year_label),
        ),
        E.case(
            F.col("calculation_type") == "GROWTH_RATE",
            F.concat(
                year_label,
                F.lit(" "),
                growth_rate_label,
                F.lit(" - "),
                F.col("periodicity_root"),
            ),
        ),
    ],
    otherwise=F.col("periodicity_root"),
)
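# Example labels produced by the cases above (illustrative):
#   SIMPLE_AGGREGATE, periodicity_root="MTD", period_index=12           -> "MTD Y-1"
#   GROWTH_RATE, growth_rate_type="CAGR", periodicity_root="quarterly"  -> e.g. "2Y CAGR - quarterly"
#   all other rows fall back to the bare periodicity_root ("daily", "weekly", ...)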

# Filter to select only the most relevant periods for the feed
# This includes:
# - Daily simple-aggregate values
# - Non-daily, non-PTD values outside the current period (period_index != 0), plus
#   current periods that are already complete or match the source table granularity
# - Month-to-date values at yearly intervals
# - Quarter-to-date values at yearly intervals
# - Weekly values (7-day trailing) at weekly intervals
period_index_filter = E.any(
    [
        F.col("internal_dashboard_analysis_name").isin(
            ["day_simple_aggregate", "day_simple_aggregate_sliced_data"]
        ),
        E.all(
            F.col("periodicity") != "DAY",
            ~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
            E.any(
                F.col("period_index") != 0,
                F.col("periodicity") == F.col("source_table_granularity"),
                # Include the non-PTD for the current period if the last day in the period
                # is the max date in the data. This means the current PTD is the full period
                F.col("period_end") == F.col("data_through"),
            ),
        ),
        E.all(
            F.col("internal_dashboard_analysis_name").like("month_%_period_to_date%"),
            (F.col("period_index") % 12) == 0,
        ),
        E.all(
            F.col("internal_dashboard_analysis_name").like("quarter_%_period_to_date%"),
            (F.col("period_index") % 4) == 0,
        ),
        E.all(  # weekly analyses are trailing 7 day sums
            F.col("trailing_period") == 7,
            F.col("aggregation_type") == "SUM",
            (F.col("period_index") % 7) == 0,
        ),
    ]
)
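# Illustrative effect of the branches above: a MONTH row at period_index=3 is kept,
# a month-to-date row is kept only at yearly offsets (period_index 0, 12, 24, ...),
# and a trailing-7-day SUM row is kept only at weekly offsets (period_index 0, 7, 14, ...).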

# Exclude quarterly data for metrics that use VA data as part of their product methodology
uses_va_for_actuals_filter = E.all(
    F.col("uses_va_for_actuals"),
    F.col("periodicity") == "QUARTER",
)

# Filter out daily data for metrics with a weekly display granularity
# This prevents showing too granular data for metrics that should be viewed weekly
weekly_display_filter = E.all(
    F.col("display_period_granularity") == "WEEK",
    F.col("periodicity") == "DAY",
    F.col("trailing_period").isNull(),
)

# Combined filter for the metric feed
# Applies all the filtering rules to show only the most relevant data points
feed_filter = E.all(
    period_index_filter,
    ~uses_va_for_actuals_filter,
    ~weekly_display_filter,
    F.col("aggregation_type") == F.col("metric_aggregate_function"),
    F.col("internal_dashboard_analysis_name")
    != "quarter_1y_growth_rate_period_to_date_minus_7_days",
)
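# In addition to the period-index rules, this drops VA-backed quarterly rows, daily rows
# for weekly-display metrics, rows whose aggregation does not match the metric's own
# aggregate function, and the minus-7-days QTD growth analysis.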

# Additional, more selective filter for the live feed
# Excludes trailing-period (e.g. weekly) data, growth rates, and monthly/quarterly sliced data
live_feed_filter = E.all(
    feed_filter,
    F.col("calculation_type") != "GROWTH_RATE",
    ~(
        F.col("periodicity").isin(["MONTH", "QUARTER"])
        & (F.col("internal_dashboard_analysis_name").like("%sliced_data%"))
    ),
    F.col("trailing_period").isNull(),
)
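# Net effect (summary): the live feed keeps simple aggregates only, drops all
# trailing-period rows, and drops monthly/quarterly sliced-data analyses that the
# core metric feed would otherwise include.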

analysis_name_clean = E.chain_cases(
    [
        E.case(
            F.col("slice_name_1").isNotNull(),
            F.concat(
                F.col("metric_name"),
                F.lit(" - by "),
                F.initcap(
                    F.concat_ws(
                        ", ",
                        *[
                            F.regexp_replace(F.col(f"slice_name_{i}"), "_", " ")
                            for i in range(1, 11)
                        ],
                    )
                ),
            ),
        )
    ],
    otherwise=F.concat(F.col("metric_name"), F.lit(" - Total")),
)
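# Illustrative output: metric_name="Net Sales" with slice_name_1="order_channel" and
# slice_name_2="region" becomes "Net Sales - by Order Channel, Region"; unsliced rows
# become "Net Sales - Total".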

period_start_clean = E.chain_cases(
    [E.case(F.col("periodicity") == "weekly", F.col("week_start_period_start"))],
    otherwise=F.col("period_start"),
)

period_end_clean = E.chain_cases(
    [E.case(F.col("periodicity") == "weekly", F.col("week_end_period_end"))],
    otherwise=F.col("period_end"),
)

slice_names_clean = E.chain_cases(
    [E.case(F.length(F.col("slice_names")) < 1, F.lit(None))],
    otherwise=F.col("slice_names"),
)

slice_values_clean = E.chain_cases(
    [E.case(F.length(F.col("slice_values")) < 1, F.lit(None))],
    otherwise=F.col("slice_values"),
)
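# The feed functions below build slice_names / slice_values with F.concat_ws, which
# returns an empty string when every slice column is null, so these cleaners normalize
# empty slice strings back to NULL for unsliced rows.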


def _get_yd_product_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
    """Map unified KPI entity columns to product."""
    yd_product_df = mapping_df.select(
        "top_level_entity_name",
        "top_level_entity_ticker",
        "entity_name",
        F.col("yd_product").alias("legacy_yd_product"),
    ).distinct()

    yd_product_join_columns = [
        yd_product_df.top_level_entity_name.eqNullSafe(df.top_level_entity_name),
        yd_product_df.top_level_entity_ticker.eqNullSafe(df.top_level_entity_ticker),
        yd_product_df.entity_name.eqNullSafe(df.entity_name),
    ]

    map_yd_product_df = (
        df.join(yd_product_df, yd_product_join_columns, "left")
        .drop(
            yd_product_df.top_level_entity_name,
            yd_product_df.top_level_entity_ticker,
            yd_product_df.entity_name,
        )
        .withColumn("ticker", F.split(F.col("top_level_entity_ticker"), ":")[0])
    )

    return map_yd_product_df.withColumn(
        "yd_product",
        F.when(
            F.col("legacy_yd_product").isNull(),
            F.when(
                E.any(F.col("ticker").isNull(), F.col("ticker").rlike(r"\d+")),
                F.col("top_level_entity_name"),
            ).otherwise(F.col("ticker")),
        ).otherwise(F.col("legacy_yd_product")),
    ).drop("ticker")
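# Illustrative fallback behavior of _get_yd_product_df (hedged): with no legacy mapping
# row, ticker "CHWY:XNYS" yields yd_product "CHWY"; a null ticker, or one containing
# digits, falls back to top_level_entity_name instead.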


def _get_metric_id_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
    """Map unified KPI columns to legacy metric ID, if existing. Generate a new metric ID otherwise."""
    metric_id_df = mapping_df.select(
        "top_level_entity_name",
        "top_level_entity_ticker",
        "entity_name",
        "unified_kpi_metric_name",
        F.col("metric_name").alias("legacy_metric_name"),
        F.col("metric_id").alias("legacy_metric_id"),
    ).distinct()

    metric_id_join_columns = [
        metric_id_df.top_level_entity_name.eqNullSafe(df.top_level_entity_name),
        metric_id_df.top_level_entity_ticker.eqNullSafe(df.top_level_entity_ticker),
        metric_id_df.entity_name.eqNullSafe(df.entity_name),
        metric_id_df.unified_kpi_metric_name == df.unified_kpi_metric_name,
    ]

    legacy_metric_id_df = df.join(metric_id_df, metric_id_join_columns, "left").drop(
        metric_id_df.top_level_entity_name,
        metric_id_df.top_level_entity_ticker,
        metric_id_df.entity_name,
        metric_id_df.unified_kpi_metric_name,
    )

    return legacy_metric_id_df.withColumn(
        "metric_name",
        F.coalesce(F.col("legacy_metric_name"), F.col("unified_kpi_metric_name")),
    ).withColumn(
        "metric_id",
        F.when(
            F.col("legacy_metric_id").isNull(),
            E.uuid5(
                [
                    F.col("top_level_entity_name"),
                    F.col("top_level_entity_ticker"),
                    F.col("entity_name"),
                    F.col("unified_kpi_metric_name"),
                ]
            ),
        ).otherwise(F.col("legacy_metric_id")),
    )
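# When no legacy metric ID exists, E.uuid5 presumably derives a deterministic, name-based
# ID from the entity columns plus the unified KPI metric name, so repeated runs should
# produce a stable metric_id for the same metric.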


def _get_analysis_id_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
    """Map unified KPI columns to legacy analysis IDs, if existing. Generate new analysis IDs otherwise."""
    analysis_id_df = mapping_df.select(
        "metric_id",
        F.col("analysis_id").alias("legacy_analysis_id"),
        "slice_names",
        "periodicity_root",
        "calculation_type",
    ).distinct()

    analysis_id_join_columns = [
        analysis_id_df.metric_id == df.metric_id,
        analysis_id_df.periodicity_root == df.periodicity_root,
        analysis_id_df.slice_names.eqNullSafe(df.slice_names),
        analysis_id_df.calculation_type == df.calculation_type,
    ]

    legacy_analysis_id_df = df.join(
        analysis_id_df, analysis_id_join_columns, "left"
    ).drop(
        analysis_id_df.metric_id,
        analysis_id_df.periodicity_root,
        analysis_id_df.slice_names,
        analysis_id_df.calculation_type,
    )

    return legacy_analysis_id_df.withColumn(
        "analysis_id",
        F.when(
            F.col("legacy_analysis_id").isNull(),
            E.uuid5(
                [
                    F.col("metric_id"),
                    F.col("periodicity_root"),
                    F.col("slice_names"),
                    F.col("calculation_type"),
                ]
            ),
        ).otherwise(F.col("legacy_analysis_id")),
    )


@track_usage
def standard_metric_live_feed(
    df: DataFrame,
    entity_configuration: entity_configuration,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms the unified KPI table to generate a dataframe containing YDL feed analyses.

    This function will be deprecated alongside YDL Feeds in July 2025.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: A.entity_configuration configurations
    :return: DataFrame of live feed analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating live feed analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df,
        )
        df = A.standard_metric_live_feed(unified_kpi_df, entity_configuration)
        display(df)

        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |yd_product_type |yd_product |metric_name            |analysis_name                  |...|periodicity |slice_names |slice_values |value             |period_start |period_end |currency |figi         |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |Live Metrics    |CHWY Live  |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|daily       |null        |null         |16420203.160226   |2025-01-01   |2025-01-01 |USD      |BBG00P19DLQ4 |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |Live Metrics    |CHWY Live  |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|monthly     |null        |null         |997044783.786796  |2025-01-01   |2025-01-31 |USD      |BBG00P19DLQ4 |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |Live Metrics    |CHWY Live  |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|quarterly   |null        |null         |3256134648.807972 |2024-10-28   |2025-02-02 |USD      |BBG00P19DLQ4 |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |Live Metrics    |CHWY Live  |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|MTD Y-0     |null        |null         |332905441.752821  |2025-02-01   |2025-02-10 |USD      |BBG00P19DLQ4 |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
        |Live Metrics    |CHWY Live  |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|QTD Y-0     |null        |null         |262939533.714792  |2025-02-03   |2025-02-10 |USD      |BBG00P19DLQ4 |...|
        +----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
    """
    _validate_unified_kpi_input_df(df)
    spark = spark or get_spark_session()

    metric_feed_id_mapping_df = (
        spark.table(METRIC_FEED_ID_MAPPING_TABLE)
        .where(F.col("yd_product_type") == "Live Metrics")
        .drop("id", "analysis_name")
    )

    data_through_dict = _get_data_through(df)

    filtered_df = (
        df.withColumns(
            {
                "metric_aggregate_function": F.get_json_object(
                    F.col("metric_options"), "$.aggregate_function"
                ),
                "growth_rate_type": F.get_json_object(
                    F.col("metric_options"), "$.growth_rate_type"
                ),
                "source_table_granularity": F.get_json_object(
                    F.col("metric_options"), "$.source_table_granularity"
                ),
                "periodicity_root": periodicity_root,
                "data_through": F.lit(data_through_dict["data_through"]),
            }
        )
        .where(live_feed_filter)
        .withColumns(
            {
                "periodicity": periodicity_label,
                "slice_names": F.concat_ws(
                    "|",
                    *[
                        F.col(f"slice_name_{i}")
                        for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
                    ],
                ),
                "slice_values": F.concat_ws(
                    "|",
                    *[
                        F.col(f"slice_value_{i}")
                        for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
                    ],
                ),
            }
        )
        .withColumns(
            {
                "slice_names": slice_names_clean,
                "slice_values": slice_values_clean,
            }
        )
        .withColumnRenamed("metric_name", "unified_kpi_metric_name")
        .drop("analysis_name", "analysis_id", "metric_id")
    )

    yd_product_df = _get_yd_product_df(filtered_df, metric_feed_id_mapping_df)
    metric_id_df = _get_metric_id_df(yd_product_df, metric_feed_id_mapping_df)
    analysis_id_df = _get_analysis_id_df(metric_id_df, metric_feed_id_mapping_df)

    live_feed_df = analysis_id_df.select(
        F.lit("Live Metrics").alias("yd_product_type"),
        "yd_product",
        "metric_name",
        analysis_name_clean.alias("analysis_name"),
        F.col("company_comparable_kpi").alias("company_comparable"),
        F.lit("v1").alias("methodology"),
        "periodicity",
        "slice_names",
        "slice_values",
        "period_start",
        "period_end",
        "value",
        "currency",
        "figi",
        "metric_id",
        "analysis_id",
        *FEED_SLICE_NAME_VALUE_COLUMNS,
        F.current_timestamp().alias("publication_timestamp"),
    )

    return live_feed_df

@track_usage
def standard_metric_feed(
    df: DataFrame,
    entity_configuration: entity_configuration,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms the unified KPI table to generate a dataframe containing core metric feed analyses.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: A.entity_configuration configurations
    :return: DataFrame of core metric feed analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating core metric feed analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.aap_reported.aap_gmv")
        calendar_df = spark.table("yd_fp_investor_audit.aap_xnys_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Advance Auto Parts",
            top_level_entity_ticker="AAP:XNYS",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales",
            currency="USD",
            value_divisor=1000000,
            visible_alpha_id=5598475,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="value",
            source_input_date_column="date",
            max_relevant_years=5,
            calendar_type="52_WEEK",
            source_table_filter_conditions=[F.col("metric") == "Net Sales"],
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df,
        )
        df = A.standard_metric_feed(unified_kpi_df, entity_configuration)
        display(df)

        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |yd_product_type |yd_product |metric_name |analysis_name     |...|periodicity |slice_names |slice_values |value             |period_start |period_end |currency |figi |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |Core Metrics    |AAP        |Net Sales   |Net Sales - Total |...|daily       |null        |null         |18512030.489125   |2025-01-01   |2025-01-01 |USD      |null |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |Core Metrics    |AAP        |Net Sales   |Net Sales - Total |...|monthly     |null        |null         |747219529.371808  |2025-01-01   |2025-01-31 |USD      |null |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |Core Metrics    |AAP        |Net Sales   |Net Sales - Total |...|quarterly   |null        |null         |2026418754.661688 |2024-10-06   |2024-12-28 |USD      |null |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |Core Metrics    |AAP        |Net Sales   |Net Sales - Total |...|MTD Y-0     |null        |null         |29447541.625361   |2025-02-01   |2025-02-01 |USD      |null |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
        |Core Metrics    |AAP        |Net Sales   |Net Sales - Total |...|QTD Y-0     |null        |null         |868300454.561080  |2024-12-09   |2025-02-01 |USD      |null |...|
        +----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
    """
    _validate_unified_kpi_input_df(df)
    spark = spark or get_spark_session()

    metric_feed_id_mapping_df = (
        spark.table(METRIC_FEED_ID_MAPPING_TABLE)
        .where(F.col("yd_product_type") == "Core Metrics")
        .drop("id", "analysis_name")
    )

    data_through_dict = _get_data_through(df)
    relevant_periodicities = _get_relevant_periodicities(df)

    filtered_df = (
        df.withColumns(
            {
                "metric_aggregate_function": F.get_json_object(
                    F.col("metric_options"), "$.aggregate_function"
                ),
                "growth_rate_type": F.get_json_object(
                    F.col("metric_options"), "$.growth_rate_type"
                ),
                "source_table_granularity": F.get_json_object(
                    F.col("metric_options"), "$.source_table_granularity"
                ),
                "periodicity_root": periodicity_root,
                "data_through": F.lit(data_through_dict["data_through"]),
            }
        )
        .where(feed_filter)
        .where(F.col("periodicity").isin(relevant_periodicities))
        .withColumns(
            {
                "periodicity": periodicity_label,
                "slice_names": F.concat_ws(
                    "|",
                    *[
                        F.col(f"slice_name_{i}")
                        for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
                    ],
                ),
                "slice_values": F.concat_ws(
                    "|",
                    *[
                        F.col(f"slice_value_{i}")
                        for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
                    ],
                ),
                "period_start": period_start_clean,
                "period_end": period_end_clean,
            }
        )
        .withColumns(
            {
                "slice_names": slice_names_clean,
                "slice_values": slice_values_clean,
            }
        )
        .withColumnRenamed("metric_name", "unified_kpi_metric_name")
        .drop("analysis_name", "analysis_id", "metric_id")
    )

    yd_product_df = _get_yd_product_df(filtered_df, metric_feed_id_mapping_df)
    metric_id_df = _get_metric_id_df(yd_product_df, metric_feed_id_mapping_df)
    analysis_id_df = _get_analysis_id_df(metric_id_df, metric_feed_id_mapping_df)

    metric_feed_df = analysis_id_df.select(
        F.lit("Core Metrics").alias("yd_product_type"),
        "yd_product",
        "metric_name",
        analysis_name_clean.alias("analysis_name"),
        F.col("company_comparable_kpi").alias("company_comparable"),
        F.lit("v1").alias("methodology"),
        "periodicity",
        "slice_names",
        "slice_values",
        "period_start",
        "period_end",
        "value",
        "currency",
        "figi",
        "metric_id",
        "analysis_id",
        *FEED_SLICE_NAME_VALUE_COLUMNS,
        F.current_timestamp().alias("publication_timestamp"),
    )

    return metric_feed_df