from pyspark.sql import DataFrame, functions as F, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.analyses.scalar import get_aggregates
from etl_toolkit.analyses.standard_metrics.helpers import (
_validate_unified_kpi_input_df,
_get_metric_configs,
NULL_DECIMAL_PLACEHOLDER,
)
@track_usage
def standard_metric_net_adds(df: DataFrame, calendar_df: DataFrame) -> DataFrame:
    """
    Transforms a metric's unified KPI table to generate a dataframe containing net add analyses.

    :param df: A dataframe of a metric's unified KPI table
    :param calendar_df: Daily custom-calendar dataframe; must expose ``day``,
        ``quarter_period_start``, ``days_in_quarter``, ``quarter_length_in_days``
        and ``quarter_label`` columns
    :return: DataFrame of net add analyses for a metric

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generating net add analyses for a metric.

        from etl_toolkit import A

        input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
        calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")
        entity_configuration = A.entity_configuration(
            top_level_entity_name="Toast",
            top_level_entity_ticker="TOST:XNYS",
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Location Net Adds",
            company_comparable_kpi=True,
            uses_va_for_actuals=False,
            display_period_granularity="DAY",
            report_period_granularity="QUARTER",
            currency=None,
            value_divisor=1,
            visible_alpha_id=14282041,
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="locations",
            source_input_date_column="date",
            source_table_granularity="DAY",
            aggregate_function=None,
            max_relevant_years=2,
            growth_rate_type=None,
            calendar_type=None,
            source_table_filter_conditions=None,
            slice_columns=None,
            trailing_period_length=None,
            trailing_period_aggregate_function=None,
        )
        unified_kpi_df = A.standard_metric_unified_kpi(
            input_df,
            entity_configuration,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
        df = A.standard_metric_net_adds(unified_kpi_df, calendar_df)
        display(df)

        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |period_start |period_end |quarter_start |quarter_end |...|value         |daily_net_add_qtd_value |value_previous_1_week |daily_net_add_qtd_value_previous_1_week |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |2024-10-01   |2024-10-01 |2024-10-01    |2024-12-31  |...|127066.757493 |66.757493               |126534.726587         |6534.726587                             |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |2024-10-02   |2024-10-02 |2024-10-01    |2024-12-31  |...|127121.858916 |121.858916              |126597.423004         |6597.423004                             |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
        |2024-10-03   |2024-10-03 |2024-10-01    |2024-12-31  |...|127182.258553 |182.258553              |126653.519799         |6653.519799                             |...|
        +-------------+-----------+--------------+------------+---+--------------+------------------------+----------------------+----------------------------------------+---+
    """
    _validate_unified_kpi_input_df(df)

    # Generate base calendar with full quarter dates for extrapolation.
    # days_remaining_in_quarter is derived from the calendar's total quarter
    # length minus the day's ordinal position inside the quarter.
    base_calendar_df = calendar_df.select(
        F.col("day").alias("period_start"),
        F.col("day").alias("period_end"),
        "quarter_period_start",
        "days_in_quarter",
        (F.col("quarter_length_in_days") - F.col("days_in_quarter")).alias(
            "days_remaining_in_quarter"
        ),
        # First two characters of quarter_label, e.g. "Q1" from "Q1 2024".
        F.substring(F.col("quarter_label"), 1, 2).alias("quarter"),
        F.year(F.col("quarter_period_start")).alias("year"),
    )

    # Restrict the unified KPI input to the daily simple-aggregate analysis;
    # all net-add math below assumes one row per day.
    base_df = df.where(
        F.col("internal_dashboard_analysis_name") == "day_simple_aggregate"
    )

    # Scalar date bounds used to clip the calendar spine: the calendar is
    # extended through the end of the latest observed quarter so QTD values
    # can be extrapolated for days with no data yet.
    max_quarter_end_period_end = get_aggregates(
        base_df,
        value_column="quarter_end_period_start",
        aggregate_functions=["max"],
    )["max"]
    min_period_start = get_aggregates(
        base_df,
        value_column="period_start",
        aggregate_functions=["min"],
    )["min"]

    # Left-join observed data onto the clipped calendar spine so every
    # calendar day within range is present even when the metric has no row.
    daily_sales_df = (
        base_calendar_df.where(F.col("period_start") >= min_period_start)
        .where(F.col("period_start") <= max_quarter_end_period_end)
        .join(base_df.drop("period_end"), "period_start", "left")
    )

    # Daily net add = day-over-day delta of the (cumulative) metric value.
    # NOTE: the lag window is intentionally unpartitioned — the frame is one
    # metric's daily series, so a global ordering by period_start is correct.
    daily_net_add_df = daily_sales_df.withColumn(
        "yesterday_value", F.lag(F.col("value"), 1).over(W.orderBy("period_start"))
    ).withColumn("daily_net_add", F.col("value") - F.col("yesterday_value"))

    # Running-total frame scoped to a single quarter for QTD accumulation.
    qtd_value_window = (
        W.partitionBy("quarter_start_period_start", "quarter_end_period_start")
        .orderBy("period_start")
        .rowsBetween(W.unboundedPreceding, W.currentRow)
    )

    # Project the QTD daily-progress columns and accumulate net adds
    # quarter-to-date.
    qtd_value_df = daily_net_add_df.select(
        "period_start",
        "period_end",
        "metric_name",
        "analysis_name",
        "top_level_entity_ticker",
        "top_level_entity_name",
        "quarter",
        "year",
        "days_in_quarter",
        "days_remaining_in_quarter",
        # Calendar-only rows (no KPI data) have a null quarter_start from the
        # KPI side; fall back to the calendar's quarter_period_start.
        F.coalesce(
            F.col("quarter_start_period_start"), F.col("quarter_period_start")
        ).alias("quarter_start_period_start"),
        "quarter_end_period_start",
        "quarter_label_period_end",
        "period_index",
        "value",
        "daily_net_add",
        # Generate QTD value from daily periodicity.
        F.sum(F.col("daily_net_add"))
        .over(qtd_value_window)
        .alias("daily_net_add_qtd_value"),
        "value_divisor",
        "aggregation_type",
        "internal_metric_id",
    )

    metric_configs = _get_metric_configs(df)
    metric_calendar_type = metric_configs["calendar_type"]
    metric_max_relevant_years = int(metric_configs["max_relevant_years"])

    # For 52-week (retail) calendars, prior-year alignment is by fiscal
    # position (year/quarter/day-of-quarter); otherwise align by calendar date.
    # This also handles leap years: date/date vs period-day/period-day joins.
    if metric_calendar_type == "52_WEEK":
        join_col = ["join_year", "quarter", "days_in_quarter"]
    else:
        join_col = ["join_date"]

    # Iterate through relevant years and join each previous year's QTD value
    # for the corresponding day in the quarter.
    for year_previous in range(1, metric_max_relevant_years + 1):
        qtd_value_df = qtd_value_df.withColumns(
            {
                "join_year": (F.col("year") - year_previous),
                "join_date": F.col("period_start")
                - F.expr(f"INTERVAL {year_previous} YEAR"),
            }
        )
        join_df = qtd_value_df.select(
            F.col("period_start").alias("join_date"),
            F.col("year").alias("join_year"),
            "quarter",
            "days_in_quarter",
            F.col("daily_net_add_qtd_value").alias(
                f"daily_net_add_qtd_value_previous_{year_previous}_year"
            ),
        ).select(*join_col, f"daily_net_add_qtd_value_previous_{year_previous}_year")
        qtd_value_df = qtd_value_df.join(
            join_df,
            join_col,
            "left",
        ).drop("join_date", "join_year")

    # quarter_period_index: 0 = current quarter, 1 = previous, ... (dense rank
    # over descending quarter starts). Used for final filtering.
    growth_df = qtd_value_df.withColumn(
        "quarter_period_index",
        (F.dense_rank().over(W.orderBy(F.col("quarter_start_period_start").desc())))
        - 1,
    )

    # Derived T7D (trailing-7-day) values for comparison to the current day.
    previous_week_df = growth_df.withColumns(
        {
            "value_previous_1_week": F.lag(F.col("value"), 7).over(
                W.orderBy("period_start")
            ),
            "daily_net_add_qtd_value_previous_1_week": F.lag(
                F.col("daily_net_add_qtd_value"), 7
            ).over(W.orderBy("period_start")),
        }
    )

    # Isolate the current-day row (period_index == 0); its scalar values feed
    # the extrapolation tool. Fail loudly if the anchor row is missing rather
    # than raising an opaque AttributeError on None.
    current_row = (
        previous_week_df.where(F.col("period_index") == 0)
        .select(
            F.col("year").alias("fiscal_year"),
            F.col("period_start").alias("current_period_start"),
            "value_divisor",
            F.col("days_in_quarter").alias("current_days_in_quarter"),
        )
        .first()
    )
    if current_row is None:
        raise ValueError(
            "No row with period_index == 0 found in the unified KPI input; "
            "cannot determine current-period static values for net add analyses."
        )
    static_values = current_row.asDict()

    # Add empty columns for missing years 1-4 to maintain a consistent schema
    # regardless of metric_max_relevant_years.
    for year in range(1, 5):
        if (
            f"daily_net_add_qtd_value_previous_{year}_year"
            not in previous_week_df.columns
        ):
            previous_week_df = previous_week_df.withColumn(
                f"daily_net_add_qtd_value_previous_{year}_year",
                NULL_DECIMAL_PLACEHOLDER,
            )

    metric_relative_year_comparisons = list(range(1, metric_max_relevant_years + 1))

    # Keep only the current and immediately-previous quarters and assemble the
    # final output schema.
    net_adds_df = previous_week_df.where(F.col("quarter_period_index") <= 1).select(
        "period_start",
        "period_end",
        F.col("quarter_start_period_start").alias("quarter_start"),
        F.col("quarter_end_period_start").alias("quarter_end"),
        "quarter_period_index",
        F.col("quarter_label_period_end").alias("quarter_year_label"),
        F.col("quarter").alias("quarter_label"),
        F.col("year").alias("fiscal_year"),
        "days_in_quarter",
        "days_remaining_in_quarter",
        "analysis_name",
        F.lit("Cumulative Net Adds").alias("dashboard_analysis_name"),
        "value",
        "daily_net_add_qtd_value",
        "value_previous_1_week",
        "daily_net_add_qtd_value_previous_1_week",
        "daily_net_add_qtd_value_previous_1_year",
        "daily_net_add_qtd_value_previous_2_year",
        "daily_net_add_qtd_value_previous_3_year",
        "daily_net_add_qtd_value_previous_4_year",
        "aggregation_type",
        F.lit(metric_configs["metric_name"]).cast("string").alias("metric_name"),
        F.lit(metric_configs["top_level_entity_ticker"])
        .cast("string")
        .alias("top_level_entity_ticker"),
        F.lit(metric_configs["top_level_entity_name"])
        .cast("string")
        .alias("top_level_entity_name"),
        F.lit(metric_configs["visible_alpha_id"]).cast("int").alias("metric_va_id"),
        F.lit(metric_relative_year_comparisons).alias(
            "metric_relative_year_comparisons"
        ),
        F.lit(static_values["fiscal_year"]).alias("current_fiscal_year"),
        F.lit(static_values["current_period_start"]).alias("current_date"),
        F.lit(static_values["value_divisor"]).alias("value_divisor"),
        # Signed day offset from "today"; negative for past days, positive for
        # extrapolated future days.
        F.datediff(F.col("period_start"), F.lit(static_values["current_period_start"]))
        .cast("bigint")
        .alias("extrapolated_day_index"),
        F.col("period_index").alias("sequential_index"),
        F.current_timestamp().alias("publication_timestamp"),
        # Flag the noisy first ~5 days of a quarter so dashboards can suppress
        # them: 0 (hide) when the quarter has just started and either enough of
        # the current quarter has elapsed (>= 6 days) or the row belongs to a
        # prior quarter; 1 (show) otherwise.
        F.when(
            (
                (F.col("days_in_quarter") <= 5)
                & (F.lit(static_values["current_days_in_quarter"]) >= 6)
            )
            | ((F.col("days_in_quarter") <= 5) & (F.col("quarter_period_index") > 0)),
            F.lit(0),
        )
        .otherwise(F.lit(1))
        .alias("hide_first_week"),
        "internal_metric_id",
    )
    return net_adds_df