from typing import Optional, Literal
from dataclasses import dataclass
from pyspark.sql import Column, DataFrame, functions as F, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import expressions as E
from etl_toolkit.analyses.dedupe import dedupe_by_row_number
from etl_toolkit.analyses.investor.consensus import add_unified_consensus_column
@dataclass
class BackTestConfiguration:
    """
    Defines a backtest configuration used in the investor reporting process. A list of
    these configurations (built via the ``A.backtest_configuration`` alias) can then be
    added to earnings results data using ``A.earnings_results_with_backtests``.

    .. note:: This configuration should not be used on its own. It is not a standard
        analysis function that returns a dataframe. Instead, it defines a configuration
        that can be supplied to ``A.earnings_results_with_backtests``.

    :param df: A backtest dataframe that contains the results of the back test for each prior publishing quarter. This can be obtained
        by filtering a quarterly_accuracy for ``metric_type="backtest"`` and ``status="active"``.
    :param primary_dataset: The primary dataset used for this KPI backtest. For ex: ``skywalker`` or ``yoda``.
    :param panel_used: The corresponding Panel ID for the datasets used in this KPI backtest. For ex: ``202101_100``.
    :param methodology_notes: Any relevant information about the methodology for this backtest, examples include adjustments, coverage changes, weights, etc.
    :param secondary_dataset: An optional, secondary dataset used for this KPI backtest. For ex: ``skywalker`` or ``yoda``. Can leave ``None`` if not used. Default is ``None``.
    :param metric_type: The type of KPI Backtest, can be ``backtest`` or ``qa``. Default is ``backtest``.

    Examples
    ^^^^^^^^^^^^^
    .. code-block:: python
        :caption: Create a list of backtest_configurations for use in A.earnings_results_with_backtests.

        from etl_toolkit import A, E, F

        sample_backtest_df = (
            spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy")
            .where(F.col("metric_type") == 'backtest')
            .where(F.col("status") == 'active')
        )
        backtests = [
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="skywalker",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="backtest",
            ),
        ]
    """
    df: DataFrame  # backtest rows for prior publishing quarters (see class docstring)
    primary_dataset: str  # e.g. "skywalker" or "yoda"
    panel_used: str  # panel ID, e.g. "202101_100"
    methodology_notes: str  # free-text methodology details (adjustments, weights, coverage changes, ...)
    secondary_dataset: Optional[str] = None  # optional second dataset; None when unused
    metric_type: Literal["backtest", "qa"] = "backtest"
    @property
    def as_columns(self) -> list[Column]:
        """
        Returns an ordered list of columns that are necessary for the final backtest results df
        """
        # Each configuration attribute becomes a constant (lit) column so it can be
        # attached to every row of the corresponding backtest dataframe.
        return [
            F.lit(self.primary_dataset).cast("string").alias("primary_dataset"),
            F.lit(self.secondary_dataset).cast("string").alias("secondary_dataset"),
            F.lit(self.panel_used).cast("string").alias("panel_used"),
            F.lit(self.methodology_notes).cast("string").alias("methodology_notes"),
            F.lit(self.metric_type).cast("string").alias("metric_type"),
        ]
# Public snake_case alias so the dataclass can be invoked as ``A.backtest_configuration``.
# (A stray ``[docs]`` Sphinx link artifact was removed here: as a bare expression it
# would raise NameError at module import time.)
backtest_configuration = BackTestConfiguration
def _placeholder_column(column_name: str, column_type: str) -> Column:
    """Return an all-null column cast to ``column_type`` and named ``column_name``."""
    typed_null = F.lit(None).cast(column_type)
    return typed_null.alias(column_name)
