from pyspark.sql import SparkSession
from pyspark.sql import functions as F, DataFrame, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session
from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
_get_metric_configs,
_get_latest_growth_rates,
_get_date_bounds,
_get_filtered_calendar_df,
_get_previous_year_columns,
STANDARD_DECIMAL_TYPE,
)
def _get_qtd_calc_df(df: DataFrame, metric_aggregation_type: str) -> DataFrame:
    """Pivot per-period analysis rows and add a running quarter-to-date value.

    Analysis names are normalized so day/month-specific variants collapse onto
    a shared ``period_`` prefix, the frame is pivoted so each analysis becomes
    its own column, and a cumulative window over each quarter produces the
    ``qtd_period_val`` column.

    :param df: Unified-KPI rows pre-filtered to the relevant analysis names.
    :param metric_aggregation_type: "AVG" averages the periods accumulated so
        far; any other value sums them.
    :return: Pivoted dataframe with a ``qtd_period_val`` column appended.
    """
    # Collapse granularity-specific analysis names onto a common prefix so the
    # pivot yields stable column names (e.g. ``period_simple_aggregate``).
    normalized_name = F.regexp_replace(
        F.regexp_replace(
            F.col("internal_dashboard_analysis_name"), "(day|month)_", "period_"
        ),
        "growth_rate_trailing_day",
        "growth_rate",
    )
    pivoted = (
        df.select(
            "value",
            "quarter_end_period_end",
            "period_index",
            "period_start",
            "period_end",
            "internal_dashboard_analysis_name",
        )
        .withColumn("internal_dashboard_analysis_name", normalized_name)
        .groupBy(
            "period_start",
            "period_end",
            "period_index",
            "quarter_end_period_end",
        )
        .pivot("internal_dashboard_analysis_name")
        .sum("value")
    )
    # Cumulative window from the start of each quarter through the current row.
    running_window = (
        W.partitionBy("quarter_end_period_end")
        .orderBy("period_start")
        .rowsBetween(W.unboundedPreceding, W.currentRow)
    )
    if metric_aggregation_type == "AVG":
        accumulate = F.avg
    else:
        accumulate = F.sum
    qtd_value = (
        accumulate(F.col("period_simple_aggregate"))
        .over(running_window)
        .cast(STANDARD_DECIMAL_TYPE)
    )
    return pivoted.withColumn("qtd_period_val", qtd_value)
def _get_current_previous_values(df: DataFrame):
    """Pull scalar comparison values from current and prior period rows.

    Reads three single-row slices of *df* — the current period
    (``period_index == 0``), the prior quarter (``quarter_index == 1``), and
    the period seven indices back (presumably one week earlier for
    daily-indexed data — confirm with callers) — and wraps each value in a
    literal Column so it can be attached to every row.

    :param df: Dataframe carrying ``qtd_yy_period`` and ``quarter_year_label``.
    :return: Dict mapping output column names to literal Columns.
    """
    current_row = (
        df.where(F.col("period_index") == 0)
        .select("qtd_yy_period", "quarter_year_label")
        .first()
        .asDict()
    )
    prior_quarter_row = (
        df.where(F.col("quarter_index") == 1)
        .select("quarter_year_label")
        .first()
        .asDict()
    )
    prior_week_row = (
        df.where(F.col("period_index") == 7).select("qtd_yy_period").first().asDict()
    )
    return {
        "current_qtd_yy_period": F.lit(current_row["qtd_yy_period"]).cast(
            STANDARD_DECIMAL_TYPE
        ),
        "current_quarter_year_label": F.lit(current_row["quarter_year_label"]),
        "previous_quarter_year_label": F.lit(
            prior_quarter_row["quarter_year_label"]
        ),
        "previous_week_qtd_yy_period": F.lit(prior_week_row["qtd_yy_period"]).cast(
            STANDARD_DECIMAL_TYPE
        ),
    }
