Source code for etl_toolkit.analyses.investor.consensus

from pyspark.sql import Column, DataFrame, functions as F
from pyspark.sql import Window as W
from etl_toolkit.analyses.dedupe import dedupe_by_row_number
from etl_toolkit import expressions as E
from etl_toolkit.exceptions import InvalidInputException

from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session


@track_usage
def add_unified_consensus_column(
    df: DataFrame,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: list[str] | None = None,
) -> DataFrame:
    """
    Helper function to add a ``va_consensus_value`` column to the input dataframe that
    represents the quarterly VA estimate and is used in reporting processes.

    The column is introduced via a join so that we resolve differences in how VA reports
    quarterly estimates. The join logic handles the following adjustments:

    - Differing quarter ends between the company's actual fiscal periods and what VA
      provides (a VA quarter is matched when its end date is within 30 days of the
      input ``quarter_end``)
    - Normalization of VA values reported in percent units (``"Percent"`` / ``"%"``),
      which VA expresses between 0 and 100, by dividing them by 100
    - Growth rate calculations when the reported metric is a growth rate while VA
      publishes nominal values

    :param df: Input DataFrame of quarterly earnings data. Must have a ``va_metric_id``
        column and a ``quarter_end`` column to perform the join. Rows with a null
        ``va_metric_id``, or with no VA quarter within 30 days, are kept and receive a
        null ``va_consensus_value``.
    :param calculate_va_yy_growth: Flag to control whether Y/Y growth rates should be
        calculated from VA nominal values. Used if the metric being reported is a growth
        rate while VA publishes actuals.
    :param yy_growth_tickers: List of tickers that, when ``calculate_va_yy_growth=True``,
        use the calculated Y/Y growth value from VA nominals. All other tickers use the VA
        value as-is. When empty/``None`` and ``calculate_va_yy_growth=True``, every row
        uses the Y/Y value. When non-empty, ``df`` must have a ``ticker`` column.
    :returns: ``df`` with an added ``decimal(38,6)`` column ``va_consensus_value``.
    :raises InvalidInputException: If a required input column is missing.
    """
    consensus_type = "decimal(38,6)"
    input_df = df.alias("input_df")
    yy_growth_tickers = yy_growth_tickers or []
    spark = get_spark_session()

    if "va_metric_id" not in input_df.columns:
        raise InvalidInputException("Missing va_metric_id column in provided dataframe")
    if "quarter_end" not in input_df.columns:
        raise InvalidInputException("Missing quarter_end column in provided dataframe")
    if calculate_va_yy_growth and yy_growth_tickers and "ticker" not in input_df.columns:
        raise InvalidInputException(
            "Missing ticker column in provided dataframe, "
            "required when yy_growth_tickers is provided"
        )

    # VA quarter end dates are not accurate, so we take the accurate quarter end dates
    # from the input DF, match them to VA quarters using date-range logic below, and use
    # them to select which VA snapshot date to pull the value from.
    quarter_dates = (
        input_df.filter(F.col("va_metric_id").isNotNull())
        .select("quarter_end", "va_metric_id")
        .distinct()
        .alias("quarter_dates")
    )

    # Nothing to look up. Return the same column type the join path produces so the
    # output schema does not depend on whether any VA metric ids were present.
    # head(1) only needs a single row, whereas count() would scan the whole set.
    if not quarter_dates.head(1):
        return input_df.withColumn(
            "va_consensus_value", F.lit(None).cast(consensus_type)
        )

    # The VA value we want to join is either:
    # A) the preq_consensus value from the company_actuals_consensus_union table, which
    #    reflects what the VA value was before the company reported. This value is not
    #    always available though.
    va_current = spark.table(
        "yd_3p_visiblealpha.va_gold.company_actuals_consensus_union"
    ).alias("va_current")
    # B) as a backup, the daily VA scrape from company_consensus_daily_w_growth: the most
    #    recent estimate on the day closest to the fiscal quarter end, but not after it.
    va_daily = spark.table(
        "yd_3p_visiblealpha.va_gold.company_consensus_daily_w_growth"
    ).alias("va_daily")

    # Join on VA metric ID + closest quarter, i.e. the company's fiscal quarter end falls
    # within 30 days of the VA quarter end.
    va_with_correct_quarters = va_current.join(
        quarter_dates,
        E.all(
            F.col("quarter_dates.va_metric_id") == F.col("va_current.metric_id"),
            F.abs(
                F.datediff(
                    F.col("quarter_dates.quarter_end"),
                    F.col("va_current.quarter_end_date"),
                )
            )
            < 30,
        ),
        how="inner",
    ).alias("va_with_correct_quarters")

    # Coalesce A) then B), then divide percent-unit values (0-100) by 100.
    va_value = _normalize_percentage(
        F.coalesce(
            F.col("va_with_correct_quarters.preq_consensus"),
            F.col("va_daily.value"),
        )
    )
    va_yy = _normalize_percentage(
        F.coalesce(
            F.col("va_with_correct_quarters.preq_consensus_yy_growth_calc"),
            F.col("va_daily.value_yy_growth"),
        )
    )

    # Attach every daily snapshot taken on or before the fiscal quarter end, then keep one
    # row per (metric_id, quarter_end). The join guarantees date <= quarter_end, so the
    # latest date is the one closest to the quarter end. Ordering on the date itself
    # avoids date-minus-date interval arithmetic.
    va_with_daily = va_with_correct_quarters.join(
        va_daily,
        E.all(
            F.col("va_with_correct_quarters.metric_id") == F.col("va_daily.metric_id"),
            F.col("va_with_correct_quarters.quarter_end") >= F.col("va_daily.date"),
        ),
        how="left",
    )
    joined_consensus = (
        dedupe_by_row_number(
            va_with_daily,
            dedupe_columns=[
                F.col("va_with_correct_quarters.metric_id"),
                F.col("va_with_correct_quarters.quarter_end"),
            ],
            order_columns=[F.col("va_daily.date").desc()],
        )
        .select(
            "va_with_correct_quarters.*",
            va_value.cast(consensus_type).alias("va_consensus_value"),
            va_yy.cast(consensus_type).alias("va_consensus_yy"),
        )
        .alias("joined_consensus")
    )

    input_df_updated = input_df.join(
        joined_consensus,
        E.all(
            F.col("input_df.va_metric_id") == F.col("joined_consensus.metric_id"),
            F.col("input_df.quarter_end") == F.col("joined_consensus.quarter_end"),
        ),
        how="left",
    ).select(
        "input_df.*",
        "joined_consensus.va_consensus_value",
        "joined_consensus.va_consensus_yy",
    )

    # Finalize the schema with the va_consensus_value column. For some tickers the VA
    # metric is in dollars but we estimate Y/Y growth (calculate_va_yy_growth). Products
    # with several tickers can restrict the Y/Y conversion to yy_growth_tickers.
    if calculate_va_yy_growth and yy_growth_tickers:
        return input_df_updated.select(
            "input_df.*",
            F.when(
                F.col("ticker").isin(yy_growth_tickers), F.col("va_consensus_yy")
            )
            .otherwise(F.col("va_consensus_value"))
            .alias("va_consensus_value"),
        )
    if calculate_va_yy_growth:
        return input_df_updated.select(
            "input_df.*", F.col("va_consensus_yy").alias("va_consensus_value")
        )
    return input_df_updated.drop("va_consensus_yy")
def _normalize_percentage(
    value_column: str | Column,
    unit_column: str | Column = "va_with_correct_quarters.metric_unit",
) -> Column:
    """
    Rescale VA values reported in percent units to fractions.

    Rows whose unit is ``"Percent"`` or ``"%"`` have their value divided by 100. Any
    other unit, including a null unit, passes the value through unchanged.

    :param value_column: Column (or column name) holding the VA value.
    :param unit_column: Column (or column name) holding the VA unit label. Defaults to
        the ``metric_unit`` column of the ``va_with_correct_quarters`` alias.
    :returns: Column expression with the normalized value.
    """
    value = E.normalize_column(value_column)
    unit = E.normalize_column(unit_column)
    percent_units = ["Percent", "%"]
    scaled_value = value / 100
    return F.when(unit.isin(percent_units), scaled_value).otherwise(value)