from pyspark.sql import DataFrame, functions as F
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
_get_metric_configs,
)
# Column expression labelling each analysis row for the dashboard.
# Cases are evaluated in order:
#   1. simple aggregates                      -> "Nominal"
#   2. a one-year duration                    -> "Y/Y Growth"
#   3. multi-year growth with SIMPLE rates    -> "<duration>Y Growth"
#   4. multi-year growth with CAGR rates      -> "<duration>Y CAGR"
analysis_label = E.chain_cases(
    [
        E.case(F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("Nominal")),
        E.case(F.col("duration") == 1, F.lit("Y/Y Growth")),
        # The two multi-year growth cases differ only in the growth rate type
        # they match and in the suffix appended to the duration.
        *[
            E.case(
                E.all(
                    F.col("calculation_type") == "GROWTH_RATE",
                    F.col("growth_rate_type") == rate_type,
                    F.col("duration") > 1,
                ),
                F.concat(F.col("duration"), F.lit(label_suffix)),
            )
            for rate_type, label_suffix in (
                ("SIMPLE", "Y Growth"),
                ("CAGR", "Y CAGR"),
            )
        ],
    ],
)
def _period_to_date_label(prefix):
    """Build a to-date label such as ``MTD 3/14`` from ``prefix`` and ``period_end``."""
    return F.concat(
        F.lit(prefix),
        F.month(F.col("period_end")),
        F.lit("/"),
        F.day(F.col("period_end")),
    )


def _month_year_label():
    """Build a monthly label (e.g. ``Mar-25``) from ``period_end``."""
    return F.concat(
        F.monthname(F.col("period_end")),
        F.lit("-"),
        F.date_format(F.col("period_end"), "yy"),
    )


# Predicates reused by the fiscal period label rules.
_fpl_current_period = F.col("period_index") == 0
_fpl_prior_period = F.col("period_index") > 0
_fpl_monthly = F.col("periodicity") == "MONTH"
_fpl_quarterly = F.col("periodicity") == "QUARTER"
_fpl_half_yearly = F.col("periodicity") == "HALF_YEAR"

# Column expression producing the fiscal period label of each row.
# Rules are evaluated in order, so the current-period (period_index == 0)
# to-date rules take precedence over the full-period rules that follow.
fiscal_period_label = E.chain_cases(
    [
        # Daily source, monthly analysis, current period: label as MTD unless
        # the max date is the last day of the month (data_through == max_month).
        E.case(
            E.all(
                F.col("source_table_granularity") == "DAY",
                _fpl_monthly,
                _fpl_current_period,
                F.col("data_through") != F.col("max_month"),
            ),
            _period_to_date_label("MTD "),
        ),
        # Monthly source, monthly analysis, current period: monthly label.
        E.case(
            E.all(
                F.col("source_table_granularity") == "MONTH",
                _fpl_monthly,
                _fpl_current_period,
            ),
            _month_year_label(),
        ),
        # Quarterly analysis, current period: label as QTD unless the max date
        # is the last day of the quarter (data_through == max_quarter).
        E.case(
            E.all(
                _fpl_quarterly,
                _fpl_current_period,
                F.col("data_through") != F.col("max_quarter"),
            ),
            _period_to_date_label("QTD "),
        ),
        # Half-year analysis, current period: PTD label.
        E.case(
            E.all(_fpl_half_yearly, _fpl_current_period),
            _period_to_date_label("PTD "),
        ),
        # Monthly analysis that is either a prior period or a current period
        # ending on the last day of the month: monthly label (e.g. Mar-25)
        # instead of the MTD label.
        E.case(
            E.all(
                _fpl_monthly,
                E.any(
                    _fpl_prior_period,
                    F.col("data_through") == F.col("max_month"),
                ),
            ),
            _month_year_label(),
        ),
        # Quarterly analysis that is either a prior period or a current period
        # ending on the last day of the quarter: quarterly label (e.g. 1Q25)
        # instead of the QTD label.
        E.case(
            E.all(
                _fpl_quarterly,
                E.any(
                    _fpl_prior_period,
                    F.col("data_through") == F.col("max_quarter"),
                ),
            ),
            F.col("quarter_label_period_end"),
        ),
        # Half-year analysis for a prior period: precomputed half-year label.
        E.case(
            E.all(_fpl_half_yearly, _fpl_prior_period),
            F.col("half_year_label"),
        ),
    ]
)
# Column expression giving the display order of each fiscal period.
# Each entry is (periodicity predicate, period_index, sort rank); the entries
# are turned into cases in the order listed.
fiscal_period_sort = E.chain_cases(
    [
        E.case(
            E.all(periodicity_match, F.col("period_index") == index),
            sort_rank,
        )
        for periodicity_match, index, sort_rank in (
            (F.col("periodicity").isin(["QUARTER", "HALF_YEAR"]), 1, 1),
            (F.col("periodicity") == "MONTH", 2, 2),
            (F.col("periodicity") == "MONTH", 1, 3),
            (F.col("periodicity") == "MONTH", 0, 4),
            (F.col("periodicity") == "QUARTER", 0, 5),
            (F.col("periodicity") == "HALF_YEAR", 0, 6),
        )
    ]
)
# Predicates reused by the PTD / full-period selection rules.
_ptd_filter_current_period = F.col("period_index") == 0
_ptd_filter_full_period_analysis = ~F.col("internal_dashboard_analysis_name").like(
    "%period_to_date%"
)

