from pyspark.sql import Column, DataFrame, functions as F
from pyspark.sql import Window as W
from etl_toolkit.analyses.dedupe import dedupe_by_row_number
from etl_toolkit import expressions as E
from etl_toolkit.exceptions import InvalidInputException
from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session
@track_usage
def add_unified_consensus_column(
    df: DataFrame,
    calculate_va_yy_growth: bool = False,
    yy_growth_tickers: list[str] | None = None,
) -> DataFrame:
    """
    Add a ``va_consensus_value`` column to the input dataframe that represents
    the quarterly VA estimate and is used in reporting processes.

    The column is introduced via a join so that we resolve differences in how
    VA reports quarterly estimates. The join logic handles the following
    adjustments:

    - Differing quarter ends between the company's actual fiscal periods and
      what VA provides
    - Normalization of growth rates represented as a percentage between 0 and 100
    - Growth rate calculations when the reported metric is a growth rate while
      VA publishes nominal values

    :param df: Input DataFrame of quarterly earnings data. Must have a
        ``va_metric_id`` column and a ``quarter_end`` column to perform the
        join. A ``ticker`` column is also referenced when
        ``calculate_va_yy_growth=True`` and ``yy_growth_tickers`` is non-empty.
    :param calculate_va_yy_growth: Flag to control whether Y/Y growth rates
        should be calculated from VA nominal values. Used if the metric being
        reported is a growth rate while VA publishes actuals.
    :param yy_growth_tickers: List of tickers that, when
        ``calculate_va_yy_growth=True``, use the calculated Y/Y growth value
        from VA nominals. All other tickers use the VA value as-is. When the
        list is empty or omitted, every row uses the Y/Y growth value.
    :returns: The input DataFrame with one extra ``va_consensus_value`` column
        of type ``decimal(38,6)`` (null where no VA value could be matched).
    :raises InvalidInputException: If ``df`` lacks the ``va_metric_id`` or
        ``quarter_end`` column.
    """
    # Validate before touching Spark so bad input fails fast.
    if "va_metric_id" not in df.columns:
        raise InvalidInputException("Missing va_metric_id column in provided dataframe")
    if "quarter_end" not in df.columns:
        raise InvalidInputException("Missing quarter_end column in provided dataframe")

    # Single source of truth for the output type, so every return path
    # (including the empty-input shortcut below) yields the same schema.
    consensus_type = "decimal(38,6)"

    input_df = df.alias("input_df")
    yy_growth_tickers = yy_growth_tickers or []
    spark = get_spark_session()

    # The VA value we want to join is either:
    # A) the preq_consensus value from the company_actuals_consensus_union table,
    #    which reflects what the VA value was before the company reported.
    #    This value is not always available though.
    # B) as a backup, the daily VA value scraped on the day closest to the end
    #    of the fiscal quarter, but not after the quarter ended.
    va_current = spark.table(
        "yd_3p_visiblealpha.va_gold.company_actuals_consensus_union"
    ).alias("va_current")
    va_daily = spark.table(
        "yd_3p_visiblealpha.va_gold.company_consensus_daily_w_growth"
    ).alias("va_daily")

    # Coalescing metrics from A) and B) to account for all cases
    va_coal = F.coalesce(
        F.col("va_with_correct_quarters.preq_consensus"), F.col("va_daily.value")
    )
    va_yy_coal = F.coalesce(
        F.col("va_with_correct_quarters.preq_consensus_yy_growth_calc"),
        F.col("va_daily.value_yy_growth"),
    )

    # For metrics in %, VA uses values between 0 and 100,
    # so we need to divide those by 100 to be consistent
    va_value = _normalize_percentage(va_coal)
    va_yy = _normalize_percentage(va_yy_coal)

    # In addition, VA quarter end dates are not accurate,
    # so first we need to take the accurate quarter end dates from input DF,
    # join those to the VA table using date range logic,
    # and use those accurate quarter end dates to select the correct date
    # to pull the VA value from.
    quarter_dates = (
        input_df.filter(F.col("va_metric_id").isNotNull())
        .select("quarter_end", "va_metric_id")
        .distinct()
    ).alias("quarter_dates")

    # Nothing to look up: return early with an all-null column. take(1) only
    # needs a single row, unlike count() which would tally every distinct pair.
    if not quarter_dates.take(1):
        return input_df.withColumn(
            "va_consensus_value", F.lit(None).cast(consensus_type)
        )

    va_with_correct_quarters = va_current.join(
        quarter_dates,
        # join based on VA metric ID + closest quarter
        # i.e. the fiscal quarter end for a company falls fewer than 30 days
        # away from the VA quarter end. Columns are alias-qualified so the
        # condition stays unambiguous whatever columns either side carries.
        E.all(
            F.col("quarter_dates.va_metric_id") == F.col("va_current.metric_id"),
            F.abs(
                F.datediff(
                    F.col("quarter_dates.quarter_end"),
                    F.col("va_current.quarter_end_date"),
                )
            )
            < 30,
        ),
        how="inner",
    ).alias("va_with_correct_quarters")

    # Attach every daily VA observation dated on or before the accurate quarter
    # end; the left join keeps quarters that have no daily observation at all.
    daily_candidates = va_with_correct_quarters.join(
        va_daily,
        E.all(
            F.col("va_with_correct_quarters.metric_id")
            == F.col("va_daily.metric_id"),
            F.col("va_with_correct_quarters.quarter_end")
            >= F.col("va_daily.date"),
        ),
        how="left",
    )

    # Dedupe per (metric, quarter), ordering candidates by how close the daily
    # observation is to the quarter end (closest first).
    joined_consensus = (
        dedupe_by_row_number(
            daily_candidates,
            dedupe_columns=[
                F.col("va_with_correct_quarters.metric_id"),
                F.col("va_with_correct_quarters.quarter_end"),
            ],
            order_columns=[
                F.abs(
                    F.col("va_with_correct_quarters.quarter_end")
                    - F.col("va_daily.date")
                ).asc()
            ],
        )
        .select(
            "va_with_correct_quarters.*",
            va_value.cast(consensus_type).alias("va_consensus_value"),
            va_yy.cast(consensus_type).alias("va_consensus_yy"),
        )
        .alias("joined_consensus")
    )

    input_df_updated = input_df.join(
        joined_consensus,
        E.all(
            F.col("input_df.va_metric_id") == F.col("joined_consensus.metric_id"),
            F.col("input_df.quarter_end") == F.col("joined_consensus.quarter_end"),
        ),
        how="left",
    ).select(
        "input_df.*",
        "joined_consensus.va_consensus_value",
        "joined_consensus.va_consensus_yy",
    )

    # Finalize schema with va_consensus_value column addition.
    # There are some tickers where the VA metric is in dollars, but we make
    # estimates in Y/Y growth; the calculate_va_yy_growth parameter allows
    # for this.
    if not calculate_va_yy_growth:
        return input_df_updated.drop("va_consensus_yy")

    if yy_growth_tickers:
        # Some products with multiple tickers need only some of them converted
        # to Y/Y growth; the remaining tickers keep the plain VA value.
        consensus = F.when(
            F.col("ticker").isin(yy_growth_tickers), F.col("va_consensus_yy")
        ).otherwise(F.col("va_consensus_value"))
    else:
        consensus = F.col("va_consensus_yy")

    return input_df_updated.select(
        "input_df.*", consensus.alias("va_consensus_value")
    )
def _normalize_percentage(
    value_column: str | Column,
    unit_column: str | Column = "va_with_correct_quarters.metric_unit",
) -> Column:
    """
    Rescale a value expressed as a percentage (0-100) into a fraction.

    :param value_column: Column (or column name) holding the value to normalize.
    :param unit_column: Column (or column name) holding the value's unit.
        Defaults to ``va_with_correct_quarters.metric_unit``.
    :returns: A column equal to ``value_column / 100`` when the unit is
        ``"Percent"`` or ``"%"``, and to ``value_column`` unchanged otherwise.
    """
    # Both inputs go through E.normalize_column first; presumably this turns a
    # plain column name into a Column expression -- confirm against etl_toolkit.
    value = E.normalize_column(value_column)
    unit = E.normalize_column(unit_column)

    is_percentage = unit.isin(["Percent", "%"])
    as_fraction = value / 100
    return F.when(is_percentage, as_fraction).otherwise(value)