def _get_qtd_yy_final_df(
    df: DataFrame, metric_configs: dict, date_bounds: dict
) -> DataFrame:
    """Derive year-over-year QTD ratios and attach metric/date metadata.

    :param df: Dataframe with ``qtd_period_val`` and prior-year QTD columns.
    :param metric_configs: Metric configuration values (divisor, ids, types).
    :param date_bounds: Min/max period boundary dates for the metric.
    :return: Input dataframe with YY ratios, scalar comparison literals, and
        metric-configuration metadata columns appended.
    """
    # try_divide / try_subtract yield NULL instead of raising when prior-year
    # values are missing or zero.
    growth_df = df.withColumns(
        {
            "quarter_year_label": F.concat(F.col("quarter"), F.col("year")),
            "qtd_yy_period": F.try_subtract(
                F.try_divide(F.col("qtd_period_val"), F.col("qtd_1y_period_val")),
                F.lit(1),
            ),
            "qtd_yy_period_previous_year": F.try_subtract(
                F.try_divide(F.col("qtd_1y_period_val"), F.col("qtd_2y_period_val")),
                F.lit(1),
            ),
        }
    )
    scalar_comparisons = _get_current_previous_values(growth_df)
    max_years = int(metric_configs["max_relevant_years"])
    relative_year_comparisons = list(range(1, max_years + 1))
    metadata_columns = {
        "value_divisor": F.lit(metric_configs["value_divisor"]),
        "metric_aggregate_sql_operator": F.lit(
            metric_configs["metric_config_aggregation_type"]
        ),
        "metric_growth_rate_type": F.lit(metric_configs["growth_rate_type"]),
        "metric_va_id": F.lit(metric_configs["visible_alpha_id"]).cast("int"),
        "relative_years": F.lit(relative_year_comparisons),
        "data_source_table_grain": F.lit(
            metric_configs["source_table_granularity"]
        ),
        "internal_metric_id": F.lit(metric_configs["internal_metric_id"]).cast(
            "int"
        ),
        "metric_trailing_sql_aggregate_operator": F.lit(
            metric_configs["trailing_period_aggregate_function"]
        ).cast("string"),
        # e.g. "Oct24": month name plus two-digit year of the period end.
        "month_label_expression": F.concat(
            F.monthname("period_end"), F.date_format(F.col("period_end"), "yy")
        ),
        "max_quarter_end_period_end": F.lit(date_bounds["max_report_period_end"]),
        "max_period_start": F.lit(date_bounds["max_period_start"]),
        "max_period_end": F.lit(date_bounds["max_period_end"]),
        "min_period_start": F.lit(date_bounds["min_period_start"]),
    }
    return growth_df.withColumns(scalar_comparisons).withColumns(metadata_columns)
def _get_daily_table_df(
    df: DataFrame,
    granularity: str,
    spark: SparkSession = None,
) -> DataFrame:
    """Explode period-level rows onto a daily spine with quarter labels.

    Left-joins the central days table onto *df* (each day — or, for MONTH
    granularity, the day's month start — matched to ``period_start``),
    restricts to the metric's reporting window, and keeps quarter display
    columns.

    :param df: Period-level dataframe carrying quarter metadata columns.
    :param granularity: "MONTH" joins on month start; otherwise on the day.
    :param spark: Optional SparkSession; created on demand when omitted.
    :return: One row per day with quarter index/display columns.
    """
    spark = spark or get_spark_session()
    if granularity == "MONTH":
        periods_in_quarter_col = "months_in_quarter"
        join_condition = F.date_trunc("month", F.col("day")) == F.col("period_start")
    else:
        periods_in_quarter_col = "days_in_quarter"
        join_condition = F.col("day") == F.col("period_start")
    day_spine_df = spark.table("yd_1p_central.time.days").select("day")
    return (
        day_spine_df.join(df, join_condition, "left")
        .where(
            E.between(
                F.col("day"),
                F.col("min_period_start"),
                F.col("max_quarter_end_period_end"),
            )
        )
        .select(
            "day",
            F.col("quarter").alias("yd_quarter"),
            F.col(periods_in_quarter_col).alias(f"yd_{periods_in_quarter_col}"),
            F.col("quarter_index").alias("yd_quarter_index"),
            # Display labels are populated only on the matching quarter rows.
            F.when(
                F.col("quarter_index") == 0, F.col("current_quarter_year_label")
            ).alias("current_quarter_display"),
            F.when(
                F.col("quarter_index") == 1, F.col("previous_quarter_year_label")
            ).alias("previous_quarter_display"),
        )
    )
