# Source code for etl_toolkit.analyses.standard_metrics.weekly_monthly_qtd_progress

from pyspark.sql import SparkSession
from pyspark.sql import functions as F, DataFrame, Window as W

from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session

from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    _get_metric_configs,
    _get_latest_growth_rates,
    _get_date_bounds,
    _get_filtered_calendar_df,
    _get_previous_year_columns,
    STANDARD_DECIMAL_TYPE,
)


def _get_qtd_calc_df(df: DataFrame, metric_aggregation_type: str) -> DataFrame:
    """Pivot per-period analysis values and compute running quarter-to-date totals.

    Analysis names are normalized so that day- and month-grained variants share
    a common ``period_*`` naming, the frame is pivoted to one column per
    analysis, and a ``qtd_period_val`` running aggregate (AVG when the metric
    is average-based, otherwise SUM) is accumulated within each quarter,
    ordered by period start.
    """
    # Collapse "day_"/"month_" prefixes to "period_" and strip the trailing-day
    # suffix from growth-rate names so both granularities pivot identically.
    normalized_name = F.regexp_replace(
        F.regexp_replace(
            F.col("internal_dashboard_analysis_name"), "(day|month)_", "period_"
        ),
        "growth_rate_trailing_day",
        "growth_rate",
    )

    pivoted_df = (
        df.select(
            "value",
            "quarter_end_period_end",
            "period_index",
            "period_start",
            "period_end",
            "internal_dashboard_analysis_name",
        )
        .withColumn("internal_dashboard_analysis_name", normalized_name)
        .groupBy(
            "period_start",
            "period_end",
            "period_index",
            "quarter_end_period_end",
        )
        .pivot("internal_dashboard_analysis_name")
        .sum("value")
    )

    # Window from the quarter's first period through the current row.
    running_window = (
        W.partitionBy("quarter_end_period_end")
        .orderBy("period_start")
        .rowsBetween(W.unboundedPreceding, W.currentRow)
    )

    if metric_aggregation_type == "AVG":
        agg_fn = F.avg
    else:
        agg_fn = F.sum

    qtd_expression = (
        agg_fn(F.col("period_simple_aggregate"))
        .over(running_window)
        .cast(STANDARD_DECIMAL_TYPE)
    )

    return pivoted_df.withColumn("qtd_period_val", qtd_expression)


def _get_current_previous_values(df: DataFrame):
    """Collect scalar comparison values from current and prior periods.

    Returns a dict of literal Columns: the current period's QTD YoY value and
    quarter label (``period_index == 0``), the previous quarter's label
    (``quarter_index == 1``), and the QTD YoY value from one week earlier
    (``period_index == 7``).

    NOTE(review): each ``.first()`` assumes its filter matches at least one
    row; an empty result would raise on ``.asDict()`` — confirm upstream
    guarantees.
    """
    current_row = (
        df.where(F.col("period_index") == 0)
        .select(
            "qtd_yy_period",
            "quarter_year_label",
        )
        .first()
        .asDict()
    )

    previous_quarter_row = (
        df.where(F.col("quarter_index") == 1)
        .select("quarter_year_label")
        .first()
        .asDict()
    )

    week_ago_row = (
        df.where(F.col("period_index") == 7).select("qtd_yy_period").first().asDict()
    )

    return {
        "current_qtd_yy_period": F.lit(current_row["qtd_yy_period"]).cast(
            STANDARD_DECIMAL_TYPE
        ),
        "current_quarter_year_label": F.lit(current_row["quarter_year_label"]),
        "previous_quarter_year_label": F.lit(
            previous_quarter_row["quarter_year_label"]
        ),
        "previous_week_qtd_yy_period": F.lit(week_ago_row["qtd_yy_period"]).cast(
            STANDARD_DECIMAL_TYPE
        ),
    }


