from pyspark.sql import DataFrame, functions as F, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
_get_metric_configs,
)
# Row filter selecting which dashboard analyses are kept, depending on the
# granularity of the source table. Built with etl_toolkit's ``E`` helpers;
# presumably ``E.any`` / ``E.all`` combine their arguments with OR / AND and
# ``E.chain_cases`` evaluates the cases in order -- confirm against etl_toolkit.
_filter_granularity = F.col("source_table_granularity")
_filter_analysis_name = F.col("internal_dashboard_analysis_name")
_filter_aggregation_type = F.col("aggregation_type")

# DAY sources: the trailing-day simple aggregate must use the metric's
# configured aggregate function, while trailing-day growth rates must use the
# trailing-period aggregate function.
_day_analysis_condition = E.any(
    E.all(
        _filter_analysis_name == "day_simple_aggregate_trailing_day",
        _filter_aggregation_type == F.col("metric_config_aggregate_function"),
    ),
    E.all(
        # In a LIKE pattern "%" matches any run of characters and "_" matches
        # exactly one character.
        _filter_analysis_name.like("day%y_growth_rate_trailing_day"),
        _filter_aggregation_type == F.col("trailing_period_aggregate_function"),
    ),
)

# MONTH sources: the monthly simple aggregate and the monthly growth rates,
# with no constraint on the aggregation type.
_month_analysis_condition = E.any(
    _filter_analysis_name == "month_simple_aggregate",
    _filter_analysis_name.like("month_%y_growth_rate"),
)

analysis_filter = E.chain_cases(
    [
        E.case(_filter_granularity == "DAY", _day_analysis_condition),
        E.case(_filter_granularity == "MONTH", _month_analysis_condition),
    ]
)
# Human-readable label for each analysis row:
#   * simple aggregates                     -> "Nominal"
#   * duration of 1                         -> "Y/Y Growth"
#   * SIMPLE growth rate with duration > 1  -> "<duration>Y Growth"
#   * CAGR growth rate with duration > 1    -> "<duration>Y CAGR"
# Presumably ``E.chain_cases`` returns the value of the first matching case --
# confirm against etl_toolkit.
_label_duration = F.col("duration")
_label_growth_rate_type = F.col("growth_rate_type")
_label_is_multi_year = _label_duration > 1

