# Source code for etl_toolkit.analyses.standard_metrics.data_download

from pyspark.sql import functions as F, DataFrame

from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.config import entity_configuration
from etl_toolkit.analyses.standard_metrics.helpers import (
    SLICE_NAME_VALUE_COLUMNS,
    _validate_unified_kpi_input_df,
    _get_data_through,
    _get_relevant_periodicities,
)

# Expression selecting the aggregation type to display for each analysis.
# Priority order:
#   1. Trailing analyses use their trailing-period aggregate function.
#   2. Daily data defaults to SUM (the most common display preference).
#   3. Everything else keeps the analysis's original aggregation_type.
_aggregation_type_cases = [
    E.case(
        F.col("trailing_period").isNotNull(),
        F.col("trailing_period_aggregate_function"),
    ),
    E.case(F.col("periodicity") == "DAY", F.lit("SUM")),
]
aggregation_type_filter = E.chain_cases(
    _aggregation_type_cases,
    otherwise=F.col("aggregation_type"),
)

# Expression keeping only complete periods (excludes partial period-to-date rows).
# When the analysis periodicity differs from the source table's granularity, a row
# is a full period if it is a historical period (period_index > 0) or ends on the
# latest available date; otherwise the periodicities must match exactly.
_is_full_aggregated_period = E.any(
    F.col("period_index") > 0,
    F.col("date") == F.col("max_date"),
)
full_period_filter = E.chain_cases(
    [
        E.case(
            F.col("periodicity") != F.col("source_table_granularity"),
            _is_full_aggregated_period,
        )
    ],
    otherwise=(F.col("periodicity") == F.col("source_table_granularity")),
)

# Matches non-trailing daily rows for metrics whose display granularity is weekly;
# these rows are excluded (negated) downstream in data_download_filter.
_displays_weekly = F.col("display_period_granularity") == "WEEK"
_is_daily_row = F.col("periodicity") == "DAY"
_not_trailing = F.col("trailing_period").isNull()
weekly_display_filter = E.all(_displays_weekly, _is_daily_row, _not_trailing)

# Combined predicate selecting only the analyses relevant to the data download:
# drops period-to-date analyses, keeps rows whose aggregation type matches the
# preferred one, and respects each metric's display granularity.
_not_period_to_date = ~F.col("internal_dashboard_analysis_name").like(
    "%period_to_date%"
)
_uses_preferred_aggregation = (
    F.col("aggregation_type") == F.col("aggregation_type_filter")
)
data_download_filter = E.all(
    _not_period_to_date,
    _uses_preferred_aggregation,
    ~weekly_display_filter,
)


@track_usage
def standard_metric_data_download(
    df: DataFrame, entity_configuration: entity_configuration
) -> DataFrame:
    """
    Transforms the unified KPI table to generate a dataframe containing analyses
    for the data download.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: A.entity_configuration configurations
    :return: DataFrame of data download analyses for a metric

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Generating data download analyses for a metric.

        from etl_toolkit import A

        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df,
        )
        df = A.standard_metric_data_download(unified_kpi_df, entity_configuration)
        display(df)
    """
    _validate_unified_kpi_input_df(df)

    # Derive metadata columns from the metric_options JSON blob, build display
    # labels, normalize the date column, and keep only download-relevant rows.
    relevant_analyses_df = df.withColumns(
        {
            "source_table_granularity": F.get_json_object(
                F.col("metric_options"), "$.source_table_granularity"
            ),
            "trailing_period_aggregate_function": F.get_json_object(
                F.col("metric_options"), "$.trailing_period.aggregate_sql_operator"
            ),
            "qtd_cumulative_type": F.get_json_object(
                F.col("metric_options"), "$.aggregate_function"
            ),
            "aggregation_type_filter": aggregation_type_filter,
            # e.g. "T7D SUM" for trailing analyses; null otherwise.
            "trailing_analysis_label_name": E.chain_cases(
                [
                    E.case(
                        F.col("trailing_period").isNotNull(),
                        F.concat(
                            F.lit("T"),
                            F.col("trailing_period"),
                            F.lit("D "),
                            F.col("aggregation_type"),
                        ),
                    )
                ]
            ),
            # e.g. "1Y Growth", derived from the second underscore-delimited
            # token of the internal analysis name; null for non-growth rows.
            "growth_rate_analysis_label_name": E.chain_cases(
                [
                    E.case(
                        F.col("calculation_type") == "GROWTH_RATE",
                        F.concat(
                            F.upper(
                                F.split(
                                    F.col("internal_dashboard_analysis_name"), "_"
                                )[1]
                            ),
                            F.lit(" Growth"),
                        ),
                    )
                ]
            ),
            # concat_ws skips nulls, so this is "", one label, or both joined.
            "temp_analysis_label_name": F.concat_ws(
                " ",
                F.col("trailing_analysis_label_name"),
                F.col("growth_rate_analysis_label_name"),
            ),
            "analysis_label_name": E.chain_cases(
                [
                    E.case(
                        F.length(F.col("temp_analysis_label_name")) > 0,
                        F.col("temp_analysis_label_name"),
                    )
                ],
                otherwise=F.lit("Nominal Value"),
            ),
            # Daily rows are keyed by period_start; all others by period_end.
            "date": E.chain_cases(
                [E.case(F.col("periodicity") == "DAY", F.col("period_start"))],
                otherwise=F.col("period_end"),
            ),
        }
    ).where(data_download_filter)

    data_through_date = _get_data_through(relevant_analyses_df)["data_through"]
    relevant_periodicities = _get_relevant_periodicities(relevant_analyses_df)

    # Restrict to full periods and relevant periodicities, then project the
    # final data-download schema.
    data_download_df = (
        relevant_analyses_df.withColumn("max_date", F.lit(data_through_date))
        .where(full_period_filter)
        .where(F.col("periodicity").isin(relevant_periodicities))
        .select(
            "date",
            "top_level_entity_ticker",
            "top_level_entity_name",
            "internal_dashboard_analysis_name",
            "analysis_label_name",
            "metric_name",
            *SLICE_NAME_VALUE_COLUMNS,
            "value",
            F.current_timestamp().alias("publication_timestamp"),
            "periodicity",
            "internal_metric_id",
            "qtd_cumulative_type",
            "currency",
            "derived_metric",
            "numerator",
            "denominator",
            "operator",
        )
    )
    return data_download_df