from pyspark.sql import functions as F, DataFrame, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
_get_metric_configs,
_get_latest_growth_rates,
_get_date_bounds,
_get_filtered_calendar_df,
_get_previous_year_columns,
)
def _get_base_df(
df: DataFrame,
metric_aggregation_type: str = None,
) -> DataFrame:
"""Generate base table with analysis filtered to daily aggregate."""
base_df = df.where(
F.col("internal_dashboard_analysis_name") == "day_simple_aggregate"
).where(F.col("aggregation_type") == metric_aggregation_type)
return base_df
def _get_txd_base_df(df: DataFrame) -> DataFrame:
"""Generate base table with analysis filtered to daily trailing day aggregate."""
txd_base_df = (
df.where(
F.col("internal_dashboard_analysis_name")
== "day_simple_aggregate_trailing_day"
)
.where(
F.col("aggregation_type") == "AVG"
) # Daily Comparable will always use AVG to aggregate
.select("period_start", F.col("value").alias("txd_value"))
)
return txd_base_df
def _get_ptd_sales_expression(metric_aggregation_type: str, granularity: str):
"""Generate period-to-date (PTD) sales expression from daily periodicity."""
# Determine period prefix and partition columns based on granularity
period_prefix = "qtd" # Quarter-to-date by default
partition_by_columns = [
"quarter_start_period_start",
"quarter_end_period_start",
]
# For half-year reporting, use different prefix and partition columns
if granularity == "HALF_YEAR":
period_prefix = "ptd"
partition_by_columns = ["report_period_start"]
# Create window specification for cumulative calculation within period
ptd_sales_window = (
W.partitionBy(*partition_by_columns)
.orderBy("period_start")
.rowsBetween(W.unboundedPreceding, W.currentRow)
)
# Use appropriate aggregation function based on metric type
aggregate_function = F.avg if metric_aggregation_type == "AVG" else F.sum
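    # Illustrative sketch (hypothetical frame and values): for a SUM metric at DAY
    # granularity the expression returned below computes a running quarter-to-date
    # total, assuming the quarter window columns are present on the input:
    #
    #   daily_df.select("period_start", "value", _get_ptd_sales_expression("SUM", "DAY"))
    #
    #   period_start  value  qtd_sales
    #   2025-01-01    100.0  100.0
    #   2025-01-02    150.0  250.0   <- unboundedPreceding..currentRow running sum
    #
    # With granularity "HALF_YEAR" the alias is ptd_sales and the window is
    # partitioned by report_period_start instead.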
# Return the window expression with appropriate alias
return (
aggregate_function(F.col("value"))
.over(ptd_sales_window)
.alias(f"{period_prefix}_sales")
)
def _join_ptd_sales_df(
df: DataFrame,
metric_calendar_type: str,
metric_max_relevant_years: int,
granularity: str,
) -> DataFrame:
"""
Iterate through relevant years and join previous years' data to enable year-over-year
comparisons of period-to-date performance. Handles different calendar types and
leap year adjustments by determining if PTD should be calculated using:
- date-to-date comparison (exact calendar dates)
- period day-to-period day comparison (relative position in period)
"""
# Set up variables based on granularity (quarter or half-year)
period_prefix = "qtd" # Quarter-to-date by default
days_in_column = "days_in_quarter"
# For quarter-end dates, use the total quarter sales instead of QTD sales
ptd_sales_previous_year_expression = F.when(
F.col("end_of_quarter") == 1, F.col("total_quarter_sales")
).otherwise(F.col(f"{period_prefix}_sales"))
# For half-year reporting, use different column names and expressions
if granularity == "HALF_YEAR":
period_prefix = "ptd"
days_in_column = "days_in_report"
ptd_sales_previous_year_expression = F.col(f"{period_prefix}_sales")
# Determine join columns based on calendar type
# 52_WEEK calendar joins on year, quarter, and days in period
# EXACT_N_YEARS calendar joins on exact calendar date from previous year
if metric_calendar_type == "52_WEEK":
join_columns = ["join_year", "quarter", days_in_column]
else:
join_columns = ["join_date"]
# Process each previous year up to the maximum relevant years
for year_previous in range(1, metric_max_relevant_years + 1):
# Create join columns for the current iteration
df = df.withColumns(
{
"join_year": F.col("year"),
"join_date": F.col("period_start")
- F.expr(f"INTERVAL {year_previous} YEAR"),
}
)
# Create a DataFrame with previous year's data to join
join_df = df.select(
days_in_column,
F.col("period_start").alias("join_date"),
(F.col("year") + year_previous).alias("join_year"),
"quarter",
F.col("value").alias(f"value_previous_{year_previous}_year"),
F.col("txd_value").alias(f"txd_value_previous_{year_previous}_year"),
ptd_sales_previous_year_expression.alias(
f"{period_prefix}_sales_previous_{year_previous}_year"
),
).select(
*join_columns,
f"value_previous_{year_previous}_year",
f"txd_value_previous_{year_previous}_year",
f"{period_prefix}_sales_previous_{year_previous}_year",
)
df = df.join(join_df, join_columns, "left").drop("join_date", "join_year")
if metric_max_relevant_years == 1:
        # If there is only 1 previous year, automatically add a second year for the
        # daily progress previous y/y calculation
df = df.withColumn(
"join_date", F.col("period_start") - F.expr("INTERVAL 2 YEAR")
)
join_df = df.select(
F.col("period_start").alias("join_date"),
(F.col("year") + 2).alias("join_year"),
"quarter",
days_in_column,
F.col(f"{period_prefix}_sales").alias(
f"{period_prefix}_sales_previous_2_year"
),
)
df = df.join(join_df, join_columns, "left").drop("join_date", "join_year")
return df
def _get_previous_week_df(
df: DataFrame,
metric_calendar_type: str,
metric_max_relevant_years: int,
granularity: str,
) -> DataFrame:
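    """
    Join previous years' PTD values onto the daily frame, forward-fill them across
    calendar gaps, and derive the current and prior-year y/y growth rates along with
    the growth observed one week earlier.
    """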
period_prefix = "qtd"
period_index_column = "quarter_period_index"
period_index_window = W.orderBy(F.col("quarter_start_period_start").desc())
if granularity == "HALF_YEAR":
period_prefix = "ptd"
period_index_column = "report_period_index"
period_index_window = W.orderBy(F.col("report_period_start").desc())
ptd_sales_df = _join_ptd_sales_df(
df, metric_calendar_type, metric_max_relevant_years, granularity
)
    # Fill down PTD values from previous years for retail calendars that have an
    # extra week every 6 years. Only the two prior years' PTD values are filled,
    # for use in the PTD chart.
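    # Illustrative sketch of the fill-down trick (hypothetical values): the running
    # count of non-null PTD values groups each non-null row with the null rows that
    # follow it, so first_value() within that group carries the value forward:
    #
    #   period_start  qtd_sales_previous_1_year  running count  filled value
    #   2025-02-02    500.0                      1              500.0
    #   2025-02-03    null                       1              500.0
    #   2025-02-04    720.0                      2              720.0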
ptd_filldown_df = ptd_sales_df.withColumns(
{
f"{period_prefix}_sales_previous_1_year": F.first_value(
F.col(f"{period_prefix}_sales_previous_1_year")
).over(
W.partitionBy(
F.count(F.col(f"{period_prefix}_sales_previous_1_year")).over(
W.orderBy("period_start")
)
).orderBy("period_start")
),
f"{period_prefix}_sales_previous_2_year": F.first_value(
F.col(f"{period_prefix}_sales_previous_2_year")
).over(
W.partitionBy(
F.count(F.col(f"{period_prefix}_sales_previous_2_year")).over(
W.orderBy("period_start")
)
).orderBy("period_start")
),
}
)
# Calculate current and previous year growth
growth_df = ptd_filldown_df.withColumns(
{
period_index_column: (
(F.dense_rank().over(period_index_window)) - 1
            ),  # Assign period index column (0 = most recent period) for final filtering
f"{period_prefix}_y_y_growth_current": F.try_subtract(
F.try_divide(
F.col(f"{period_prefix}_sales"),
F.col(f"{period_prefix}_sales_previous_1_year"),
),
F.lit(1),
),
f"{period_prefix}_y_y_growth_previous_1_year": F.try_subtract(
F.try_divide(
F.col(f"{period_prefix}_sales_previous_1_year"),
F.col(f"{period_prefix}_sales_previous_2_year"),
),
F.lit(1),
),
}
)
# Derive T7D growth for comparison to current day
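    # e.g. F.lag(col, 7) ordered by period_start returns, for each day, the QTD/PTD
    # y/y growth observed 7 daily rows (one calendar week) earlier.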
previous_week_df = growth_df.withColumn(
f"{period_prefix}_y_y_growth_previous_1_week",
F.lag(F.col(f"{period_prefix}_y_y_growth_current"), 7).over(
W.orderBy("period_start")
),
)
return previous_week_df
def _get_current_values(df: DataFrame, granularity: str) -> dict:
"""Isolate the current fiscal year for use in extrapolation."""
days_in_column = (
"days_in_report" if granularity == "HALF_YEAR" else "days_in_quarter"
)
return (
df.where(F.col("period_index") == 0)
.select(
F.col("year").alias("current_fiscal_year"),
F.col(days_in_column).alias("current_days_in_period"),
)
.first()
.asDict()
)
def _get_ptd_progress_df(
df: DataFrame,
metric_max_relevant_years: int,
granularity: str,
metric_configs: dict,
) -> DataFrame:
"""Format the final PTD progress DataFrame."""
period_prefix = "qtd"
period_index_column = "quarter_period_index"
days_in_column = "days_in_quarter"
period_columns = [
F.col("quarter_start_period_start").alias("quarter_start"),
F.col("quarter_end_period_start").alias("quarter_end"),
period_index_column,
]
if granularity == "HALF_YEAR":
period_prefix = "ptd"
period_index_column = "report_period_index"
days_in_column = "days_in_report"
period_columns = [
"report_period_start",
"report_period_end",
period_index_column,
"report_label",
]
previous_year_columns = _get_previous_year_columns(
metric_max_relevant_years, granularity
)
current_values = _get_current_values(df, granularity)
metric_max_relevant_years = int(metric_configs["max_relevant_years"])
metric_relative_year_comparisons = list(range(1, metric_max_relevant_years + 1))
configs_df = df.withColumns(
{
"value_divisor": F.lit(metric_configs["value_divisor"]),
"aggregation_type": F.lit(metric_configs["aggregation_type"]),
"metric_name": F.lit(metric_configs["metric_name"]),
"top_level_entity_ticker": F.lit(
metric_configs["top_level_entity_ticker"]
).cast("string"),
"top_level_entity_name": F.lit(metric_configs["top_level_entity_name"]),
"metric_aggregate_sql_operator": F.lit(
metric_configs["metric_config_aggregation_type"]
),
"metric_trailing_period_aggregate_sql_operator": F.lit(
metric_configs["trailing_period_aggregate_function"]
).cast("string"),
"metric_trailing_period_length": F.lit(
metric_configs["trailing_period_length"]
).cast("int"),
"metric_calendar_type": F.lit(metric_configs["calendar_type"]),
"metric_growth_rate_type": F.lit(metric_configs["growth_rate_type"]),
"metric_va_id": F.lit(metric_configs["visible_alpha_id"]).cast("int"),
"metric_relative_year_comparisons": F.lit(metric_relative_year_comparisons),
"current_fiscal_year": F.lit(current_values["current_fiscal_year"]),
f"current_{days_in_column}": F.lit(
current_values["current_days_in_period"]
),
"internal_metric_id": F.lit(metric_configs["internal_metric_id"]).cast(
"int"
),
}
)
ptd_progress_df = configs_df.select(
"period_start",
"period_end",
*period_columns,
F.concat(F.col("quarter"), F.col("year")).alias("quarter_year_label"),
F.col("quarter").alias("quarter_label"),
F.col("year").alias("fiscal_year"),
"analysis_name",
F.lit("Daily Progress and Comparable").alias("dashboard_analysis_name"),
f"{period_prefix}_y_y_growth_current",
f"{period_prefix}_y_y_growth_previous_1_week",
f"{period_prefix}_y_y_growth_previous_1_year",
"value",
"txd_value",
f"{period_prefix}_sales",
*previous_year_columns,
"value_divisor",
"aggregation_type",
"metric_name",
"top_level_entity_ticker",
"top_level_entity_name",
"metric_aggregate_sql_operator",
"metric_trailing_period_aggregate_sql_operator",
"metric_trailing_period_length",
"metric_calendar_type",
"metric_growth_rate_type",
"metric_va_id",
"metric_relative_year_comparisons",
"current_fiscal_year",
f"current_{days_in_column}",
F.when(
((F.col(days_in_column) <= 5) & (F.col(f"current_{days_in_column}") >= 6))
| ((F.col(days_in_column) <= 5) & (F.col(period_index_column) > 0)),
F.lit(0),
)
.otherwise(F.lit(1))
.alias("hide_first_week"),
F.col("period_index").alias("sequential_index"),
F.current_timestamp().alias("publication_timestamp"),
"internal_metric_id",
)
return ptd_progress_df
@track_usage
def standard_metric_daily_qtd_progress(
df: DataFrame, calendar_df: DataFrame
) -> DataFrame:
"""
Transforms a metric's unified KPI table to generate a dataframe containing daily analyses through the quarter and the comparable daily analyses in prior years.
:param df: A dataframe of a metric's unified KPI table
:param calendar_df: Calendar dataframe
:return: DataFrame of daily QTD analyses for a metric
Examples
^^^^^^^^
.. code-block:: python
:caption: Generating daily quarter-to-date progress analyses for a metric.
from etl_toolkit import A
input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
entity_configuration = A.entity_configuration(
top_level_entity_name="Chewy",
top_level_entity_ticker="CHWY:XNYS",
figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Net Sales - Order Date",
company_comparable_kpi=False,
currency="USD",
value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="net_sales_order_date",
source_input_date_column="date",
max_relevant_years=4,
calendar_type="52_WEEK",
trailing_period_aggregate_function="SUM",
)
unified_kpi_df = A.standard_metric_unified_kpi(
input_df,
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
df = A.standard_metric_daily_qtd_progress(unified_kpi_df, calendar_df)
display(df)
+-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
|period_start |period_end |quarter_start |quarter_end |...|qtd_y_y_growth_current |qtd_y_y_growth_previous_1_week |qtd_y_y_growth_previous_1_year |value |txd_value |qtd_sales |value_previous_1_year |...|
+-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
|2025-01-01 |2025-01-01 |2024-10-28 |2025-02-02 |...|0.075278 |0.093199 |0.023156 |4555616.740619 |25919798.957255 |2168150987.259101 |35547740.244841 |...|
+-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
|2025-01-02 |2025-01-02 |2024-10-28 |2025-02-02 |...|0.070494 |0.081441 |0.025001 |30841420.191023 |28067818.239408 |2198992407.450124 |37821651.385287 |...|
+-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
|2025-01-03 |2025-01-03 |2024-10-28 |2025-02-02 |...|0.069268 |0.078758 |0.026635 |37438227.272619 |28563479.426870 |2236430634.722743 |37368904.542899 |...|
+-------------+-----------+--------------+------------+---+-----------------------+-------------------------------+-------------------------------+----------------+-----------------+------------------+----------------------+---+
"""
# Validate input DataFrame has required columns and structure
_validate_unified_kpi_input_df(df)
granularity = "DAY"
# Extract metric configuration from the input DataFrame
metric_configs = _get_metric_configs(df)
# Get the metric's aggregation type (SUM or AVG), calendar type, and max relevant years
metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
metric_calendar_type = metric_configs["calendar_type"]
metric_max_relevant_years = int(metric_configs["max_relevant_years"])
# Get the latest growth rates for comparison
latest_growth_rate_col = _get_latest_growth_rates(df, granularity)
# Filter to daily aggregates with the specified aggregation type
base_df = _get_base_df(df, metric_aggregation_type)
# Get trailing day aggregates
txd_base_df = _get_txd_base_df(df)
# Generate base calendar with full quarter dates for extrapolation
base_calendar_df = calendar_df.select(
F.col("day").alias("period_start"),
F.col("day").alias("period_end"),
"quarter_period_start",
"quarter_period_end",
F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
"days_in_quarter",
F.year(
F.coalesce(F.col("custom_year_of_quarter"), F.col("quarter_period_start"))
).alias("year"),
)
date_bounds = _get_date_bounds(base_df, granularity)
filtered_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)
# Join base table to calendar table with min/max date restrictions
daily_sales_df = filtered_calendar_df.join(
base_df.drop("period_end"), "period_start", "left"
)
ptd_sales_expression = _get_ptd_sales_expression(
metric_aggregation_type, granularity
)
# Get aggregate_function to use for total_quarter_sales
aggregate_function = F.avg if metric_aggregation_type == "AVG" else F.sum
ptd_sales_df = (
daily_sales_df.join(txd_base_df, "period_start", "left")
.select(
"period_start",
"period_end",
"analysis_name",
"quarter",
"year",
"days_in_quarter",
F.coalesce(
F.col("quarter_start_period_start"), F.col("quarter_period_start")
).alias("quarter_start_period_start"),
"quarter_end_period_start",
"quarter_label_period_end",
"period_index",
"value",
"txd_value",
ptd_sales_expression,
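            # Flag the final day of each quarter so _join_ptd_sales_df can substitute
            # the full quarter total for the partial QTD value on that date.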
F.when(F.col("period_end") == F.col("quarter_end_period_start"), F.lit(1))
.otherwise(F.lit(0))
.alias("end_of_quarter"),
)
.withColumns(latest_growth_rate_col)
.withColumn(
"total_quarter_sales",
aggregate_function(F.col("value")).over(
W.partitionBy(
F.col("quarter_start_period_start"),
F.col("quarter_end_period_start"),
)
),
)
)
previous_week_df = _get_previous_week_df(
ptd_sales_df, metric_calendar_type, metric_max_relevant_years, granularity
)
filtered_df = previous_week_df.where(F.col("quarter_period_index") <= 1)
ptd_progress_df = _get_ptd_progress_df(
filtered_df, metric_max_relevant_years, granularity, metric_configs
)
return ptd_progress_df
@track_usage
def standard_metric_half_year_progress(
df: DataFrame, calendar_df: DataFrame
) -> DataFrame:
"""
Transforms a metric's unified KPI table to generate a dataframe containing daily analyses through the half year and the comparable daily analyses in prior years.
:param df: A dataframe of a metric's unified KPI table
:param calendar_df: Calendar dataframe
:return: DataFrame of daily half year progress analyses for a metric
Examples
^^^^^^^^
.. code-block:: python
:caption: Generating daily half year progress analyses for a metric.
from etl_toolkit import A
input_df = spark.table("yd_production.itx_reported.itx_gmv_es")
calendar_df = spark.table("yd_fp_investor_audit.itx_xmad_deliverable_gold.custom_calendar__dmv__000")
entity_configuration = A.entity_configuration(
top_level_entity_name="Inditex",
top_level_entity_ticker="ITX:XMAD",
exchange=None,
entity_name=None,
figi=None,
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Spain Net Sales",
company_comparable_kpi=True,
uses_va_for_actuals=False,
display_period_granularity="DAY",
report_period_granularity="HALF_YEAR",
currency="EUR",
value_divisor=1000000,
visible_alpha_id=None,
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="value",
source_input_date_column="date",
source_table_granularity="DAY",
aggregate_function="SUM",
max_relevant_years=2,
growth_rate_type="CAGR",
calendar_type="EXACT_N_YEARS",
source_table_filter_conditions=None,
slice_columns=None,
trailing_period_length=7,
trailing_period_aggregate_function="AVG",
)
unified_kpi_df = A.standard_metric_unified_kpi(
input_df,
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
df = A.standard_metric_half_year_progress(unified_kpi_df, calendar_df)
display(df)
+-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
|period_start |period_end |report_period_start |report_period_end |...|ptd_y_y_growth_current |ptd_y_y_growth_previous_1_week |ptd_y_y_growth_previous_1_year |value |txd_value |ptd_sales |value_previous_1_year |...|
+-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
|2025-01-01 |2025-01-01 |2024-08-01 |2025-01-31 |...|0.090465 |0.093223 |0.126094 |2425970.385558 |19608148.865366 |2517065314.305471 |3136880.539406 |...|
+-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
|2025-01-02 |2025-01-02 |2024-08-01 |2025-01-31 |...|0.090634 |0.091252 |0.126692 |30053304.417443 |20729009.931905 |2547118618.722914 |27198458.331024 |...|
+-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
|2025-01-03 |2025-01-03 |2024-08-01 |2025-01-31 |...|0.091695 |0.091673 |0.126264 |31495293.875604 |21798167.567119 |2578613912.598518 |26579190.577483 |...|
+-------------+-----------+--------------------+------------------+---+-----------------------+-------------------------------+-------------------------------+----------------+----------------+------------------+----------------------+---+
"""
# Validate input DataFrame has required columns and structure
_validate_unified_kpi_input_df(df)
granularity = "HALF_YEAR"
# Extract metric configuration from the input DataFrame
metric_configs = _get_metric_configs(df)
# Get the metric's aggregation type (SUM or AVG), calendar type, and max relevant years
metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
metric_calendar_type = metric_configs["calendar_type"]
metric_max_relevant_years = int(metric_configs["max_relevant_years"])
# Get the latest growth rates for comparison
latest_growth_rate_col = _get_latest_growth_rates(df, granularity)
# Filter to daily aggregates with the specified aggregation type
base_df = _get_base_df(df, metric_aggregation_type)
# Get trailing day aggregates
txd_base_df = _get_txd_base_df(df)
# Generate base calendar with full period dates for extrapolation
base_calendar_df = calendar_df.select(
F.col("day").alias("period_start"),
F.col("day").alias("period_end"),
F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
F.right(F.col("year_label"), F.lit(4)).alias("year"),
F.col("half_year_period_start").alias("report_period_start"),
F.col("half_year_period_end").alias("report_period_end"),
F.col("days_in_half_year").alias("days_in_report"),
F.col("half_year_label").alias("report_label"),
)
date_bounds = _get_date_bounds(base_df, granularity)
filtered_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)
# Join base table to calendar table with min/max date restrictions
daily_sales_df = filtered_calendar_df.join(
base_df.drop("period_end"), "period_start", "left"
)
ptd_sales_expression = _get_ptd_sales_expression(
metric_aggregation_type, granularity
)
ptd_sales_df = (
daily_sales_df.join(txd_base_df, "period_start", "left")
.select(
"period_start",
"period_end",
"analysis_name",
"quarter",
"year",
"days_in_report",
"report_period_start",
"report_period_end",
"period_index",
"report_label",
"value",
"txd_value",
ptd_sales_expression,
)
.withColumns(latest_growth_rate_col)
)
previous_week_df = _get_previous_week_df(
ptd_sales_df,
metric_calendar_type,
metric_max_relevant_years,
granularity,
)
filtered_df = previous_week_df.where(F.col("report_period_index") <= 1)
ptd_progress_df = _get_ptd_progress_df(
filtered_df, metric_max_relevant_years, granularity, metric_configs
).drop(
"internal_metric_id"
) # internal_metric_id was never added to half_year_progress template
return ptd_progress_df