"""Net-add (cumulative adds) analyses for etl_toolkit.analyses.standard_metrics."""

from pyspark.sql import DataFrame, functions as F, Window as W

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.analyses.scalar import get_aggregates
from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    _get_metric_configs,
    NULL_DECIMAL_PLACEHOLDER,
)


@track_usage
def standard_metric_net_adds(df: DataFrame, calendar_df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing
    net add analyses.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: A custom calendar dataframe supplying the quarter
        boundary columns read below (``day``, ``quarter_period_start``,
        ``days_in_quarter``, ``quarter_length_in_days``, ``quarter_label``)
    :return: DataFrame of net add analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating net add analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Toast",
            top_level_entity_ticker="TOST:XNYS",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Location Net Adds",
            company_comparable_kpi=True,
            uses_va_for_actuals=False,
            display_period_granularity="DAY",
            report_period_granularity="QUARTER",
            currency=None,
            value_divisor=1,
            visible_alpha_id=14282041,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="locations",
            source_input_date_column="date",
            source_table_granularity="DAY",
            aggregate_function=None,
            max_relevant_years=2,
            growth_rate_type=None,
            calendar_type=None,
            source_table_filter_conditions=None,
            slice_columns=None,
            trailing_period_length=None,
            trailing_period_aggregate_function=None,
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_net_adds(unified_kpi_df, calendar_df)
        display(df)

        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |period_start |period_end |quarter_start |quarter_end |...|value         |daily_net_add_qtd_value |value_previous_1_week |daily_net_add_qtd_value_previous_1_week |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |2024-10-01   |2024-10-01 |2024-10-01    |2024-12-31  |...|127066.757493 |66.757493               |126534.726587         |6534.726587                             |...|
        |2024-10-02   |2024-10-02 |2024-10-01    |2024-12-31  |...|127121.858916 |121.858916              |126597.423004         |6597.423004                             |...|
        |2024-10-03   |2024-10-03 |2024-10-01    |2024-12-31  |...|127182.258553 |182.258553              |126653.519799         |6653.519799                             |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
    """
    _validate_unified_kpi_input_df(df)

    # Generate base calendar with full quarter dates for extrapolation
    base_calendar_df = calendar_df.select(
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        "quarter_period_start",
        "days_in_quarter",
        (F.col("quarter_length_in_days") - F.col("days_in_quarter")).alias(
            "days_remaining_in_quarter"
        ),
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.year(F.col("quarter_period_start")).alias("year"),
    )

    # Generate base df with analysis filtered to daily aggregate
    base_df = df.where(
        F.col("internal_dashboard_analysis_name") == "day_simple_aggregate"
    )

    # Generate max dates for extrapolation
    max_quarter_end_period_end = get_aggregates(
        base_df,
        value_column="quarter_end_period_start",
        aggregate_functions=["max"],
    )["max"]
    min_period_start = get_aggregates(
        base_df,
        value_column="period_start",
        aggregate_functions=["min"],
    )["min"]

    # Join calendar table to base table with max and min date restrictions
    daily_sales_df = (
        base_calendar_df.where(F.col("period_start") >= min_period_start)
        .where(F.col("period_start") <= max_quarter_end_period_end)
        .join(base_df.drop("period_end"), "period_start", "left")
    )

    # Generate daily net adds
    daily_net_add_df = daily_sales_df.withColumn(
        "yesterday_value", F.lag(F.col("value"), 1).over(W.orderBy("period_start"))
    ).withColumn("daily_net_add", F.col("value") - F.col("yesterday_value"))

    qtd_value_window = (
        W.partitionBy("quarter_start_period_start", "quarter_end_period_start")
        .orderBy("period_start")
        .rowsBetween(W.unboundedPreceding, W.currentRow)
    )

    # Generate cumulative adds df
    # Select specific QTD daily progress columns
    # Find QTD sales
    qtd_value_df = daily_net_add_df.select(
        "period_start",
        "period_end",
        "metric_name",
        "analysis_name",
        "top_level_entity_ticker",
        "top_level_entity_name",
        "quarter",
        "year",
        "days_in_quarter",
        "days_remaining_in_quarter",
        F.coalesce(
            F.col("quarter_start_period_start"), F.col("quarter_period_start")
        ).alias("quarter_start_period_start"),
        "quarter_end_period_start",
        "quarter_label_period_end",
        "period_index",
        "value",
        "daily_net_add",
        F.sum(F.col("daily_net_add"))  # Generate QTD value from daily periodicity
        .over(qtd_value_window)
        .alias("daily_net_add_qtd_value"),
        "value_divisor",
        "aggregation_type",
        "internal_metric_id",
    )

    metric_configs = _get_metric_configs(df)
    metric_calendar_type = metric_configs["calendar_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])

    # 52-week calendars align quarters by (year, quarter, day-of-quarter);
    # standard calendars align by calendar date shifted back N years.
    if metric_calendar_type == "52_WEEK":
        join_col = ["join_year", "quarter", "days_in_quarter"]
    else:
        join_col = ["join_date"]

    for year_previous in range(1, metric_max_relevant_years + 1):
        # Iterate through relevant years and joins previous years QTD values
        # for corresponding day in a quarter. Handles leap year by determining
        # if PTD is calculated uses date/date or period day/period day
        qtd_value_df = qtd_value_df.withColumns(
            {
                "join_year": (F.col("year") - year_previous),
                "join_date": F.col("period_start")
                - F.expr(f"INTERVAL {year_previous} YEAR"),
            }
        )
        join_df = qtd_value_df.select(
            F.col("period_start").alias("join_date"),
            F.col("year").alias("join_year"),
            "quarter",
            "days_in_quarter",
            F.col("daily_net_add_qtd_value").alias(
                f"daily_net_add_qtd_value_previous_{year_previous}_year"
            ),
        ).select(*join_col, f"daily_net_add_qtd_value_previous_{year_previous}_year")
        qtd_value_df = qtd_value_df.join(
            join_df,
            join_col,
            "left",
        ).drop("join_date", "join_year")

    growth_df = qtd_value_df.withColumn(
        "quarter_period_index",  # Used for final filtering
        (F.dense_rank().over(W.orderBy(F.col("quarter_start_period_start").desc())))
        - 1,
    )

    previous_week_df = growth_df.withColumns(
        {
            # Derived T7D value for comparison to current day
            "value_previous_1_week": F.lag(F.col("value"), 7).over(
                W.orderBy("period_start")
            ),
            "daily_net_add_qtd_value_previous_1_week": F.lag(
                F.col("daily_net_add_qtd_value"), 7
            ).over(W.orderBy("period_start")),
        }
    )

    # Isolate current fiscal year, used in extrapolation tool
    static_values = (
        previous_week_df.where(F.col("period_index") == 0)
        .select(
            F.col("year").alias("fiscal_year"),
            F.col("period_start").alias("current_period_start"),
            "value_divisor",
            F.col("days_in_quarter").alias("current_days_in_quarter"),
        )
        .first()
        .asDict()
    )

    # Add empty columns for missing years 1-4 to maintain consistent schema
    for year in range(1, 5):
        if (
            f"daily_net_add_qtd_value_previous_{year}_year"
            not in previous_week_df.columns
        ):
            previous_week_df = previous_week_df.withColumn(
                f"daily_net_add_qtd_value_previous_{year}_year",
                NULL_DECIMAL_PLACEHOLDER,
            )

    metric_relative_year_comparisons = list(range(1, metric_max_relevant_years + 1))

    # Keep only the current and immediately-prior quarter (quarter_period_index
    # 0 and 1) and project the final reporting schema.
    net_adds_df = previous_week_df.where(F.col("quarter_period_index") <= 1).select(
        "period_start",
        "period_end",
        F.col("quarter_start_period_start").alias("quarter_start"),
        F.col("quarter_end_period_start").alias("quarter_end"),
        "quarter_period_index",
        F.col("quarter_label_period_end").alias("quarter_year_label"),
        F.col("quarter").alias("quarter_label"),
        F.col("year").alias("fiscal_year"),
        "days_in_quarter",
        "days_remaining_in_quarter",
        "analysis_name",
        F.lit("Cumulative Net Adds").alias("dashboard_analysis_name"),
        "value",
        "daily_net_add_qtd_value",
        "value_previous_1_week",
        "daily_net_add_qtd_value_previous_1_week",
        "daily_net_add_qtd_value_previous_1_year",
        "daily_net_add_qtd_value_previous_2_year",
        "daily_net_add_qtd_value_previous_3_year",
        "daily_net_add_qtd_value_previous_4_year",
        "aggregation_type",
        F.lit(metric_configs["metric_name"]).cast("string").alias("metric_name"),
        F.lit(metric_configs["top_level_entity_ticker"])
        .cast("string")
        .alias("top_level_entity_ticker"),
        F.lit(metric_configs["top_level_entity_name"])
        .cast("string")
        .alias("top_level_entity_name"),
        F.lit(metric_configs["visible_alpha_id"]).cast("int").alias("metric_va_id"),
        F.lit(metric_relative_year_comparisons).alias(
            "metric_relative_year_comparisons"
        ),
        F.lit(static_values["fiscal_year"]).alias("current_fiscal_year"),
        F.lit(static_values["current_period_start"]).alias("current_date"),
        F.lit(static_values["value_divisor"]).alias("value_divisor"),
        F.datediff(F.col("period_start"), F.lit(static_values["current_period_start"]))
        .cast("bigint")
        .alias("extrapolated_day_index"),
        F.col("period_index").alias("sequential_index"),
        F.current_timestamp().alias("publication_timestamp"),
        F.when(
            (
                (F.col("days_in_quarter") <= 5)
                & (F.lit(static_values["current_days_in_quarter"]) >= 6)
            )
            | ((F.col("days_in_quarter") <= 5) & (F.col("quarter_period_index") > 0)),
            F.lit(0),
        )
        .otherwise(F.lit(1))
        .alias("hide_first_week"),
        "internal_metric_id",
    )

    return net_adds_df