from pyspark.sql import SparkSession
from pyspark.sql import functions as F, DataFrame
from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_client import get_spark_session
from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.config import entity_configuration
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
FEED_SLICE_COLUMN_COUNT,
FEED_SLICE_NAME_VALUE_COLUMNS,
METRIC_FEED_ID_MAPPING_TABLE,
_get_data_through,
_get_relevant_periodicities,
)
# Create human-readable periodicity labels for different analysis types
# This maps technical analysis names to user-friendly descriptions:
# - Daily values (no trailing period)
# - Weekly values (7-day trailing sum)
# - MTD (month-to-date) with year offset
# - QTD (quarter-to-date) with year offset
# - Monthly and Quarterly values
periodicity_root = E.chain_cases(
[
E.case(
E.all(F.col("periodicity") == "DAY", F.col("trailing_period").isNull()),
F.lit("daily"),
),
E.case(
E.all(F.col("trailing_period") == 7, F.col("aggregation_type") == "SUM"),
F.lit("weekly"),
),
E.case(
F.col("internal_dashboard_analysis_name").like("month_%_period_to_date%"),
F.lit("MTD"),
),
E.case(
F.col("internal_dashboard_analysis_name").like("quarter_%_period_to_date%"),
F.lit("QTD"),
),
E.case(
E.all(
F.col("periodicity").isin(["MONTH", "QUARTER"]),
~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
),
F.lower(F.concat(F.col("periodicity"), F.lit("ly"))),
),
],
)
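# A worked sketch (hypothetical rows) of what periodicity_root resolves to:
#   periodicity="DAY",   trailing_period=NULL                          -> "daily"
#   trailing_period=7,   aggregation_type="SUM"                        -> "weekly"
#   analysis name "month_1y_growth_rate_period_to_date"                -> "MTD"
#   analysis name "quarter_1y_growth_rate_period_to_date"              -> "QTD"
#   periodicity="MONTH" or "QUARTER", non-period-to-date analysis      -> "monthly" / "quarterly"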
year_label = E.chain_cases(
[
E.case(
F.col("calculation_type") == "GROWTH_RATE",
F.upper(F.split(F.col("internal_dashboard_analysis_name"), "_")[1]),
),
E.case(
E.all(
F.col("calculation_type") == "SIMPLE_AGGREGATE",
F.col("periodicity_root") == "MTD",
),
F.concat(F.lit("Y-"), (F.col("period_index") / 12).cast("int")),
),
E.case(
E.all(
F.col("calculation_type") == "SIMPLE_AGGREGATE",
F.col("periodicity_root") == "QTD",
),
F.concat(F.lit("Y-"), (F.col("period_index") / 4).cast("int")),
),
]
)
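# A worked sketch (hypothetical rows) of year_label:
#   GROWTH_RATE row named "quarter_1y_growth_rate_..."        -> "1Y" (parsed from the name)
#   SIMPLE_AGGREGATE MTD row, period_index=24 (months back)   -> "Y-2" (24 / 12)
#   SIMPLE_AGGREGATE QTD row, period_index=4 (quarters back)  -> "Y-1" (4 / 4)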
growth_rate_label = E.chain_cases(
[E.case(F.col("growth_rate_type") == "CAGR", F.lit("CAGR"))],
otherwise=F.lit("Growth"),
)
periodicity_label = E.chain_cases(
[
E.case(
E.all(
F.col("calculation_type") == "SIMPLE_AGGREGATE",
F.col("periodicity_root").like("%TD"),
),
F.concat_ws(" ", F.col("periodicity_root"), year_label),
),
E.case(
F.col("calculation_type") == "GROWTH_RATE",
F.concat(
year_label,
F.lit(" "),
growth_rate_label,
F.lit(" - "),
F.col("periodicity_root"),
),
),
],
otherwise=F.col("periodicity_root"),
)
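# A worked sketch (hypothetical rows) of the final periodicity label:
#   SIMPLE_AGGREGATE, periodicity_root="QTD", period_index=4                 -> "QTD Y-1"
#   GROWTH_RATE, "quarter_1y_growth_rate_...", periodicity_root="quarterly"  -> "1Y Growth - quarterly"
#   GROWTH_RATE with growth_rate_type="CAGR", e.g.                           -> "3Y CAGR - quarterly"
#   anything else falls through to periodicity_root, e.g.                    -> "daily"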
# Filter to select only the most relevant periods for the feed
# This includes:
# - Daily values (most recent)
# - Non-daily values that aren't the current period (period_index != 0)
# - Month-to-date values at yearly intervals
# - Quarter-to-date values at yearly intervals
# - Weekly values (7-day trailing) at weekly intervals
period_index_filter = E.any(
[
F.col("internal_dashboard_analysis_name").isin(
["day_simple_aggregate", "day_simple_aggregate_sliced_data"]
),
E.all(
F.col("periodicity") != "DAY",
~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
E.any(
F.col("period_index") != 0,
F.col("periodicity") == F.col("source_table_granularity"),
                # Include the non-PTD row for the current period when the period's
                # last day equals the max date in the data, i.e. the current
                # period-to-date value already covers the full period.
F.col("period_end") == F.col("data_through"),
),
),
E.all(
F.col("internal_dashboard_analysis_name").like("month_%_period_to_date%"),
(F.col("period_index") % 12) == 0,
),
E.all(
F.col("internal_dashboard_analysis_name").like("quarter_%_period_to_date%"),
(F.col("period_index") % 4) == 0,
),
E.all( # weekly analyses are trailing 7 day sums
F.col("trailing_period") == 7,
F.col("aggregation_type") == "SUM",
(F.col("period_index") % 7) == 0,
),
]
)
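# For example (hypothetical rows): a monthly PTD row with period_index=12 passes
# (12 % 12 == 0) while period_index=13 is dropped, and a trailing-7-day SUM row
# passes only every 7th period_index, so daily rows collapse to one per week.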
# Exclude quarterly data for metrics that use VA data as part of their product methodology
uses_va_for_actuals_filter = E.all(
F.col("uses_va_for_actuals"),
F.col("periodicity") == "QUARTER",
)
# Filter out daily data for metrics with a weekly display granularity
# This prevents showing too granular data for metrics that should be viewed weekly
weekly_display_filter = E.all(
F.col("display_period_granularity") == "WEEK",
F.col("periodicity") == "DAY",
F.col("trailing_period").isNull(),
)
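# For example (hypothetical rows): the two exclusion filters above match a QUARTER
# row for a metric with uses_va_for_actuals=true, and a plain daily row
# (trailing_period NULL) for a metric displayed at weekly granularity. Both are
# negated inside feed_filter below.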
# Combined filter for the metric feed
# Applies all the filtering rules to show only the most relevant data points
feed_filter = E.all(
period_index_filter,
~uses_va_for_actuals_filter,
~weekly_display_filter,
F.col("aggregation_type") == F.col("metric_aggregate_function"),
F.col("internal_dashboard_analysis_name")
!= "quarter_1y_growth_rate_period_to_date_minus_7_days",
)
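# For example (hypothetical row): a row whose aggregation_type is "AVG" for a
# metric whose metric_aggregate_function is "SUM" is dropped here even if it
# passes period_index_filter.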
# Additional filter for the live feed which is more selective
# Excludes weekly data, growth rates, and sliced data for certain periodicities
live_feed_filter = E.all(
feed_filter,
F.col("calculation_type") != "GROWTH_RATE",
~(
F.col("periodicity").isin(["MONTH", "QUARTER"])
& (F.col("internal_dashboard_analysis_name").like("%sliced_data%"))
),
F.col("trailing_period").isNull(),
)
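# For example (hypothetical rows): live_feed_filter drops a "1Y Growth" row, any
# trailing-7-day row, and a sliced MONTH/QUARTER analysis, while an unsliced daily
# or monthly simple aggregate passes through.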
analysis_name_clean = E.chain_cases(
[
E.case(
F.col("slice_name_1").isNotNull(),
F.concat(
F.col("metric_name"),
F.lit(" - by "),
F.initcap(
F.concat_ws(
", ",
*[
F.regexp_replace(F.col(f"slice_name_{i}"), "_", " ")
                            for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
],
)
),
),
)
],
otherwise=F.concat(F.col("metric_name"), F.lit(" - Total")),
)
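# For example (hypothetical row): metric_name="Net Sales", slice_name_1="device_type",
# slice_name_2="region" -> "Net Sales - by Device Type, Region"; with no slices the
# name falls through to "Net Sales - Total".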
# Weekly (trailing 7-day) rows report week-aligned start/end dates. These compare
# against periodicity_root, the derived label computed above: the raw periodicity
# column only ever holds uppercase values such as "DAY", so matching it against
# "weekly" would never fire.
period_start_clean = E.chain_cases(
    [E.case(F.col("periodicity_root") == "weekly", F.col("week_start_period_start"))],
    otherwise=F.col("period_start"),
)
period_end_clean = E.chain_cases(
    [E.case(F.col("periodicity_root") == "weekly", F.col("week_end_period_end"))],
    otherwise=F.col("period_end"),
)
slice_names_clean = E.chain_cases(
[E.case(F.length(F.col("slice_names")) < 1, F.lit(None))],
otherwise=F.col("slice_names"),
)
slice_values_clean = E.chain_cases(
[E.case(F.length(F.col("slice_values")) < 1, F.lit(None))],
otherwise=F.col("slice_values"),
)
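# The two cleaners above normalize the empty strings that concat_ws produces when
# every slice column is NULL back to NULL, e.g. slice_names="" -> NULL while
# slice_names="geo|channel" (hypothetical value) is kept as-is.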
def _get_yd_product_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
"""Map unified KPI entity columns to product."""
yd_product_df = mapping_df.select(
"top_level_entity_name",
"top_level_entity_ticker",
"entity_name",
F.col("yd_product").alias("legacy_yd_product"),
).distinct()
yd_product_join_columns = [
yd_product_df.top_level_entity_name.eqNullSafe(df.top_level_entity_name),
yd_product_df.top_level_entity_ticker.eqNullSafe(df.top_level_entity_ticker),
yd_product_df.entity_name.eqNullSafe(df.entity_name),
]
map_yd_product_df = (
df.join(yd_product_df, yd_product_join_columns, "left")
.drop(
yd_product_df.top_level_entity_name,
yd_product_df.top_level_entity_ticker,
yd_product_df.entity_name,
)
.withColumn("ticker", F.split(F.col("top_level_entity_ticker"), ":")[0])
)
return map_yd_product_df.withColumn(
"yd_product",
F.when(
F.col("legacy_yd_product").isNull(),
F.when(
E.any(F.col("ticker").isNull(), F.col("ticker").rlike(r"\d+")),
F.col("top_level_entity_name"),
).otherwise(F.col("ticker")),
).otherwise(F.col("legacy_yd_product")),
).drop("ticker")
def _get_metric_id_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
"""Map unified KPI columns to legacy metric ID, if existing. Generate a new metric ID otherwise."""
metric_id_df = mapping_df.select(
"top_level_entity_name",
"top_level_entity_ticker",
"entity_name",
"unified_kpi_metric_name",
F.col("metric_name").alias("legacy_metric_name"),
F.col("metric_id").alias("legacy_metric_id"),
).distinct()
metric_id_join_columns = [
metric_id_df.top_level_entity_name.eqNullSafe(df.top_level_entity_name),
metric_id_df.top_level_entity_ticker.eqNullSafe(df.top_level_entity_ticker),
metric_id_df.entity_name.eqNullSafe(df.entity_name),
metric_id_df.unified_kpi_metric_name == df.unified_kpi_metric_name,
]
legacy_metric_id_df = df.join(metric_id_df, metric_id_join_columns, "left").drop(
metric_id_df.top_level_entity_name,
metric_id_df.top_level_entity_ticker,
metric_id_df.entity_name,
metric_id_df.unified_kpi_metric_name,
)
return legacy_metric_id_df.withColumn(
"metric_name",
F.coalesce(F.col("legacy_metric_name"), F.col("unified_kpi_metric_name")),
).withColumn(
"metric_id",
F.when(
F.col("legacy_metric_id").isNull(),
E.uuid5(
[
F.col("top_level_entity_name"),
F.col("top_level_entity_ticker"),
F.col("entity_name"),
F.col("unified_kpi_metric_name"),
]
),
).otherwise(F.col("legacy_metric_id")),
)
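
# Note on the fallback above: when no legacy metric_id exists, E.uuid5 (the
# toolkit's deterministic UUIDv5 helper, per its use here) derives the ID from the
# entity and metric-name columns, so re-running the feed regenerates the same ID
# for the same metric.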
def _get_analysis_id_df(df: DataFrame, mapping_df: DataFrame) -> DataFrame:
"""Map unified KPI columns to legacy analysis IDs, if existing. Generate new analysis IDs otherwise."""
analysis_id_df = mapping_df.select(
"metric_id",
F.col("analysis_id").alias("legacy_analysis_id"),
"slice_names",
"periodicity_root",
"calculation_type",
).distinct()
analysis_id_join_columns = [
analysis_id_df.metric_id == df.metric_id,
analysis_id_df.periodicity_root == df.periodicity_root,
analysis_id_df.slice_names.eqNullSafe(df.slice_names),
analysis_id_df.calculation_type == df.calculation_type,
]
legacy_analysis_id_df = df.join(
analysis_id_df, analysis_id_join_columns, "left"
).drop(
analysis_id_df.metric_id,
analysis_id_df.periodicity_root,
analysis_id_df.slice_names,
analysis_id_df.calculation_type,
)
return legacy_analysis_id_df.withColumn(
"analysis_id",
F.when(
F.col("legacy_analysis_id").isNull(),
E.uuid5(
[
F.col("metric_id"),
F.col("periodicity_root"),
F.col("slice_names"),
F.col("calculation_type"),
]
),
).otherwise(F.col("legacy_analysis_id")),
)
@track_usage
def standard_metric_live_feed(
df: DataFrame,
entity_configuration: entity_configuration,
spark: SparkSession = None,
) -> DataFrame:
"""
    Transforms the unified KPI table to generate a dataframe containing YDL feed analyses.
    This function will be deprecated alongside YDL Feeds in July 2025.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: A.entity_configuration configurations
    :param spark: Optional SparkSession; defaults to the session from get_spark_session()
    :return: DataFrame of live feed analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating live feed analyses for a metric.

from etl_toolkit import A
input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
entity_configuration = A.entity_configuration(
top_level_entity_name="Chewy",
top_level_entity_ticker="CHWY:XNYS",
figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Net Sales - Order Date",
company_comparable_kpi=False,
currency="USD",
value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="net_sales_order_date",
source_input_date_column="date",
max_relevant_years=4,
calendar_type="52_WEEK",
trailing_period_aggregate_function="SUM",
)
unified_kpi_df = A.standard_metric_unified_kpi(
input_df,
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
df = A.standard_metric_live_feed(unified_kpi_df, entity_configuration)
display(df)
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|yd_product_type |yd_product |metric_name |analysis_name |...|periodicity |slice_names |slice_values |value |period_start |period_end |currency |figi |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|Live Metrics |CHWY Live |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|daily |null |null |16420203.160226 |2025-01-01 |2025-01-01 |USD |BBG00P19DLQ4 |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|Live Metrics |CHWY Live |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|monthly |null |null |997044783.786796 |2025-01-01 |2025-01-31 |USD |BBG00P19DLQ4 |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|Live Metrics |CHWY Live |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|quarterly |null |null |3256134648.807972 |2024-10-28 |2025-02-02 |USD |BBG00P19DLQ4 |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|Live Metrics |CHWY Live |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|MTD Y-0 |null |null |332905441.752821 |2025-02-01 |2025-02-10 |USD |BBG00P19DLQ4 |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
|Live Metrics |CHWY Live |Net Sales (Order Date) |Net Sales (Order Date) - Total |...|QTD Y-0 |null |null |262939533.714792 |2025-02-03 |2025-02-10 |USD |BBG00P19DLQ4 |...|
+----------------+-----------+-----------------------+-------------------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-------------+---+
"""
_validate_unified_kpi_input_df(df)
spark = spark or get_spark_session()
metric_feed_id_mapping_df = (
spark.table(METRIC_FEED_ID_MAPPING_TABLE)
.where(F.col("yd_product_type") == "Live Metrics")
.drop("id", "analysis_name")
)
data_through_dict = _get_data_through(df)
filtered_df = (
df.withColumns(
{
"metric_aggregate_function": F.get_json_object(
F.col("metric_options"), "$.aggregate_function"
),
"growth_rate_type": F.get_json_object(
F.col("metric_options"), "$.growth_rate_type"
),
"source_table_granularity": F.get_json_object(
F.col("metric_options"), "$.source_table_granularity"
),
"periodicity_root": periodicity_root,
"data_through": F.lit(data_through_dict["data_through"]),
}
)
.where(live_feed_filter)
.withColumns(
{
"periodicity": periodicity_label,
"slice_names": F.concat_ws(
"|",
*[
F.col(f"slice_name_{i}")
for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
],
),
"slice_values": F.concat_ws(
"|",
*[
F.col(f"slice_value_{i}")
for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
],
),
}
)
.withColumns(
{
"slice_names": slice_names_clean,
"slice_values": slice_values_clean,
}
)
.withColumnRenamed("metric_name", "unified_kpi_metric_name")
.drop("analysis_name", "analysis_id", "metric_id")
)
yd_product_df = _get_yd_product_df(filtered_df, metric_feed_id_mapping_df)
metric_id_df = _get_metric_id_df(yd_product_df, metric_feed_id_mapping_df)
analysis_id_df = _get_analysis_id_df(metric_id_df, metric_feed_id_mapping_df)
live_feed_df = analysis_id_df.select(
F.lit("Live Metrics").alias("yd_product_type"),
"yd_product",
"metric_name",
analysis_name_clean.alias("analysis_name"),
F.col("company_comparable_kpi").alias("company_comparable"),
F.lit("v1").alias("methodology"),
"periodicity",
"slice_names",
"slice_values",
"period_start",
"period_end",
"value",
"currency",
"figi",
"metric_id",
"analysis_id",
*FEED_SLICE_NAME_VALUE_COLUMNS,
F.current_timestamp().alias("publication_timestamp"),
)
return live_feed_df
@track_usage
def standard_metric_feed(
df: DataFrame,
entity_configuration: entity_configuration,
spark: SparkSession = None,
) -> DataFrame:
"""
    Transforms the unified KPI table to generate a dataframe containing core metric feed analyses.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: A.entity_configuration configurations
    :param spark: Optional SparkSession; defaults to the session from get_spark_session()
    :return: DataFrame of core metric feed analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating core metric feed analyses for a metric.

from etl_toolkit import A
input_df = spark.table("yd_production.aap_reported.aap_gmv")
calendar_df = spark.table("yd_fp_investor_audit.aap_xnys_deliverable_gold.custom_calendar__dmv__000")
entity_configuration = A.entity_configuration(
top_level_entity_name="Advance Auto Parts",
top_level_entity_ticker="AAP:XNYS",
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Net Sales",
currency="USD",
value_divisor=1000000,
visible_alpha_id=5598475,
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="value",
source_input_date_column="date",
max_relevant_years=5,
calendar_type="52_WEEK",
            source_table_filter_conditions=[F.col("metric") == "Net Sales"],
trailing_period_aggregate_function="SUM",
)
unified_kpi_df = A.standard_metric_unified_kpi(
input_df,
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
calendar_df,
)
df = A.standard_metric_feed(unified_kpi_df, entity_configuration)
display(df)
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|yd_product_type |yd_product |metric_name |analysis_name |...|periodicity |slice_names |slice_values |value |period_start |period_end |currency |figi |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|Core Metrics |AAP |Net Sales |Net Sales - Total |...|daily |null |null |18512030.489125 |2025-01-01 |2025-01-01 |USD |null |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|Core Metrics |AAP |Net Sales |Net Sales - Total |...|monthly |null |null |747219529.371808 |2025-01-01 |2025-01-31 |USD |null |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|Core Metrics |AAP |Net Sales |Net Sales - Total |...|quarterly |null |null |2026418754.661688 |2024-10-06 |2024-12-28 |USD |null |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|Core Metrics |AAP |Net Sales |Net Sales - Total |...|MTD Y-0 |null |null |29447541.625361 |2025-02-01 |2025-02-01 |USD |null |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
|Core Metrics |AAP |Net Sales |Net Sales - Total |...|QTD Y-0 |null |null |868300454.561080 |2024-12-09 |2025-02-01 |USD |null |...|
+----------------+-----------+------------+------------------+---+------------+------------+-------------+------------------+-------------+-----------+---------+-----+---+
"""
_validate_unified_kpi_input_df(df)
spark = spark or get_spark_session()
metric_feed_id_mapping_df = (
spark.table(METRIC_FEED_ID_MAPPING_TABLE)
.where(F.col("yd_product_type") == "Core Metrics")
.drop("id", "analysis_name")
)
data_through_dict = _get_data_through(df)
relevant_periodicities = _get_relevant_periodicities(df)
filtered_df = (
df.withColumns(
{
"metric_aggregate_function": F.get_json_object(
F.col("metric_options"), "$.aggregate_function"
),
"growth_rate_type": F.get_json_object(
F.col("metric_options"), "$.growth_rate_type"
),
"source_table_granularity": F.get_json_object(
F.col("metric_options"), "$.source_table_granularity"
),
"periodicity_root": periodicity_root,
"data_through": F.lit(data_through_dict["data_through"]),
}
)
.where(feed_filter)
.where(F.col("periodicity").isin(relevant_periodicities))
.withColumns(
{
"periodicity": periodicity_label,
"slice_names": F.concat_ws(
"|",
*[
F.col(f"slice_name_{i}")
for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
],
),
"slice_values": F.concat_ws(
"|",
*[
F.col(f"slice_value_{i}")
for i in range(1, FEED_SLICE_COLUMN_COUNT + 1)
],
),
"period_start": period_start_clean,
"period_end": period_end_clean,
}
)
.withColumns(
{
"slice_names": slice_names_clean,
"slice_values": slice_values_clean,
}
)
.withColumnRenamed("metric_name", "unified_kpi_metric_name")
.drop("analysis_name", "analysis_id", "metric_id")
)
yd_product_df = _get_yd_product_df(filtered_df, metric_feed_id_mapping_df)
metric_id_df = _get_metric_id_df(yd_product_df, metric_feed_id_mapping_df)
analysis_id_df = _get_analysis_id_df(metric_id_df, metric_feed_id_mapping_df)
metric_feed_df = analysis_id_df.select(
F.lit("Core Metrics").alias("yd_product_type"),
"yd_product",
"metric_name",
analysis_name_clean.alias("analysis_name"),
F.col("company_comparable_kpi").alias("company_comparable"),
F.lit("v1").alias("methodology"),
"periodicity",
"slice_names",
"slice_values",
"period_start",
"period_end",
"value",
"currency",
"figi",
"metric_id",
"analysis_id",
*FEED_SLICE_NAME_VALUE_COLUMNS,
F.current_timestamp().alias("publication_timestamp"),
)
return metric_feed_df