from pyspark.sql import functions as F, DataFrame
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import E
from etl_toolkit.analyses.standard_metrics.config import entity_configuration
from etl_toolkit.analyses.standard_metrics.helpers import (
SLICE_NAME_VALUE_COLUMNS,
_validate_unified_kpi_input_df,
_get_data_through,
_get_relevant_periodicities,
)

# Expression resolving the aggregation type to display for each analysis.
# It prioritizes:
# 1. The trailing-period aggregate function for trailing analyses
# 2. SUM for daily data (to align with the most common display preference)
# 3. Otherwise, the original aggregation_type from the analysis
aggregation_type_filter = E.chain_cases(
[
E.case(
F.col("trailing_period").isNotNull(),
F.col("trailing_period_aggregate_function"),
),
E.case(F.col("periodicity") == "DAY", F.lit("SUM")),
],
otherwise=F.col("aggregation_type"),
)
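# For illustration (hypothetical rows), the expression resolves to:
#   trailing_period=7,    periodicity="DAY",  aggregation_type="AVG" -> trailing_period_aggregate_function
#   trailing_period=null, periodicity="DAY",  aggregation_type="AVG" -> "SUM"
#   trailing_period=null, periodicity="WEEK", aggregation_type="AVG" -> "AVG" (unchanged)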

# Filter to keep only full periods: for periodicities coarser than the source
# granularity, keep historical periods (period_index > 0) or the current period
# once data runs through its end (date == max_date).
full_period_filter = E.chain_cases(
[
E.case(
F.col("periodicity") != F.col("source_table_granularity"),
E.any(F.col("period_index") > 0, F.col("date") == F.col("max_date")),
)
],
otherwise=(F.col("periodicity") == F.col("source_table_granularity")),
)
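# For illustration (hypothetical rows, max_date = the data-through date):
#   periodicity="WEEK", source_table_granularity="DAY", period_index=1 -> kept (completed period)
#   periodicity="WEEK", source_table_granularity="DAY", period_index=0, date=max_date -> kept
#   periodicity="WEEK", source_table_granularity="DAY", period_index=0, date<max_date -> dropped (partial)
#   periodicity="DAY",  source_table_granularity="DAY" -> kept (source granularity rows are always full periods)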

# Matches daily, non-trailing rows for metrics displayed at weekly granularity;
# negated in data_download_filter below to drop them
weekly_display_filter = E.all(
F.col("display_period_granularity") == "WEEK",
F.col("periodicity") == "DAY",
F.col("trailing_period").isNull(),
)
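# For illustration, a hypothetical row with periodicity="DAY", trailing_period=null,
# and display_period_granularity="WEEK" matches here and is excluded downstream.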

# Combined filter for the data download, keeping only the most relevant analyses:
# drops period-to-date analyses, keeps rows matching the preferred aggregation type,
# and respects each metric's display granularity
data_download_filter = E.all(
~F.col("internal_dashboard_analysis_name").like("%period_to_date%"),
F.col("aggregation_type") == F.col("aggregation_type_filter"),
~weekly_display_filter,
)
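# Note: F.col("aggregation_type_filter") and F.col("max_date") referenced by these
# filters do not exist on the raw unified KPI table; they are added via withColumns
# inside standard_metric_data_download before the filters are applied.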


@track_usage
def standard_metric_data_download(
df: DataFrame, entity_configuration: entity_configuration
) -> DataFrame:
"""
Transforms the unified KPI table to generate a dataframe containing analyses for the data download.

    :param df: A dataframe of a metric's unified KPI table
    :param entity_configuration: An A.entity_configuration configuration for the metric's top-level entity
    :return: DataFrame of data download analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating data download analyses for a metric.

from etl_toolkit import A
input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
entity_configuration = A.entity_configuration(
top_level_entity_name="Chewy",
top_level_entity_ticker="CHWY:XNYS",
figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Net Sales - Order Date",
company_comparable_kpi=False,
currency="USD",
value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="net_sales_order_date",
source_input_date_column="date",
max_relevant_years=4,
calendar_type="52_WEEK",
trailing_period_aggregate_function="SUM",
)
unified_kpi_df = A.standard_metric_unified_kpi(
input_df,
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
df = A.standard_metric_data_download(unified_kpi_df, entity_configuration)
display(df)
        +----------+------------------------+----------------------+----------------------------------+--------------------+-----------------------+-------------+--------------+---+-----------------+---+
        |date      |top_level_entity_ticker |top_level_entity_name |internal_dashboard_analysis_name  |analysis_label_name |metric_name            |slice_name_1 |slice_value_1 |...|value            |...|
        +----------+------------------------+----------------------+----------------------------------+--------------------+-----------------------+-------------+--------------+---+-----------------+---+
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_1y_growth_rate_trailing_day   |T7D SUM 1Y Growth   |Net Sales - Order Date |null         |null          |...|0.019454         |...|
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_2y_growth_rate_trailing_day   |T7D SUM 2Y Growth   |Net Sales - Order Date |null         |null          |...|0.019231         |...|
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_3y_growth_rate_trailing_day   |T7D SUM 3Y Growth   |Net Sales - Order Date |null         |null          |...|0.110622         |...|
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_4y_growth_rate_trailing_day   |T7D SUM 4Y Growth   |Net Sales - Order Date |null         |null          |...|0.104899         |...|
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_simple_aggregate              |Nominal Value       |Net Sales - Order Date |null         |null          |...|32434644.148619  |...|
        |2025-02-02|CHWY:XNYS               |Chewy                 |day_simple_aggregate_trailing_day |T7D SUM             |Net Sales - Order Date |null         |null          |...|237115829.752783 |...|
        +----------+------------------------+----------------------+----------------------------------+--------------------+-----------------------+-------------+--------------+---+-----------------+---+
"""
_validate_unified_kpi_input_df(df)
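    # Derive helper columns: metric_options JSON fields, the preferred aggregation
    # type, human-readable analysis labels, and a display date per periodicity.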
relevant_analyses_df = df.withColumns(
{
"source_table_granularity": F.get_json_object(
F.col("metric_options"), "$.source_table_granularity"
),
"trailing_period_aggregate_function": F.get_json_object(
F.col("metric_options"), "$.trailing_period.aggregate_sql_operator"
),
"qtd_cumulative_type": F.get_json_object(
F.col("metric_options"), "$.aggregate_function"
),
"aggregation_type_filter": aggregation_type_filter,
"trailing_analysis_label_name": E.chain_cases(
[
E.case(
F.col("trailing_period").isNotNull(),
F.concat(
F.lit("T"),
F.col("trailing_period"),
F.lit("D "),
F.col("aggregation_type"),
),
)
]
),
"growth_rate_analysis_label_name": E.chain_cases(
[
E.case(
F.col("calculation_type") == "GROWTH_RATE",
F.concat(
F.upper(
F.split(F.col("internal_dashboard_analysis_name"), "_")[
1
]
),
F.lit(" Growth"),
),
)
]
),
"temp_analysis_label_name": F.concat_ws(
" ",
F.col("trailing_analysis_label_name"),
F.col("growth_rate_analysis_label_name"),
),
"analysis_label_name": E.chain_cases(
[
E.case(
F.length(F.col("temp_analysis_label_name")) > 0,
F.col("temp_analysis_label_name"),
)
],
otherwise=F.lit("Nominal Value"),
),
"date": E.chain_cases(
[E.case(F.col("periodicity") == "DAY", F.col("period_start"))],
otherwise=F.col("period_end"),
),
}
).where(data_download_filter)
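    # Resolve the data-through date and the periodicities worth surfacing,
    # both derived from the filtered analyses via the imported helpers.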
data_through_date = _get_data_through(relevant_analyses_df)["data_through"]
relevant_periodicities = _get_relevant_periodicities(relevant_analyses_df)
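    # Keep only full periods at the relevant periodicities, then project the
    # data download schema with a publication timestamp.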
data_download_df = (
relevant_analyses_df.withColumn("max_date", F.lit(data_through_date))
.where(full_period_filter)
.where(F.col("periodicity").isin(relevant_periodicities))
.select(
"date",
"top_level_entity_ticker",
"top_level_entity_name",
"internal_dashboard_analysis_name",
"analysis_label_name",
"metric_name",
*SLICE_NAME_VALUE_COLUMNS,
"value",
F.current_timestamp().alias("publication_timestamp"),
"periodicity",
"internal_metric_id",
"qtd_cumulative_type",
"currency",
"derived_metric",
"numerator",
"denominator",
"operator",
)
)
return data_download_df