def _get_qtd_yy_final_df(
    df: DataFrame, metric_configs: dict, date_bounds: dict
) -> DataFrame:
    """Add year-over-year QTD ratios plus metric-configuration metadata columns.

    ``try_divide``/``try_subtract`` are used so missing prior-year values yield
    NULL rather than raising. Scalar comparison values and static metric
    metadata are attached as literal columns.
    """
    yy_df = df.withColumns(
        {
            "quarter_year_label": F.concat(F.col("quarter"), F.col("year")),
            "qtd_yy_period": F.try_subtract(
                F.try_divide(F.col("qtd_period_val"), F.col("qtd_1y_period_val")),
                F.lit(1),
            ),
            "qtd_yy_period_previous_year": F.try_subtract(
                F.try_divide(F.col("qtd_1y_period_val"), F.col("qtd_2y_period_val")),
                F.lit(1),
            ),
        }
    )

    comparison_columns = _get_current_previous_values(yy_df)

    max_years = int(metric_configs["max_relevant_years"])
    relative_year_comparisons = list(range(1, max_years + 1))

    # Static per-metric metadata attached as literal columns (dict order
    # determines the order in which the columns are appended).
    metadata_columns = {
        "value_divisor": F.lit(metric_configs["value_divisor"]),
        "metric_aggregate_sql_operator": F.lit(
            metric_configs["metric_config_aggregation_type"]
        ),
        "metric_growth_rate_type": F.lit(metric_configs["growth_rate_type"]),
        "metric_va_id": F.lit(metric_configs["visible_alpha_id"]).cast("int"),
        "relative_years": F.lit(relative_year_comparisons),
        "data_source_table_grain": F.lit(
            metric_configs["source_table_granularity"]
        ),
        "internal_metric_id": F.lit(metric_configs["internal_metric_id"]).cast(
            "int"
        ),
        "metric_trailing_sql_aggregate_operator": F.lit(
            metric_configs["trailing_period_aggregate_function"]
        ).cast("string"),
        "month_label_expression": F.concat(
            F.monthname("period_end"), F.date_format(F.col("period_end"), "yy")
        ),
        "max_quarter_end_period_end": F.lit(date_bounds["max_report_period_end"]),
        "max_period_start": F.lit(date_bounds["max_period_start"]),
        "max_period_end": F.lit(date_bounds["max_period_end"]),
        "min_period_start": F.lit(date_bounds["min_period_start"]),
    }

    return yy_df.withColumns(comparison_columns).withColumns(metadata_columns)


def _get_daily_table_df(
    df: DataFrame,
    granularity: str,
    spark: SparkSession = None,
) -> DataFrame:
    """Expand period-level rows onto a daily calendar spine.

    Left-joins the central days table to *df* — matching day-to-period-start
    directly, or by month start when ``granularity == "MONTH"`` — bounded
    between the earliest period start and the latest quarter end, then emits
    quarter labels for display.
    """
    spark = spark or get_spark_session()

    if granularity == "MONTH":
        days_in_column = "months_in_quarter"
        join_expression = F.date_trunc("month", F.col("day")) == F.col("period_start")
    else:
        days_in_column = "days_in_quarter"
        join_expression = F.col("day") == F.col("period_start")

    spine_df = spark.table("yd_1p_central.time.days").select("day")

    joined_df = spine_df.join(df, join_expression, "left")

    bounded_df = joined_df.where(
        E.between(
            F.col("day"),
            F.col("min_period_start"),
            F.col("max_quarter_end_period_end"),
        )
    )

    return bounded_df.select(
        "day",
        F.col("quarter").alias("yd_quarter"),
        F.col(days_in_column).alias(f"yd_{days_in_column}"),
        F.col("quarter_index").alias("yd_quarter_index"),
        # when() without otherwise() leaves non-matching rows NULL, so each
        # display label is populated only on its own quarter's rows.
        F.when(
            F.col("quarter_index") == 0, F.col("current_quarter_year_label")
        ).alias("current_quarter_display"),
        F.when(
            F.col("quarter_index") == 1, F.col("previous_quarter_year_label")
        ).alias("previous_quarter_display"),
    )


