from pyspark.sql import DataFrame, functions as F
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
NULL_DECIMAL_PLACEHOLDER,
)
@track_usage
def standard_metric_daily_growth(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing daily growth analyses.

    Rows in the unified KPI table whose ``internal_dashboard_analysis_name``
    matches ``day_<N>y_growth_rate_trailing_day`` are pivoted into one column
    per growth year (1-4), producing a single wide row per entity/metric/day.

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of daily growth analyses for a metric
    :raises: whatever ``_validate_unified_kpi_input_df`` raises when *df* does
        not conform to the expected unified KPI schema

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating daily growth analyses for a metric.

        from etl_toolkit import A
        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_daily_growth(unified_kpi_df)
        display(df)
        +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+--------------------------------+---+
        |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |dashboard_analysis_name |day_1y_growth_rate_trailing_day |day_2y_growth_rate_trailing_day |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+--------------------------------+---+
        |CHWY:XNYS               |Chewy                 |2025-02-01   |2025-02-01 |Net Sales - Order Date |Daily Growth            |0.012224                        |0.017250                        |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+--------------------------------+---+
        |CHWY:XNYS               |Chewy                 |2025-02-02   |2025-02-02 |Net Sales - Order Date |Daily Growth            |0.014802                        |0.016585                        |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+--------------------------------+---+
        |CHWY:XNYS               |Chewy                 |2025-02-03   |2025-02-03 |Net Sales - Order Date |Daily Growth            |0.040221                        |0.024634                        |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+--------------------------------+---+
    """
    _validate_unified_kpi_input_df(df)

    # Grouping keys that uniquely identify one output row per metric per day.
    grouping_columns = [
        "period_start",
        "period_end",
        "top_level_entity_ticker",
        "top_level_entity_name",
        "metric_name",
        "currency",
        "value_divisor",
        "internal_metric_id",
    ]

    # Filter to the trailing-day growth analyses and pivot the long-format
    # rows into one column per growth year. The pivot's groupBy already
    # yields exactly one row per grouping-key combination, so no further
    # aggregation pass is required afterwards (the previous implementation
    # re-grouped on the same keys with F.min, which was a no-op that cost
    # an extra shuffle).
    base_df = (
        df.where(
            F.col("internal_dashboard_analysis_name").like(
                "day_%y_growth_rate_trailing_day"
            )
        )
        .groupBy(*grouping_columns)
        .pivot("internal_dashboard_analysis_name")
        .sum("value")
    )

    # Add empty columns for any missing years 1-4 to maintain consistent schema
    # (the pivot only creates columns for analysis names present in the data).
    for year in range(1, 5):
        column_name = f"day_{year}y_growth_rate_trailing_day"
        if column_name not in base_df.columns:
            base_df = base_df.withColumn(column_name, NULL_DECIMAL_PLACEHOLDER)

    # Project the final output schema, labeling every row with the
    # dashboard-facing analysis name and a publication timestamp.
    daily_growth_df = base_df.select(
        "top_level_entity_ticker",
        "top_level_entity_name",
        "period_start",
        "period_end",
        "metric_name",
        F.lit("Daily Growth").alias("dashboard_analysis_name"),
        "day_1y_growth_rate_trailing_day",
        "day_2y_growth_rate_trailing_day",
        "day_3y_growth_rate_trailing_day",
        "day_4y_growth_rate_trailing_day",
        "value_divisor",
        "currency",
        F.current_timestamp().alias("publication_timestamp"),
        "internal_metric_id",
    )
    return daily_growth_df