# Source code for etl_toolkit.analyses.standard_metrics.daily_growth

from pyspark.sql import DataFrame, functions as F

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.analyses.standard_metrics.helpers import (
    _validate_unified_kpi_input_df,
    NULL_DECIMAL_PLACEHOLDER,
)


@track_usage
def standard_metric_daily_growth(df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing
    daily growth analyses.

    :param df: A dataframe of a metric's unified KPI table
    :return: DataFrame of daily growth analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating daily growth analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table(
            "yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000"
        )
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Chewy",
            top_level_entity_ticker="CHWY:XNYS",
            figi="BBG00P19DLQ4",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Net Sales - Order Date",
            company_comparable_kpi=False,
            currency="USD",
            value_divisor=1000000,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="net_sales_order_date",
            source_input_date_column="date",
            max_relevant_years=4,
            calendar_type="52_WEEK",
            trailing_period_aggregate_function="SUM",
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df,
        )
        df = A.standard_metric_daily_growth(unified_kpi_df)
        display(df)
        # +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+---+
        # |top_level_entity_ticker |top_level_entity_name |period_start |period_end |metric_name            |dashboard_analysis_name |day_1y_growth_rate_trailing_day |...|
        # +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+---+
        # |CHWY:XNYS               |Chewy                 |2025-02-01   |2025-02-01 |Net Sales - Order Date |Daily Growth            |0.012224                        |...|
        # |CHWY:XNYS               |Chewy                 |2025-02-02   |2025-02-02 |Net Sales - Order Date |Daily Growth            |0.014802                        |...|
        # |CHWY:XNYS               |Chewy                 |2025-02-03   |2025-02-03 |Net Sales - Order Date |Daily Growth            |0.040221                        |...|
        # +------------------------+----------------------+-------------+-----------+-----------------------+------------------------+--------------------------------+---+
    """
    _validate_unified_kpi_input_df(df)

    # Grouping keys shared by the pivot below and the de-duplicating
    # aggregation that follows it — declared once so the two stay in sync.
    group_columns = [
        "period_start",
        "period_end",
        "top_level_entity_ticker",
        "top_level_entity_name",
        "metric_name",
        "currency",
        "value_divisor",
        "internal_metric_id",
    ]
    # Output schema always carries years 1-4, whether or not the input has
    # rows for every year.
    growth_columns = [f"day_{year}y_growth_rate_trailing_day" for year in range(1, 5)]

    # Generate base df with analysis filtered to daily aggregated sum,
    # pivoting each trailing-year analysis into its own column.
    base_df = (
        df.where(
            F.col("internal_dashboard_analysis_name").like(
                "day_%y_growth_rate_trailing_day"
            )
        )
        .groupBy(*group_columns)
        .pivot("internal_dashboard_analysis_name")
        .sum("value")
    )

    # Add empty columns for any missing years 1-4 to maintain consistent schema.
    for column in growth_columns:
        if column not in base_df.columns:
            base_df = base_df.withColumn(column, NULL_DECIMAL_PLACEHOLDER)

    # Re-aggregate on the same keys, taking min() of each growth column.
    pivot_df = base_df.groupBy(*group_columns).agg(
        *[F.min(column).alias(column) for column in growth_columns]
    )

    daily_growth_df = pivot_df.select(
        "top_level_entity_ticker",
        "top_level_entity_name",
        "period_start",
        "period_end",
        "metric_name",
        F.lit("Daily Growth").alias("dashboard_analysis_name"),
        *growth_columns,
        "value_divisor",
        "currency",
        F.current_timestamp().alias("publication_timestamp"),
        "internal_metric_id",
    )
    return daily_growth_df