Source code for etl_toolkit.analyses.investor.backtest

from typing import Optional, Literal
from dataclasses import dataclass

from pyspark.sql import Column, DataFrame, functions as F, Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit import expressions as E
from etl_toolkit.analyses.dedupe import dedupe_by_row_number
from etl_toolkit.analyses.investor.consensus import add_unified_consensus_column


@dataclass
class BackTestConfiguration:
    """
    Use this function to define backtest configurations that are used in the investor reporting process. A list of ``backtest_configurations`` can then be added to earnings results data using ``A.earnings_results_with_backtests``.

    .. note:: This function should not be used on its own. It's not a standard analysis function that returns a dataframe.
              Instead, it defines a configuration that can be supplied to ``A.earnings_results_with_backtests``.

    :param df: A backtest dataframe that contains the results of the backtest for each prior publishing quarter. This can be obtained
               by filtering a quarterly accuracy table for ``metric_type="backtest"`` and ``status="active"``.
    :param primary_dataset: The primary dataset used for this KPI backtest. For ex: ``skywalker`` or ``yoda``.
    :param panel_used: The corresponding Panel ID for the datasets used in this KPI backtest. For ex: ``202101_100``.
    :param methodology_notes: Any relevant information about the methodology for this backtest; examples include adjustments, coverage changes, weights, etc.
    :param secondary_dataset: An optional, secondary dataset used for this KPI backtest. For ex: ``skywalker`` or ``yoda``. Can be left as ``None`` if not used. Default is ``None``.
    :param metric_type: The type of KPI backtest; can be ``backtest`` or ``qa``. Default is ``backtest``.

    Examples
    ^^^^^^^^^^^^^
    .. code-block:: python
        :caption: Create a list of backtest_configurations for use in A.earnings_results_with_backtests.

        from etl_toolkit import A, E, F

        sample_backtest_df = (
            spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy")
            .where(F.col("metric_type") == 'backtest')
            .where(F.col("status") == 'active')
        )

        backtests = [
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="skywalker",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="backtest",
            ),
        ]

    """

    df: DataFrame
    primary_dataset: str
    panel_used: str
    methodology_notes: str
    secondary_dataset: Optional[str] = None
    metric_type: Literal["backtest", "qa"] = "backtest"

    @property
    def as_columns(self) -> list[Column]:
        """
        Returns an ordered list of columns that are necessary for the final backtest results df
        """
        return [
            F.lit(self.primary_dataset).cast("string").alias("primary_dataset"),
            F.lit(self.secondary_dataset).cast("string").alias("secondary_dataset"),
            F.lit(self.panel_used).cast("string").alias("panel_used"),
            F.lit(self.methodology_notes).cast("string").alias("methodology_notes"),
            F.lit(self.metric_type).cast("string").alias("metric_type"),
        ]


backtest_configuration = BackTestConfiguration
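
# Illustrative sketch (not part of the pipeline): ``as_columns`` materializes a
# configuration's metadata as typed literal columns, so a configuration can stamp its
# provenance onto any dataframe. The table name below is hypothetical.
#
#   config = backtest_configuration(
#       df=spark.table("examples.sample_quarterly_accuracy"),  # hypothetical table
#       primary_dataset="skywalker",
#       panel_used="202101_100",
#       methodology_notes="Uses geo weights",
#   )
#   config.df.select("*", *config.as_columns)
#   # appends primary_dataset, secondary_dataset, panel_used, methodology_notes, metric_type
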
def _placeholder_column(column_name: str, column_type: str) -> Column:
    return F.lit(None).cast(column_type).alias(column_name)


# Static metadata from the accuracy table that is enriched on backtest data
BASE_EARNINGS_COLUMNS = [
    F.col("product_name"),
    F.col("ticker"),
    F.col("kpi_name"),
    F.col("is_company_comparable"),
    F.col("is_active"),
    F.col("is_signals_metric"),
    F.col("is_derived_metric"),
    F.col("unit_type"),
    F.col("currency"),
    F.col("data_owner_email"),
    F.col("research_owner_email"),
    F.col("va_metric_id"),
]

# Columns that fit the accuracy table schema but will be extracted from backtest information
BACKTEST_COLUMNS = [
    F.col("quarter_start").cast("date"),
    F.col("quarter_end").cast("date"),
    F.col("quarter_label").cast("string"),
    F.col("yd_calendar_quarter").cast("date").alias("yd_calendar_quarter"),
    F.col("company_reported_value").cast("double").alias("company_reported_value"),
    F.col("yd_estimate_value").cast("double").alias("yd_estimate_value"),
    F.col("observed_value").cast("double"),
    (F.col("observed_value") / F.col("company_reported_value"))
    .cast("double")
    .alias("observed_coverage_ratio"),
]
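
# Illustrative note: a placeholder column is just a typed null that keeps backtest output
# aligned with the accuracy table schema, e.g.
#
#   _placeholder_column("company_commentary", "string")
#   # == F.lit(None).cast("string").alias("company_commentary")
#
# and ``observed_coverage_ratio`` in BACKTEST_COLUMNS is observed_value / company_reported_value
# for the same quarter.
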
@track_usage
def earnings_results_with_backtests(
    df: DataFrame,
    backtest_configurations: Optional[list[BackTestConfiguration]] = None,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: Optional[list[str]] = None,
) -> DataFrame:
    """
    Returns a dataframe combining a summary of earnings results with backtests for KPIs.
    The dataframe follows the standard Backtest + Earnings Results schema for Investor reporting.

    :param df: The earnings results dataframe that the backtests correspond to w.r.t. product, ticker, and KPI names.
               It is recommended that this is a dataframe of the table created from ``A.earnings_results_from_gsheet``.
    :param backtest_configurations: An optional list of backtest configurations that will populate the output dataframe.
                                    The configurations should be created via ``A.backtest_configuration``. While typically
                                    backtests are specified, if no backtests are provided, the output table will pass through
                                    the earnings results dataframe with the standard schema for backtests.
    :param calculate_va_yy_growth: Passed through to the unified consensus calculation when enriching each backtest.
                                   Default is ``False``.
    :param yy_growth_tickers: An optional list of tickers passed through to the unified consensus calculation.
                              Default is ``None``.

    Examples
    --------
    .. code-block:: python
        :caption: Generating an Earnings Rankings with Backtests dataframe. Note that this also uses the ``A.backtest_configuration`` function.

        from etl_toolkit import A, E, F

        # Earnings results from Gsheets
        EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
        EARNINGS_SHEET_ID = 1232299054

        # Backtest source data
        sample_backtest_df = (
            spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy")
            .where(F.col("metric_type") == 'backtest')
            .where(F.col("status") == 'active')
        )

        # Define backtests to include in output dataframe
        BACKTESTS = [
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="skywalker",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="backtest",
            ),
            A.backtest_configuration(
                df=sample_backtest_df,
                primary_dataset="yoda",
                secondary_dataset=None,
                panel_used="202101_100",
                methodology_notes="Uses geo weights and card type weights, yoy coverage change",
                metric_type="qa",
            ),
        ]

        df = A.earnings_results_with_backtests(
            spark.table("yd_production.tgt_gold.earnings_results"),
            BACKTESTS
        )
        display(df)

        +--------------+--------------+-------+-----------------+
        |quarter_start |quarter_end   |...    |metric_type      |
        +--------------+--------------+-------+-----------------+
        |2024-01-01    |2024-03-31    |...    |published        |
        |2023-10-01    |2023-12-31    |...    |backtest         |
        |2023-10-01    |2023-12-31    |...    |qa               |
        +--------------+--------------+-------+-----------------+
    """
    # Start with the earnings results dataframe, then iterate over each backtest,
    # unioning the backtest data once it fits the correct schema and has the appropriate metadata columns.
    # We use the earnings results dataframe for metadata columns to enrich each backtest.
    # If no backtests are provided, we return the accuracy source dataframe as is.
    combined_df = _standardize_backtest_schema(df)
    backtest_configurations = backtest_configurations or []
    if len(backtest_configurations):
        for backtest_configuration in backtest_configurations:
            enriched_backtest_df = _enriched_backtest_with_earnings(
                backtest_configuration.df,
                df,
                calculate_va_yy_growth=calculate_va_yy_growth,
                yy_growth_tickers=yy_growth_tickers,
            )
            final_backtest_df = _standardize_backtest_schema(
                enriched_backtest_df,
                backtest_configuration=backtest_configuration,
            )
            combined_df = combined_df.unionByName(
                final_backtest_df, allowMissingColumns=True
            )
    return combined_df
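
# Illustrative sketch: each standardized backtest is appended with
# ``unionByName(..., allowMissingColumns=True)``, so columns present on only one side of the
# union are filled with nulls. When ``backtest_configurations`` is empty or ``None``, the
# earnings results dataframe is simply passed through in the standard backtest schema, e.g.
#
#   passthrough_df = earnings_results_with_backtests(earnings_df)  # hypothetical earnings_df
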
def _latest_quarter_earnings_results(earnings_df: DataFrame) -> DataFrame:
    static_earnings_df = dedupe_by_row_number(
        earnings_df,
        dedupe_columns=["product_name", "ticker", "kpi_name"],
        order_columns=[F.desc("quarter_end")],
    ).select(*BASE_EARNINGS_COLUMNS)
    return static_earnings_df


def _enriched_backtest_with_earnings(
    backtest_df: DataFrame,
    earnings_df: DataFrame,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: Optional[list[str]] = None,
) -> DataFrame:
    earnings_df = _latest_quarter_earnings_results(earnings_df).alias("earnings")
    # Join the latest quarter's earnings results by product + ticker + KPI (case-insensitive),
    # then enrich the backtest columns with the corresponding earnings columns
    join_columns = {
        "product_name_lower": F.lower("product_name"),
        "ticker_lower": F.lower("ticker"),
        "kpi_name_lower": F.lower("kpi_name"),
    }
    enriched_backtest_df = (
        backtest_df.withColumns(join_columns)
        .alias("backtest")
        .join(
            earnings_df.withColumns(join_columns), list(join_columns.keys()), how="left"
        )
        .drop(*list(join_columns.keys()))
        .select(
            *BACKTEST_COLUMNS,
            *[F.col(f"earnings.{column}") for column in earnings_df.columns],
        )
    )
    enriched_backtest_df = add_unified_consensus_column(
        enriched_backtest_df,
        calculate_va_yy_growth=calculate_va_yy_growth,
        yy_growth_tickers=yy_growth_tickers,
    )
    return enriched_backtest_df


def _standardize_backtest_schema(
    backtest_df: DataFrame,
    backtest_configuration: Optional[BackTestConfiguration] = None,
) -> DataFrame:
    standard_backtest_columns = [
        F.col("quarter_start").cast("date"),
        F.col("quarter_end").cast("date"),
        F.col("quarter_label").cast("string"),
        F.col("yd_calendar_quarter").cast("date").alias("yd_calendar_quarter"),
        E.quarter_label("yd_calendar_quarter").cast("string").alias("yd_quarter_label"),
        F.col("product_name").cast("string"),
        F.col("ticker").cast("string"),
        F.col("kpi_name").cast("string"),
        F.col("company_reported_value").cast("double"),
        F.col("yd_estimate_value").cast("double").alias("yd_estimate_value"),
        F.col("observed_value").cast("double"),
        F.col("observed_coverage_ratio").cast("double"),
        *(
            [
                _placeholder_column("lower_range_80th_percentile", "double"),
                _placeholder_column("upper_range_80th_percentile", "double"),
                _placeholder_column("company_commentary", "string"),
                _placeholder_column("yd_commentary", "string"),
                _placeholder_column("yd_call_vs_consensus", "string"),
                _placeholder_column("yd_call_outcome", "string"),
                _placeholder_column("metric_level_narrative_ranking", "string"),
            ]
            if backtest_configuration
            else [
                F.col("lower_range_80th_percentile")
                .cast("double")
                .alias("lower_range_80th_percentile"),
                F.col("upper_range_80th_percentile")
                .cast("double")
                .alias("upper_range_80th_percentile"),
                F.col("company_commentary").cast("string").alias("company_commentary"),
                F.col("yd_commentary").cast("string").alias("yd_commentary"),
                F.col("yd_call_vs_consensus")
                .cast("string")
                .alias("yd_call_vs_consensus"),
                F.col("yd_call_outcome").cast("string").alias("yd_call_outcome"),
                F.col("metric_level_narrative_ranking")
                .cast("string")
                .alias("metric_level_narrative_ranking"),
            ]
        ),
        F.col("is_company_comparable").cast("boolean"),
        F.col("is_active").cast("boolean"),
        F.col("is_signals_metric").cast("boolean"),
        F.col("is_derived_metric").cast("boolean"),
        F.col("unit_type").cast("string"),
        F.col("currency").cast("string"),
        F.col("data_owner_email").cast("string"),
        F.col("research_owner_email").cast("string"),
        F.col("va_metric_id").cast("long"),
        *(
            [
                F.col("va_consensus_value").cast("decimal(38,6)")
                if "va_consensus_value" in backtest_df.columns
                else _placeholder_column("va_consensus_value", "decimal(38,6)")
            ]
        ),
        *(
            # Fall back to accuracy table if no backtest is specified
            backtest_configuration.as_columns
            if backtest_configuration
            else [
                F.col("primary_dataset").cast("string").alias("primary_dataset"),
                F.col("secondary_dataset").cast("string").alias("secondary_dataset"),
                F.col("panel_used").cast("string").alias("panel_used"),
                F.col("methodology_notes").cast("string").alias("methodology_notes"),
                F.col("metric_type").cast("string").alias("metric_type"),
            ]
        ),
        F.current_timestamp().cast("timestamp").alias("last_updated"),
    ]
    updated_backtest_df = backtest_df.select(*standard_backtest_columns)
    return updated_backtest_df