def _get_qtd_progress_df(
    df: DataFrame,
    metric_max_relevant_years: int,
    granularity: str,
    metric_configs: dict,
) -> DataFrame:
    """Attach metric identity columns and select the published QTD column set.

    The column list varies by granularity: MONTH adds the month label and
    internal metric id, WEEK adds the previous-week QTD YoY value and the
    trailing aggregate operator.
    """
    if granularity == "MONTH":
        days_in_column = "months_in_quarter"
    else:
        days_in_column = "days_in_quarter"

    previous_year_columns = _get_previous_year_columns(
        metric_max_relevant_years, granularity
    )

    qtd_period_columns = [
        "qtd_period_val",
        "period_index",
        "qtd_1y_period_val",
        "qtd_2y_period_val",
        "qtd_yy_period",
        "qtd_yy_period_previous_year",
    ]
    if granularity == "WEEK":
        qtd_period_columns = qtd_period_columns + ["previous_week_qtd_yy_period"]

    publication_ts = F.current_timestamp().alias("publication_timestamp")
    if granularity == "MONTH":
        final_columns = [
            "month_label_expression",
            "relative_years",
            "data_source_table_grain",
            publication_ts,
            "internal_metric_id",
        ]
    elif granularity == "WEEK":
        final_columns = [
            "relative_years",
            "data_source_table_grain",
            "metric_trailing_sql_aggregate_operator",
            publication_ts,
        ]
    else:
        final_columns = [
            "relative_years",
            "data_source_table_grain",
            publication_ts,
        ]

    labeled_df = df.withColumns(
        {
            "metric_name": F.lit(metric_configs["metric_name"]).cast("string"),
            "top_level_entity_ticker": F.lit(
                metric_configs["top_level_entity_ticker"]
            ).cast("string"),
            "top_level_entity_name": F.lit(
                metric_configs["top_level_entity_name"]
            ).cast("string"),
        }
    )

    return labeled_df.select(
        "day",
        "yd_quarter",
        f"yd_{days_in_column}",
        "yd_quarter_index",
        "metric_name",
        "top_level_entity_ticker",
        "top_level_entity_name",
        # Join key tolerates NULL ticker/name by substituting "NONE" so concat
        # itself never yields NULL.
        F.concat(
            F.coalesce(F.col("top_level_entity_ticker"), F.lit("NONE")),
            F.coalesce(F.col("top_level_entity_name"), F.lit("NONE")),
            F.col("metric_name"),
        ).alias("metric_join_key"),
        "period_start",
        "period_end",
        "quarter_period_start",
        "quarter_period_end",
        days_in_column,
        "quarter",
        "year",
        "max_quarter_end_period_end",
        "max_period_start",
        "max_period_end",
        "min_period_start",
        "quarter_index",
        *previous_year_columns,
        *qtd_period_columns,
        "quarter_year_label",
        "current_qtd_yy_period",
        "value_divisor",
        "current_quarter_year_label",
        "previous_quarter_year_label",
        "current_quarter_display",
        "previous_quarter_display",
        "metric_aggregate_sql_operator",
        "metric_growth_rate_type",
        "metric_va_id",
        *final_columns,
    )