# Use PTD analyses for the current period (period_index=0) and non-PTD analyses for everything else
combine_ptd_and_full_period_filter = E.any(
    # Current month / half-year: keep the period-to-date analyses.
    E.all(
        _ptd_filter_current_period,
        F.col("periodicity").isin(["MONTH", "HALF_YEAR"]),
        F.col("internal_dashboard_analysis_name").endswith("period_to_date"),
    ),
    # Current quarter still in progress: keep the quarter-to-date analyses.
    E.all(
        _ptd_filter_current_period,
        F.col("internal_dashboard_analysis_name").like("quarter%period_to_date"),
        # Needed to handle 90/97 fiscal quarter issue
        F.col("data_through") < F.col("max_quarter"),
    ),
    # Current quarter with data through its last day: keep the full-period analyses.
    E.all(
        _ptd_filter_current_period,
        F.col("periodicity") == "QUARTER",
        _ptd_filter_full_period_analysis,
        # Needed to handle 90/97 fiscal quarter issue
        F.col("data_through") == F.col("max_quarter"),
    ),
    # Any prior period: keep the full-period analyses.
    E.all(
        F.col("period_index") > 0,
        _ptd_filter_full_period_analysis,
    ),
    # Current month of a monthly-grain source: keep the full-period analyses.
    E.all(
        _ptd_filter_current_period,
        F.col("source_table_granularity") == "MONTH",
        F.col("periodicity") == "MONTH",
        _ptd_filter_full_period_analysis,
    ),
)
def _get_data_through(df: DataFrame) -> dict:
    """
    Compute the data-through date used by the metric breakdowns.

    The latest ``period_end`` of the ``%simple_aggregate`` analyses is taken per
    periodicity and pivoted into one row with ``max_<periodicity>`` columns.
    ``data_through`` is the first non-null of ``max_day``, ``max_month`` and
    ``max_quarter``. This is needed to account for 90/97 fiscal quarters.

    :param df: A dataframe of a metric's unified KPI table
    :return: The single pivoted row as a dict, including ``max_day``,
        ``max_month``, ``max_quarter`` and ``data_through``
    """
    expected_max_columns = ["max_day", "max_month", "max_quarter"]

    # Latest period_end per periodicity, restricted to simple aggregate analyses.
    simple_aggregates = df.where(
        F.col("internal_dashboard_analysis_name").like("%simple_aggregate")
    )
    latest_by_periodicity = simple_aggregates.groupBy("periodicity").agg(
        F.max("period_end").alias("max_period_end")
    )

    # Pivot to a single row with one max_<periodicity> column per periodicity.
    pivot_ready = latest_by_periodicity.withColumn(
        "temp_periodicity", F.concat(F.lit("max_"), F.lower(F.col("periodicity")))
    )
    pivoted = (
        pivot_ready.groupBy()
        .pivot("temp_periodicity")
        .agg(F.first("max_period_end"))
    )

    # Periodicities absent from the input produce no pivot column; add them as
    # nulls so the lookups below always resolve.
    absent_columns = [
        name for name in expected_max_columns if name not in pivoted.columns
    ]
    for name in absent_columns:
        pivoted = pivoted.withColumn(name, F.lit(None))

    with_data_through = pivoted.withColumn(
        "data_through",
        F.coalesce(*[F.col(name) for name in expected_max_columns]),
    )
    return with_data_through.first().asDict()