def _get_qtd_progress_df(
    df: DataFrame,
    metric_max_relevant_years: int,
    granularity: str,
    metric_configs: dict,
) -> DataFrame:
    """Assemble the final QTD progress output with metric identity columns.

    Attaches literal metric/entity columns plus a join key, then selects the
    published column layout, which varies slightly by display granularity.

    :param df: Daily-joined QTD dataframe.
    :param metric_max_relevant_years: Number of prior-year comparisons kept.
    :param granularity: "MONTH", "WEEK", or another display granularity.
    :param metric_configs: Metric configuration used for identity columns.
    :return: Dataframe with the final published column set.
    """
    is_month = granularity == "MONTH"
    days_in_column = "months_in_quarter" if is_month else "days_in_quarter"
    previous_year_columns = _get_previous_year_columns(
        metric_max_relevant_years, granularity
    )
    publication_col = F.current_timestamp().alias("publication_timestamp")
    qtd_period_columns = [
        "qtd_period_val",
        "period_index",
        "qtd_1y_period_val",
        "qtd_2y_period_val",
        "qtd_yy_period",
        "qtd_yy_period_previous_year",
    ]
    # Each granularity publishes a slightly different trailing column set.
    if is_month:
        final_columns = [
            "month_label_expression",
            "relative_years",
            "data_source_table_grain",
            publication_col,
            "internal_metric_id",
        ]
    elif granularity == "WEEK":
        qtd_period_columns.append("previous_week_qtd_yy_period")
        final_columns = [
            "relative_years",
            "data_source_table_grain",
            "metric_trailing_sql_aggregate_operator",
            publication_col,
        ]
    else:
        final_columns = [
            "relative_years",
            "data_source_table_grain",
            publication_col,
        ]
    configs_df = df.withColumns(
        {
            "metric_name": F.lit(metric_configs["metric_name"]).cast("string"),
            "top_level_entity_ticker": F.lit(
                metric_configs["top_level_entity_ticker"]
            ).cast("string"),
            "top_level_entity_name": F.lit(
                metric_configs["top_level_entity_name"]
            ).cast("string"),
        }
    )
    return configs_df.select(
        "day",
        "yd_quarter",
        f"yd_{days_in_column}",
        "yd_quarter_index",
        "metric_name",
        "top_level_entity_ticker",
        "top_level_entity_name",
        # "NONE" placeholders keep the join key non-null when ticker/name are
        # absent (e.g. private companies).
        F.concat(
            F.coalesce(F.col("top_level_entity_ticker"), F.lit("NONE")),
            F.coalesce(F.col("top_level_entity_name"), F.lit("NONE")),
            F.col("metric_name"),
        ).alias("metric_join_key"),
        "period_start",
        "period_end",
        "quarter_period_start",
        "quarter_period_end",
        days_in_column,
        "quarter",
        "year",
        "max_quarter_end_period_end",
        "max_period_start",
        "max_period_end",
        "min_period_start",
        "quarter_index",
        *previous_year_columns,
        *qtd_period_columns,
        "quarter_year_label",
        "current_qtd_yy_period",
        "value_divisor",
        "current_quarter_year_label",
        "previous_quarter_year_label",
        "current_quarter_display",
        "previous_quarter_display",
        "metric_aggregate_sql_operator",
        "metric_growth_rate_type",
        "metric_va_id",
        *final_columns,
    )