analysis_label = E.chain_cases(
    [
        E.case(F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("Nominal")),
        E.case(_label_duration == 1, F.lit("Y/Y Growth")),
        E.case(
            E.all(_label_growth_rate_type == "SIMPLE", _label_is_multi_year),
            F.concat(_label_duration, F.lit("Y Growth")),
        ),
        E.case(
            E.all(_label_growth_rate_type == "CAGR", _label_is_multi_year),
            F.concat(_label_duration, F.lit("Y CAGR")),
        ),
    ],
)
@track_usage
def standard_metric_trailing_day_pivot(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing trailing day analyses.

    The input is validated, enriched with the metric's configuration values,
    reduced to the analyses selected by ``analysis_filter`` and limited to the
    rows whose ``sequential_index`` is at most 4 (``DAY`` source tables) or
    5 (any other granularity).

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of trailing day analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating trailing day analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_txd_pivot(unified_kpi_df)
        display(df)

    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |...|analysis_label |...|value       |previous_week_txd_value |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |CHWY:XNYS               |Chewy                 |2025-01-28   |2025-02-03 |Net Sales - Order Date |...|Nominal        |...|233.494012  |230.214723              |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|Y/Y Growth     |...|0.040221    |0.075195                |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|2Y CAGR        |...|0.024634    |0.041169                |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|3Y CAGR        |...|0.116819    |0.043802                |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|4Y CAGR        |...|0.105718    |0.085727                |...|
    +------------------------+----------------------+-------------+-----------+-----------------------+---+---------------+---+------------+------------------------+---+
    """
    _validate_unified_kpi_input_df(df)

    # Per analysis / aggregation series, ordered oldest-to-newest by period
    # start; used to look 7 rows back for "previous_week_txd_value".
    prev_week_window = W.partitionBy(
        F.col("aggregation_type"), F.col("internal_dashboard_analysis_name")
    ).orderBy(F.col("period_start"))

    # Unpartitioned on purpose: dense_rank over period_index gives one shared
    # 0-based index across every series. Spark evaluates an unpartitioned
    # window on a single partition, so it is applied only after filtering.
    sequential_index_window = W.orderBy(F.col("period_index"))

    # Per series, ordered by descending sequential_index, so lag(1) returns the
    # row with the next-higher sequential_index.
    prev_period_window = W.partitionBy(
        F.col("aggregation_type"), F.col("internal_dashboard_analysis_name")
    ).orderBy(F.col("sequential_index").desc())

    metric_configurations = _get_metric_configs(df)

    # NOTE(review): several expressions below read columns that are added
    # earlier in this same withColumns call ("source_table_granularity",
    # "growth_rate_type", "duration"). Unless the input already carries those
    # columns, this presumably relies on Spark's lateral column alias
    # resolution -- confirm on the target runtime. Keep the dict order as is.
    filtered_df = (
        df.withColumns(
            {
                "source_table_granularity": F.lit(
                    metric_configurations["source_table_granularity"]
                ),
                "trailing_period_aggregate_function": F.lit(
                    metric_configurations["trailing_period_aggregate_function"]
                ).cast("string"),
                "metric_config_aggregate_function": F.lit(
                    metric_configurations["metric_config_aggregation_type"]
                ),
                "growth_rate_type": F.lit(metric_configurations["growth_rate_type"]),
                # Strip every non-digit from the analysis name and read what is
                # left as an integer; names without digits yield NULL.
                "duration": F.regexp_replace(
                    F.col("internal_dashboard_analysis_name"), r"(\D)", ""
                ).try_cast("int"),
                # For daily sources the reported start is derived from the
                # period end and the trailing window length.
                "adjusted_period_start": F.when(
                    F.col("source_table_granularity") == "DAY",
                    F.dateadd(F.col("period_end"), -(F.col("trailing_period") - 1)),
                ).otherwise(F.col("period_start")),
                "analysis_label": analysis_label,
                # Rows without a duration sort first (1); others sort by duration + 1.
                "analysis_label_sort": F.coalesce(F.col("duration") + 1, F.lit(1)),
                "format_label": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("NUMBER")
                ).otherwise(F.lit("PERCENT")),
                # Only simple aggregates are scaled by the divisor; every other
                # calculation type keeps its value unchanged.
                "final_value": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE",
                    F.col("value") / F.col("value_divisor"),
                ).otherwise(F.col("value")),
                "dashboard_analysis_name": F.when(
                    F.col("source_table_granularity") == "DAY",
                    F.lit("Trailing Day - Key Metrics"),
                ).otherwise(F.lit("End of Period - Key Metrics")),
                # Using max_sequential_index, always include the last 5 weeks of trailing data for daily data
                # or the last 6 months of trailing data for monthly data
                "max_sequential_index": F.when(
                    F.col("source_table_granularity") == "DAY", 4
                ).otherwise(5),
            }
        )
        .where(analysis_filter)
        # Computed before the period_index filter below so the 7-row lag runs
        # over every row of the series, not only the rows that filter keeps.
        .withColumn(
            "previous_week_txd_value",
            F.when(
                F.col("source_table_granularity") == "DAY",
                F.lag(F.col("final_value"), 7).over(prev_week_window),
            ),
        )
        # Keep all MONTH rows; otherwise keep only rows whose period_index is a
        # multiple of the trailing period.
        .where(
            E.any(
                F.col("source_table_granularity") == "MONTH",
                (F.col("period_index") % F.col("trailing_period")) == 0,
            )
        )
        .withColumn(
            "sequential_index", (F.dense_rank().over(sequential_index_window)) - 1
        )
        .withColumn(
            "previous_period_value",
            F.lag(F.col("final_value"), 1).over(prev_period_window),
        )
    )

    # Applied after the window columns are computed, so rows at the cut-off
    # still get a previous_period_value from a row that is dropped here.
    txd_pivot_df = filtered_df.where(
        F.col("sequential_index") <= F.col("max_sequential_index")
    ).select(
        "top_level_entity_ticker",
        "top_level_entity_name",
        F.col("adjusted_period_start").alias("period_start"),
        "period_end",
        "metric_name",
        "analysis_name",
        "dashboard_analysis_name",
        "analysis_label",
        "analysis_label_sort",
        "format_label",
        F.col("final_value").alias("value"),
        "previous_week_txd_value",
        F.col("trailing_period_aggregate_function").alias(
            "trailing_period_aggregation_type"
        ),
        "currency",
        "value_divisor",
        "trailing_period",
        "sequential_index",
        F.current_timestamp().alias("publication_timestamp"),
        F.col("metric_config_aggregate_function").alias("qtd_agg_type"),
        F.col("aggregation_type").alias("analysis_agg_type"),
        "previous_period_value",
        F.col("source_table_granularity").alias("data_source_table_grain"),
        "internal_metric_id",
    )
    return txd_pivot_df