# Static metadata from the accuracy table that is enriched on backtest data
BASE_EARNINGS_COLUMNS = [
    F.col(column_name)
    for column_name in (
        "product_name",
        "ticker",
        "kpi_name",
        "is_company_comparable",
        "is_active",
        "is_signals_metric",
        "is_derived_metric",
        "unit_type",
        "currency",
        "data_owner_email",
        "research_owner_email",
        "va_metric_id",
    )
]
# Columns that fit the accuracy table schema but will be extracted from backtest information.
# Note: ``Column.cast`` preserves the source column's name, so no alias is needed for the
# plain casts; only the derived ratio column needs an explicit alias.
BACKTEST_COLUMNS = [
    F.col("quarter_start").cast("date"),
    F.col("quarter_end").cast("date"),
    F.col("quarter_label").cast("string"),
    F.col("yd_calendar_quarter").cast("date"),
    F.col("company_reported_value").cast("double"),
    F.col("yd_estimate_value").cast("double"),
    F.col("observed_value").cast("double"),
    # Share of the company-reported figure that the panel actually observed.
    (F.col("observed_value") / F.col("company_reported_value"))
    .cast("double")
    .alias("observed_coverage_ratio"),
]
@track_usage
def earnings_results_with_backtests(
    df: DataFrame,
    backtest_configurations: Optional[list[BackTestConfiguration]] = None,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: Optional[list[str]] = None,
) -> DataFrame:
    """
    Returns a dataframe combining a summary of earnings results with backtests for KPIs using this function. The dataframe
    follows the standard Backtest + Earnings Results schema for Investor reporting.

    :param df: The Earnings Results Dataframe that the backtests correspond to w.r.t. Product, ticker, and KPI names. It is recommended that this is a dataframe of the table created from the ``A.earnings_results_from_gsheet``
    :param backtest_configurations: An optional list of backtest configurations that will populate the output dataframe. The configurations should be created via ``A.backtest_configuration``. While typically backtests are specified, if no backtests are provided, the output table will pass through the earnings results dataframe with the standard schema for backtests.
    :param calculate_va_yy_growth: Passed through to ``add_unified_consensus_column`` when enriching each backtest. Default is ``False``.
    :param yy_growth_tickers: Passed through to ``add_unified_consensus_column`` when enriching each backtest. Default is ``None``.

    Examples
    --------
    .. code-block:: python
        :caption: Generating a Earnings Rankings with Backtests dataframe. Note that this also uses the ``A.backtest_configuration`` function.

        from etl_toolkit import A, E, F

        # Earnings results from Gsheets
        EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
        EARNINGS_SHEET_ID = 1232299054

        # Define backtests to include in output Datafame
        BACKTESTS = [
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="skywalker",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="backtest",
            ),
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="yoda",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="qa",
            ),
        ]
        df = A.earnings_results_with_backtests(
            spark.table("yd_production.tgt_gold.earnings_results"),
            BACKTESTS
        )
        display(df)
        +--------------+--------------+-------+-----------------+
        |quarter_start |quarter_end   |...    |metric_type      |
        +--------------+--------------+-------+-----------------+
        |2024-01-01    |2024-03-31    |...    |published        |
        +--------------+--------------+-------+-----------------+
        |2023-10-01    |2023-12-31    |...    |backtest         |
        +--------------+--------------+-------+-----------------+
        |2023-10-01    |2023-12-31    |...    |qa               |
        +--------------+--------------+-------+-----------------+
    """
    # Start with the earnings results dataframe standardized to the backtest schema.
    # Then, for each backtest configuration: enrich the backtest with metadata columns
    # from the earnings results, standardize its schema, and union it onto the result.
    # If no backtests are provided, the earnings results pass through unchanged
    # (aside from schema standardization).
    combined_df = _standardize_backtest_schema(df)
    # Loop variable renamed from ``backtest_configuration`` to avoid shadowing the
    # module-level alias of the same name.
    for config in backtest_configurations or []:
        enriched_backtest_df = _enriched_backtest_with_earnings(
            config.df,
            df,
            calculate_va_yy_growth=calculate_va_yy_growth,
            yy_growth_tickers=yy_growth_tickers,
        )
        final_backtest_df = _standardize_backtest_schema(
            enriched_backtest_df,
            backtest_configuration=config,
        )
        combined_df = combined_df.unionByName(
            final_backtest_df, allowMissingColumns=True
        )
    return combined_df
def _latest_quarter_earnings_results(earnings_df: DataFrame) -> DataFrame:
    """
    Keep only each (product, ticker, KPI)'s most recent quarter (by ``quarter_end``)
    and project down to the static earnings metadata columns.
    """
    deduped_df = dedupe_by_row_number(
        earnings_df,
        dedupe_columns=["product_name", "ticker", "kpi_name"],
        order_columns=[F.desc("quarter_end")],
    )
    return deduped_df.select(*BASE_EARNINGS_COLUMNS)
def _enriched_backtest_with_earnings(
    backtest_df: DataFrame,
    earnings_df: DataFrame,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: Optional[list[str]] = None,
) -> DataFrame:
    """
    Enrich backtest rows with static metadata from the latest quarter's earnings results.

    Joins ``backtest_df`` to the deduped earnings metadata case-insensitively on
    product_name / ticker / kpi_name, keeps the standard backtest columns plus the
    earnings metadata, then appends the unified consensus column.

    :param backtest_df: Backtest rows to enrich.
    :param earnings_df: Earnings results providing the metadata columns.
    :param calculate_va_yy_growth: Passed through to ``add_unified_consensus_column``.
    :param yy_growth_tickers: Passed through to ``add_unified_consensus_column``.
    """
    earnings_df = _latest_quarter_earnings_results(earnings_df).alias("earnings")
    # Join the latest quarter's earnings results by Product+Ticker KPI (case-insensitive)
    # then enrich the backtest columns with the corresponding earnings columns.
    # The lowered key columns are added to both sides and dropped after the join.
    join_columns = {
        "product_name_lower": F.lower("product_name"),
        "ticker_lower": F.lower("ticker"),
        "kpi_name_lower": F.lower("kpi_name"),
    }
    enriched_backtest_df = (
        backtest_df.withColumns(join_columns)
        .alias("backtest")
        .join(
            earnings_df.withColumns(join_columns), list(join_columns.keys()), how="left"
        )
        .drop(*list(join_columns.keys()))
        .select(
            *BACKTEST_COLUMNS,
            # Qualify with the "earnings" alias so same-named backtest columns are unambiguous.
            *[F.col(f"earnings.{column}") for column in earnings_df.columns],
        )
    )
    enriched_backtest_df = add_unified_consensus_column(
        enriched_backtest_df,
        calculate_va_yy_growth=calculate_va_yy_growth,
        yy_growth_tickers=yy_growth_tickers,
    )
    return enriched_backtest_df