@track_usage
def standard_metric_quarter_month_pivot(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing MTD and QTD analyses.

    The input is validated, then restricted to unsliced (``slice_name_1`` is null),
    non-``DAY`` rows whose ``aggregation_type`` matches the metric's configured
    aggregation type. Period-to-date analyses are kept for the current period
    (``period_index == 0``) and full-period analyses otherwise, and only
    ``period_index`` values 0, 1 and 2 are returned. Simple aggregate values are
    divided by ``value_divisor``; all other values are passed through unchanged.

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of MTD and QTD analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating MTD and QTD analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        df = A.standard_metric_quarter_month_pivot(unified_kpi_df)
        display(df)

        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |...|periodicity |analysis_label |...|value       |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |Nominal        |...|3252.308829 |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |Y/Y Growth     |...|0.152283    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |2Y CAGR        |...|0.094586    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |3Y CAGR        |...|0.106234    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
        |CHWY:XNYS               |Chewy                 |2024-10-28   |2025-02-02 |Net Sales - Order Date |...|QUARTER     |4Y CAGR        |...|0.123719    |...|
        +------------------------+----------------------+-------------+-----------+-----------------------+---+------------+---------------+---+------------+---+
    """
    _validate_unified_kpi_input_df(df)
    data_through_dict = _get_data_through(df)
    metric_configurations = _get_metric_configs(df)

    filtered_df = (
        # Drop daily rows; every other periodicity (e.g. MONTH, QUARTER,
        # HALF_YEAR) is kept at this stage.
        df.where(F.col("periodicity") != "DAY")
        .where(F.col("slice_name_1").isNull())  # Exclude sliced analyses
        # NOTE(review): several expressions below (analysis_label,
        # analysis_label_sort, fiscal_period_label, ...) reference columns that
        # are created in this same withColumns call (e.g. duration,
        # data_through). This presumably relies on Spark resolving them within
        # a single projection -- confirm before splitting this call up.
        .withColumns(
            {
                "source_table_granularity": F.lit(
                    metric_configurations["source_table_granularity"]
                ),
                "metric_config_aggregate_function": F.lit(
                    metric_configurations["metric_config_aggregation_type"]
                ),
                "growth_rate_type": F.lit(metric_configurations["growth_rate_type"]),
                # Strip every non-digit from the analysis name and read what
                # remains as an integer; null when it cannot be cast.
                "duration": F.regexp_replace(
                    F.col("internal_dashboard_analysis_name"), r"(\D)", ""
                ).try_cast("int"),
                "data_through": F.lit(data_through_dict["data_through"]),
                "max_quarter": F.lit(data_through_dict["max_quarter"]),
                "max_month": F.lit(data_through_dict["max_month"]),
                "analysis_label": analysis_label,
                # Rows without a duration sort first (1); others sort by duration + 1.
                "analysis_label_sort": F.coalesce(F.col("duration") + 1, F.lit(1)),
                "fiscal_period_label": fiscal_period_label,
                "fiscal_period_sort": fiscal_period_sort,
                "format_label": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE", F.lit("NUMBER")
                ).otherwise(F.lit("PERCENT")),
                # Only simple aggregates are scaled by the divisor; growth
                # rates are reported as-is.
                "final_value": F.when(
                    F.col("calculation_type") == "SIMPLE_AGGREGATE",
                    F.col("value") / F.col("value_divisor"),
                ).otherwise(F.col("value")),
            }
        )
        .where(F.col("metric_config_aggregate_function") == F.col("aggregation_type"))
        .where(combine_ptd_and_full_period_filter)
    )

    pivot_df = (
        filtered_df
        # Filter on the most recent full quarter and QTD (period_index=0,1,2)
        # When applicable, the pivot table looks at the most recent full two months and MTD
        .where(F.col("period_index").isin([0, 1, 2]))
        .select(
            "top_level_entity_ticker",
            "top_level_entity_name",
            "period_start",
            "period_end",
            "metric_name",
            F.col("visible_alpha_id").alias("metric_va_id"),
            "analysis_name",
            F.lit("Key Metrics").alias("dashboard_analysis_name"),
            "periodicity",
            "analysis_label",
            "analysis_label_sort",
            "fiscal_period_label",
            "fiscal_period_sort",
            "format_label",
            F.col("final_value").alias("value"),
            "currency",
            "value_divisor",
            "quarter_label_period_end",
            F.col("period_index").alias("sequential_index"),
            F.current_timestamp().alias("publication_timestamp"),
            F.col("source_table_granularity").alias("data_source_table_grain"),
            "internal_metric_id",
        )
    )
    return pivot_df