@track_usage
def standard_metric_weekly_qtd_progress(
    df: DataFrame,
    calendar_df: DataFrame,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing weekly analyses through the quarter and the comparable weekly analyses in prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :param spark: Optional SparkSession (will create one if not provided)
    :return: DataFrame of weekly QTD analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating weekly quarter-to-date progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.cmg_reported.dash_daily_rev")
        calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chipotle",
            top_level_entity_ticker="CMG:XNYS",
            exchange=None,
            entity_name=None,
            figi=None,
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Total Revenue",
            company_comparable_kpi=True,
            uses_va_for_actuals=False,
            display_period_granularity="WEEK",
            report_period_granularity="QUARTER",
            currency="USD",
            value_divisor=1000,
            visible_alpha_id=6639756,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="revenue",
            source_input_date_column="date",
            source_table_granularity="DAY",
            aggregate_function="SUM",
            max_relevant_years=2,
            growth_rate_type="CAGR",
            calendar_type="EXACT_N_YEARS",
            source_table_filter_conditions=None,
            slice_columns=None,
            trailing_period_length=7,
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_weekly_qtd_progress(unified_kpi_df, calendar_df)
        display(df)

        +-----------+---+---------------------+-------------------+---+------------------------+---+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |day        |...|quarter_period_start |quarter_period_end |...|period_simple_aggregate |...|period_1y_growth_rate |period_1y_simple_aggregate |...|qtd_period_val     |...|qtd_1y_period_val |...|qtd_yy_period     |...|
        +-----------+---+---------------------+-------------------+---+------------------------+---+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |2025-01-01 |...|2025-01-01           |2025-03-31         |...|22438072.921040         |...|0.108714              |23344562.935425            |...|22438072.921040    |...|23344562.935425   |...|-0.038831         |...|
        +-----------+---+---------------------+-------------------+---+------------------------+---+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |2025-01-02 |...|2025-01-01           |2025-03-31         |...|31351900.461439         |...|0.080413              |29583525.428142            |...|53789973.382479    |...|52928088.363567   |...|0.016284          |...|
        +-----------+---+---------------------+-------------------+---+------------------------+---+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |2025-01-03 |...|2025-01-01           |2025-03-31         |...|34759511.041158         |...|0.079731              |29983351.185830            |...|88549484.423637    |...|82911439.549397   |...|0.068001          |...|
        +-----------+---+---------------------+-------------------+---+------------------------+---+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
    """
    granularity = "WEEK"
    metric_configs = _get_metric_configs(df)
    metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
    trailing_aggregation_type = metric_configs["trailing_period_aggregate_function"]
    metric_calendar_type = metric_configs["calendar_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])
    # Generate base df with analyses filtered to period aggregate and growth rates
    base_df = df.where(
        F.col("internal_dashboard_analysis_name").isin(
            [
                "day_simple_aggregate",
                "day_simple_aggregate_trailing_day",
                "day_1y_growth_rate_trailing_day",
                "day_2y_growth_rate_trailing_day",
                "day_3y_growth_rate_trailing_day",
                "day_4y_growth_rate_trailing_day",
            ]
        )
    ).where(
        # Plain aggregates are kept under the metric's aggregation type;
        # trailing-period rows are kept under the trailing aggregate function.
        F.when(
            F.col("trailing_period").isNull(),
            F.col("aggregation_type") == metric_aggregation_type,
        ).otherwise(F.col("aggregation_type") == trailing_aggregation_type)
    )
    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)
    qtd_calc_df = _get_qtd_calc_df(base_df, metric_aggregation_type)
    # Generate base calendar with full quarter dates for extrapolation
    base_calendar_df = calendar_df.select(
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        "quarter_period_start",
        "quarter_period_end",
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.right(F.col("year_label"), F.lit(4)).alias("year"),
        "days_in_quarter",
    )
    date_bounds = _get_date_bounds(base_df, granularity)
    filtered_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)
    # quarter_index: 0 = most recent quarter, increasing into the past.
    prep_df = filtered_calendar_df.join(
        qtd_calc_df, ["period_start", "period_end"], "left"
    ).withColumn(
        "quarter_index",
        (F.dense_rank().over(W.orderBy(F.col("quarter_period_end").desc())) - 1),
    )
    # 52_WEEK calendars align prior years by quarter label and shape; exact
    # calendars align on the same date shifted back N calendar years.
    if metric_calendar_type == "52_WEEK":
        join_columns = ["join_year", "quarter", "days_in_quarter"]
    else:
        join_columns = ["join_date"]
    for year_previous in range(1, metric_max_relevant_years + 1):
        # Join previous years data for QTD calculations
        prep_df = prep_df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": (
                    F.col("period_start") - F.expr(f"INTERVAL {year_previous} YEAR")
                ),
            }
        )
        join_df = prep_df.select(
            "quarter",
            "days_in_quarter",
            F.col("period_start").alias("join_date"),
            (F.col("year") + year_previous).alias("join_year"),
            F.col("period_simple_aggregate").alias(
                f"period_{year_previous}y_simple_aggregate"
            ),
            F.col("qtd_period_val").alias(f"qtd_{year_previous}y_period_val"),
            F.col("period_simple_aggregate_trailing_day").alias(
                f"period_{year_previous}y_simple_aggregate_trailing_day"
            ),
        ).select(
            *join_columns,
            f"period_{year_previous}y_simple_aggregate",
            f"qtd_{year_previous}y_period_val",
            f"period_{year_previous}y_simple_aggregate_trailing_day",
        )
        prep_df = prep_df.join(join_df, join_columns, "left").drop(
            "join_date", "join_year"
        )
    qtd_yy_final_df = _get_qtd_yy_final_df(
        prep_df, metric_configs, date_bounds
    ).withColumns(latest_growth_rate_col)
    spark = spark or get_spark_session()
    daily_table_df = _get_daily_table_df(qtd_yy_final_df, granularity, spark=spark)
    combined_df = daily_table_df.join(
        qtd_yy_final_df, F.col("day") == F.col("period_end"), "left"
    )
    qtd_progress_df = _get_qtd_progress_df(
        combined_df, metric_max_relevant_years, granularity, metric_configs
    )
    return qtd_progress_df
@track_usage
def standard_metric_monthly_qtd_progress(
    df: DataFrame,
    calendar_df: DataFrame,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing monthly analyses through the quarter and the comparable monthly analyses in prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :param spark: Optional SparkSession (will create one if not provided)
    :return: DataFrame of monthly QTD analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating monthly quarter-to-date progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.cn_shortvid_reported.active_users")
        calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="ByteDance",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name="Douyin",
            figi=None,
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Monthly Active Users",
            company_comparable_kpi=False,
            uses_va_for_actuals=False,
            display_period_granularity="MONTH",
            report_period_granularity="QUARTER",
            value_divisor=1000000,
            visible_alpha_id=None,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="mau",
            source_input_date_column="month",
            source_table_granularity="MONTH",
            max_relevant_years=2,
            aggregate_function="AVG",
            growth_rate_type="CAGR",
            calendar_type="EXACT_N_YEARS",
            source_table_filter_conditions=[F.col("appname") == "Douyin Core"],
            slice_columns=["appname"],
            trailing_period_length=None,
            trailing_period_aggregate_function=None,
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_monthly_qtd_progress(unified_kpi_df, calendar_df)
        display(df)

        +---+-------------+-----------+---------------------+-------------------+---+------------------------+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |...|period_start |period_end |quarter_period_start |quarter_period_end |...|period_simple_aggregate |period_1y_growth_rate |period_1y_simple_aggregate |...|qtd_period_val     |...|qtd_1y_period_val |...|qtd_yy_period     |...|
        +---+-------------+-----------+---------------------+-------------------+---+------------------------+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |...|2024-10-01   |2024-10-31 |2024-10-01           |2024-12-31         |...|1036495571.000000       |0.175859              |881479766.500000           |...|1036495571.000000  |...|881479766.500000  |...|0.175859          |...|
        +---+-------------+-----------+---------------------+-------------------+---+------------------------+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |...|2024-11-01   |2024-11-30 |2024-10-01           |2024-12-31         |...|1055118217.000000       |0.179763              |894347880.300000           |...|1045806894.000000  |...|887913823.400000  |...|0.177825          |...|
        +---+-------------+-----------+---------------------+-------------------+---+------------------------+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
        |...|2024-12-01   |2024-12-31 |2024-10-01           |2024-12-31         |...|1072948696.000000       |0.177513              |911198916.900000           |...|1054854161.333333  |...|895675521.233333  |...|0.177719          |...|
        +---+-------------+-----------+---------------------+-------------------+---+------------------------+----------------------+---------------------------+---+-------------------+---+------------------+---+------------------+---+
    """
    granularity = "MONTH"
    metric_configs = _get_metric_configs(df)
    metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])
    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)
    # Generate base table with analysis filtered to period aggregate and growth rates
    base_df = df.where(
        F.col("internal_dashboard_analysis_name").isin(
            [
                "month_simple_aggregate",
                "month_1y_growth_rate",
                "month_2y_growth_rate",
                "month_3y_growth_rate",
                "month_4y_growth_rate",
            ]
        )
    )
    qtd_calc_df = _get_qtd_calc_df(base_df, metric_aggregation_type)
    # Generate base calendar with full quarter dates for extrapolation
    base_calendar_df = calendar_df.select(
        "quarter_period_start",
        "quarter_period_end",
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.right(F.col("year_label"), F.lit(4)).alias("year"),
        F.col("month_period_start").alias("period_start"),
        F.col("month_period_end").alias("period_end"),
        # Ordinal position of the month within its quarter (1-based).
        F.months_between(F.add_months(F.col("day"), 1), F.col("quarter_period_start"))
        .cast("bigint")
        .alias("months_in_quarter"),
    )
    # Aggregate calendar to monthly periods
    aggregate_calendar_df = base_calendar_df.groupBy("period_start", "period_end").agg(
        F.first("quarter_period_start").alias("quarter_period_start"),
        F.first("quarter_period_end").alias("quarter_period_end"),
        F.first("months_in_quarter").alias("months_in_quarter"),
        F.first("quarter").alias("quarter"),
        F.first("year").alias("year"),
    )
    date_bounds = _get_date_bounds(base_df, granularity)
    filtered_calendar_df = _get_filtered_calendar_df(aggregate_calendar_df, date_bounds)
    # quarter_index: 0 = most recent quarter, increasing into the past.
    prep_df = filtered_calendar_df.join(
        qtd_calc_df, ["period_start", "period_end"], "left"
    ).withColumn(
        "quarter_index",
        (F.dense_rank().over(W.orderBy(F.col("quarter_period_end").desc())) - 1),
    )
    for year_previous in range(1, metric_max_relevant_years + 1):
        # Join previous years data for QTD calculations
        prep_df = prep_df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": (
                    F.col("period_start") - F.expr(f"INTERVAL {year_previous} YEAR")
                ),
            }
        )
        join_df = prep_df.select(
            "months_in_quarter",
            F.col("period_start").alias("join_date"),
            F.col("period_simple_aggregate").alias(
                f"period_{year_previous}y_simple_aggregate"
            ),
            F.col("qtd_period_val").alias(f"qtd_{year_previous}y_period_val"),
        )
        prep_df = prep_df.join(
            join_df, ["months_in_quarter", "join_date"], "left"
        ).drop("join_date", "join_year")
    qtd_yy_final_df = _get_qtd_yy_final_df(
        prep_df, metric_configs, date_bounds
    ).withColumns(latest_growth_rate_col)
    spark = spark or get_spark_session()
    daily_table_df = _get_daily_table_df(qtd_yy_final_df, granularity, spark=spark)
    combined_df = daily_table_df.join(
        qtd_yy_final_df, F.col("day") == F.col("period_end"), "left"
    )
    qtd_progress_df = _get_qtd_progress_df(
        combined_df, metric_max_relevant_years, granularity, metric_configs
    )
    return qtd_progress_df