def _standardize_backtest_schema(
    backtest_df: DataFrame,
    backtest_configuration: Optional[BackTestConfiguration] = None,
) -> DataFrame:
    """
    Project ``backtest_df`` onto the standard Backtest + Earnings Results schema.

    Two modes, selected by ``backtest_configuration``:

    - Provided (backtest rows): editorial columns (ranges, commentary, calls,
      narrative ranking) become typed null placeholders, and the dataset/panel/
      methodology/metric_type metadata comes from the configuration constants.
    - ``None`` (earnings results rows): all of those columns are passed through
      from ``backtest_df`` itself.

    In both modes a ``last_updated`` timestamp is appended.
    """
    standard_backtest_columns = [
        F.col("quarter_start").cast("date"),
        F.col("quarter_end").cast("date"),
        F.col("quarter_label").cast("string"),
        F.col("yd_calendar_quarter").cast("date").alias("yd_calendar_quarter"),
        # Human-readable label derived from the YD calendar quarter date.
        E.quarter_label("yd_calendar_quarter").cast("string").alias("yd_quarter_label"),
        F.col("product_name").cast("string"),
        F.col("ticker").cast("string"),
        F.col("kpi_name").cast("string"),
        F.col("company_reported_value").cast("double"),
        F.col("yd_estimate_value").cast("double").alias("yd_estimate_value"),
        F.col("observed_value").cast("double"),
        F.col("observed_coverage_ratio").cast("double"),
        # Editorial columns: null placeholders for backtest rows, passthrough otherwise.
        *(
            [
                _placeholder_column("lower_range_80th_percentile", "double"),
                _placeholder_column("upper_range_80th_percentile", "double"),
                _placeholder_column("company_commentary", "string"),
                _placeholder_column("yd_commentary", "string"),
                _placeholder_column("yd_call_vs_consensus", "string"),
                _placeholder_column("yd_call_outcome", "string"),
                _placeholder_column("metric_level_narrative_ranking", "string"),
            ]
            if backtest_configuration
            else [
                F.col("lower_range_80th_percentile")
                .cast("double")
                .alias("lower_range_80th_percentile"),
                F.col("upper_range_80th_percentile")
                .cast("double")
                .alias("upper_range_80th_percentile"),
                F.col("company_commentary").cast("string").alias("company_commentary"),
                F.col("yd_commentary").cast("string").alias("yd_commentary"),
                F.col("yd_call_vs_consensus")
                .cast("string")
                .alias("yd_call_vs_consensus"),
                F.col("yd_call_outcome").cast("string").alias("yd_call_outcome"),
                F.col("metric_level_narrative_ranking")
                .cast("string")
                .alias("metric_level_narrative_ranking"),
            ]
        ),
        F.col("is_company_comparable").cast("boolean"),
        F.col("is_active").cast("boolean"),
        F.col("is_signals_metric").cast("boolean"),
        F.col("is_derived_metric").cast("boolean"),
        F.col("unit_type").cast("string"),
        F.col("currency").cast("string"),
        F.col("data_owner_email").cast("string"),
        F.col("research_owner_email").cast("string"),
        F.col("va_metric_id").cast("long"),
        # Consensus value is optional in the input; substitute a typed null when absent.
        *(
            [
                F.col("va_consensus_value").cast("decimal(38,6)")
                if "va_consensus_value" in backtest_df.columns
                else _placeholder_column("va_consensus_value", "decimal(38,6)")
            ]
        ),
        *(
            # Fall back to accuracy table if no backtest is specified
            backtest_configuration.as_columns
            if backtest_configuration
            else [
                F.col("primary_dataset").cast("string").alias("primary_dataset"),
                F.col("secondary_dataset").cast("string").alias("secondary_dataset"),
                F.col("panel_used").cast("string").alias("panel_used"),
                F.col("methodology_notes").cast("string").alias("methodology_notes"),
                F.col("metric_type").cast("string").alias("metric_type"),
            ]
        ),
        F.current_timestamp().cast("timestamp").alias("last_updated"),
    ]
    updated_backtest_df = backtest_df.select(*standard_backtest_columns)
    return updated_backtest_df