@track_usage
def standard_metric_weekly_qtd_progress(
    df: DataFrame,
    calendar_df: DataFrame,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing
    weekly analyses through the quarter and the comparable weekly analyses
    in prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :param spark: Optional SparkSession (will create one if not provided)
    :return: DataFrame of weekly QTD analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating weekly quarter-to-date progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.cmg_reported.dash_daily_rev")
        calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df, entity_configuration, standard_metric_metadata,
            standard_metric_configuration, calendar_df
        )
        df = A.standard_metric_weekly_qtd_progress(unified_kpi_df, calendar_df)
        display(df)
    """
    granularity = "WEEK"
    metric_configs = _get_metric_configs(df)
    metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
    trailing_aggregation_type = metric_configs["trailing_period_aggregate_function"]
    metric_calendar_type = metric_configs["calendar_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])

    # Generate base df with analyses filtered to period aggregate and growth rates
    base_df = df.where(
        F.col("internal_dashboard_analysis_name").isin(
            [
                "day_simple_aggregate",
                "day_simple_aggregate_trailing_day",
                "day_1y_growth_rate_trailing_day",
                "day_2y_growth_rate_trailing_day",
                "day_3y_growth_rate_trailing_day",
                "day_4y_growth_rate_trailing_day",
            ]
        )
    ).where(
        # Non-trailing rows must match the metric's aggregation type;
        # trailing rows must match the trailing aggregation type.
        F.when(
            F.col("trailing_period").isNull(),
            F.col("aggregation_type") == metric_aggregation_type,
        ).otherwise(F.col("aggregation_type") == trailing_aggregation_type)
    )

    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)
    qtd_calc_df = _get_qtd_calc_df(base_df, metric_aggregation_type)

    # Generate base calendar with full quarter dates for extrapolation
    base_calendar_df = calendar_df.select(
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        "quarter_period_start",
        "quarter_period_end",
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.right(F.col("year_label"), F.lit(4)).alias("year"),
        "days_in_quarter",
    )

    date_bounds = _get_date_bounds(base_df, granularity)
    filtered_calendar_df = _get_filtered_calendar_df(base_calendar_df, date_bounds)

    prep_df = filtered_calendar_df.join(
        qtd_calc_df, ["period_start", "period_end"], "left"
    ).withColumn(
        "quarter_index",
        (F.dense_rank().over(W.orderBy(F.col("quarter_period_end").desc())) - 1),
    )

    # 52-week calendars align prior years by quarter shape; exact calendars
    # align by the same calendar date one year back.
    if metric_calendar_type == "52_WEEK":
        join_columns = ["join_year", "quarter", "days_in_quarter"]
    else:
        join_columns = ["join_date"]

    for year_previous in range(1, metric_max_relevant_years + 1):
        # Join previous years data for QTD calculations
        prep_df = prep_df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": (
                    F.col("period_start") - F.expr(f"INTERVAL {year_previous} YEAR")
                ),
            }
        )
        join_df = prep_df.select(
            "quarter",
            "days_in_quarter",
            F.col("period_start").alias("join_date"),
            (F.col("year") + year_previous).alias("join_year"),
            F.col("period_simple_aggregate").alias(
                f"period_{year_previous}y_simple_aggregate"
            ),
            F.col("qtd_period_val").alias(f"qtd_{year_previous}y_period_val"),
            F.col("period_simple_aggregate_trailing_day").alias(
                f"period_{year_previous}y_simple_aggregate_trailing_day"
            ),
        ).select(
            *join_columns,
            f"period_{year_previous}y_simple_aggregate",
            f"qtd_{year_previous}y_period_val",
            f"period_{year_previous}y_simple_aggregate_trailing_day",
        )
        prep_df = prep_df.join(join_df, join_columns, "left").drop(
            "join_date", "join_year"
        )

    qtd_yy_final_df = _get_qtd_yy_final_df(
        prep_df, metric_configs, date_bounds
    ).withColumns(latest_growth_rate_col)

    spark = spark or get_spark_session()
    daily_table_df = _get_daily_table_df(qtd_yy_final_df, granularity, spark=spark)

    combined_df = daily_table_df.join(
        qtd_yy_final_df, F.col("day") == F.col("period_end"), "left"
    )

    qtd_progress_df = _get_qtd_progress_df(
        combined_df, metric_max_relevant_years, granularity, metric_configs
    )

    return qtd_progress_df
@track_usage
def standard_metric_monthly_qtd_progress(
    df: DataFrame,
    calendar_df: DataFrame,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing
    monthly analyses through the quarter and the comparable monthly analyses
    in prior years.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Calendar dataframe
    :param spark: Optional SparkSession (will create one if not provided)
    :return: DataFrame of monthly QTD analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating monthly quarter-to-date progress analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.cn_shortvid_reported.active_users")
        calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df, entity_configuration, standard_metric_metadata,
            standard_metric_configuration, calendar_df
        )
        df = A.standard_metric_monthly_qtd_progress(unified_kpi_df, calendar_df)
        display(df)
    """
    granularity = "MONTH"
    metric_configs = _get_metric_configs(df)
    metric_aggregation_type = metric_configs["metric_config_aggregation_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])
    latest_growth_rate_col = _get_latest_growth_rates(df, granularity)

    # Generate base table with analysis filtered to period aggregate and growth rates
    base_df = df.where(
        F.col("internal_dashboard_analysis_name").isin(
            [
                "month_simple_aggregate",
                "month_1y_growth_rate",
                "month_2y_growth_rate",
                "month_3y_growth_rate",
                "month_4y_growth_rate",
            ]
        )
    )

    qtd_calc_df = _get_qtd_calc_df(base_df, metric_aggregation_type)

    # Generate base calendar with full quarter dates for extrapolation
    base_calendar_df = calendar_df.select(
        "quarter_period_start",
        "quarter_period_end",
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.right(F.col("year_label"), F.lit(4)).alias("year"),
        F.col("month_period_start").alias("period_start"),
        F.col("month_period_end").alias("period_end"),
        # 1-based position of the month within its quarter.
        F.months_between(F.add_months(F.col("day"), 1), F.col("quarter_period_start"))
        .cast("bigint")
        .alias("months_in_quarter"),
    )

    # Aggregate calendar to monthly periods
    aggregate_calendar_df = base_calendar_df.groupBy("period_start", "period_end").agg(
        F.first("quarter_period_start").alias("quarter_period_start"),
        F.first("quarter_period_end").alias("quarter_period_end"),
        F.first("months_in_quarter").alias("months_in_quarter"),
        F.first("quarter").alias("quarter"),
        F.first("year").alias("year"),
    )

    date_bounds = _get_date_bounds(base_df, granularity)
    filtered_calendar_df = _get_filtered_calendar_df(aggregate_calendar_df, date_bounds)

    prep_df = filtered_calendar_df.join(
        qtd_calc_df, ["period_start", "period_end"], "left"
    ).withColumn(
        "quarter_index",
        (F.dense_rank().over(W.orderBy(F.col("quarter_period_end").desc())) - 1),
    )

    for year_previous in range(1, metric_max_relevant_years + 1):
        # Join previous years data for QTD calculations
        prep_df = prep_df.withColumns(
            {
                "join_year": F.col("year"),
                "join_date": (
                    F.col("period_start") - F.expr(f"INTERVAL {year_previous} YEAR")
                ),
            }
        )
        join_df = prep_df.select(
            "months_in_quarter",
            F.col("period_start").alias("join_date"),
            F.col("period_simple_aggregate").alias(
                f"period_{year_previous}y_simple_aggregate"
            ),
            F.col("qtd_period_val").alias(f"qtd_{year_previous}y_period_val"),
        )
        prep_df = prep_df.join(
            join_df, ["months_in_quarter", "join_date"], "left"
        ).drop("join_date", "join_year")

    qtd_yy_final_df = _get_qtd_yy_final_df(
        prep_df, metric_configs, date_bounds
    ).withColumns(latest_growth_rate_col)

    spark = spark or get_spark_session()
    daily_table_df = _get_daily_table_df(qtd_yy_final_df, granularity, spark=spark)

    combined_df = daily_table_df.join(
        qtd_yy_final_df, F.col("day") == F.col("period_end"), "left"
    )

    qtd_progress_df = _get_qtd_progress_df(
        combined_df, metric_max_relevant_years, granularity, metric_configs
    )

    return qtd_progress_df