from typing import Optional
from pyspark.sql import Column, DataFrame, functions as F
from yipit_databricks_utils.future import read_gsheet
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import expressions as E
from etl_toolkit.analyses.investor.consensus import add_unified_consensus_column
@track_usage
def observed_coverage_ratio(
observed_column: str | Column,
company_reported_column: str | Column,
) -> Column:
return E.normalize_column(observed_column) / E.normalize_column(
company_reported_column
)
EARNINGS_RESULTS_COLUMNS = [
E.normalize_date("quarter_start").alias("quarter_start"),
E.normalize_date("quarter_end").alias("quarter_end"),
F.col("quarter_label").cast("string"),
E.normalize_date("yd_calendar_quarter").alias("yd_calendar_quarter"),
E.quarter_label(E.normalize_date("yd_calendar_quarter")).alias("yd_quarter_label"),
F.col("product_name").cast("string"),
F.col("ticker").cast("string"),
F.col("kpi_name").cast("string"),
F.col("company_reported_value").cast("double"),
F.col("yd_estimate_value").cast("double"),
F.lit(None).cast("double").alias("observed_value"),
F.lit(None)
.alias("observed_coverage_ratio")
.cast("double")
.alias("observed_coverage_ratio"),
# Spark connect has issues with casting invalid strings to double type
F.expr("try_cast(lower_range_80th_percentile AS DOUBLE)")
.cast("double")
.alias("lower_range_80th_percentile"),
F.expr("try_cast(upper_range_80th_percentile AS DOUBLE)")
.cast("double")
.alias("upper_range_80th_percentile"),
F.col("company_commentary").cast("string"),
F.col("yd_commentary").cast("string"),
F.col("yd_call_vs_consensus").cast("string"),
F.col("yd_call_outcome").cast("string"),
F.col("metric_level_narrative_ranking").cast("string"),
F.col("is_company_comparable").cast("boolean"),
F.col("is_active").cast("boolean"),
F.col("is_signals_metric").cast("boolean"),
F.col("is_derived_metric").cast("boolean"),
F.col("unit_type").cast("string"),
F.col("currency").cast("string"),
F.col("data_owner_email").cast("string"),
F.col("research_owner_email").cast("string"),
F.col("va_metric_id").cast("long"),
F.col("primary_dataset").cast("string"),
F.col("secondary_dataset").cast("string"),
F.col("panel_used").cast("string"),
F.col("methodology_notes").cast("string"),
F.lit("published").cast("string").alias("metric_type"),
F.current_timestamp().cast("timestamp").alias("last_updated"),
]
@track_usage
[docs]
def earnings_results_from_gsheet(
gsheet_file_id: str,
sheet_id: int,
additional_columns: Optional[list[str | Column]] = None,
calculate_va_yy_growth: bool = False,
yy_growth_tickers: list[str] = None,
) -> DataFrame:
"""
Returns a dataframe of Earnings Results data retrieved from the specified Gsheet. The dataframe
follows the standard Earnings Results schema for Investor reporting.
.. warning:: When using this function, the Gsheet will need to be permissioned correctly to be read from databricks and has the correct sheet columns and values to be compatible the standard schema.
:param gsheet_file_id: The Gsheet file ID that contains earnings results. Can be found in the Gsheet URL.
:param sheet_id: The Gsheet sheet (tab) ID that contains earnings results. Can be found in the Gsheet URL.
:param additional_columns: Additional columns to add to the end of the Earnings results schema. Not typically used, and the default is for no additional columns to be added.
:param calculate_va_yy_growth: Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.
:param yy_growth_tickers: List of tickers that when ``calculate_va_yy_growth=True``, these will use a the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.
Examples
--------
.. code-block:: python
:caption: Generating an Earnings Results dataframe
from etl_toolkit import E, A, F
EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_SHEET_ID = 1232299054
df = A.earnings_results_from_gsheet(EARNINGS_FILE_ID, EARNINGS_SHEET_ID)
display(df)
+--------------+--------------+-------+------------------------------------+
|quarter_start |quarter_end |... |last_updated |
+--------------+--------------+-------+------------------------------------+
|2024-01-01 |2024-03-31 |... |2024-09-18 |
+--------------+--------------+-------+------------------------------------+
"""
additional_columns = additional_columns or []
columns = EARNINGS_RESULTS_COLUMNS + [
E.normalize_column(column) for column in additional_columns
]
df = read_gsheet(gsheet_file_id, sheet_id).select(*columns)
enriched_df = add_unified_consensus_column(
df,
calculate_va_yy_growth=calculate_va_yy_growth,
yy_growth_tickers=yy_growth_tickers,
)
return enriched_df
EARNINGS_RANKING_COLUMNS = [
E.normalize_date("quarter_start").alias("quarter_start"),
E.normalize_date("quarter_end").alias("quarter_end"),
F.col("quarter_label").cast("string"),
E.normalize_date("yd_calendar_quarter").alias("yd_calendar_quarter"),
E.quarter_label(E.normalize_date("yd_calendar_quarter")).alias("yd_quarter_label"),
F.col("product_name").cast("string"),
F.col("ticker").cast("string"),
F.col("accuracy_ranking").cast("string"),
F.col("narrative_ranking").cast("string"),
]
@track_usage
[docs]
def earnings_rankings_from_gsheet(
gsheet_file_id: str,
sheet_id: int,
) -> DataFrame:
"""
Returns a dataframe of Earnings Rankings data retrieved from the specified Gsheet. The dataframe
follows the standard Earnings Rankings schema for Investor reporting.
In addition, will automatically add a ``va_consensus_value`` based on the values for the ``va_metric_id`` and ``quarter_end`` columns of the input dataframe.
.. warning:: When using this function, the Gsheet will need to be permissioned correctly to be read from databricks and has the correct sheet columns and values to be compatible the standard schema.
:param gsheet_file_id: The Gsheet file ID that contains the narrative rankings data. Can be found in the Gsheet URL.
:param sheet_id: The Gsheet sheet (tab) ID that contains the narrative rankings data. Can be found in the Gsheet URL.
Examples
--------
.. code-block:: python
:caption: Generating a Earnings Rankings dataframe
from etl_toolkit import E, A, F
EARNINGS_RANKINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_RANKINGS_SHEET_ID = 2006888117
df = A.earnings_rankings_from_gsheet(EARNINGS_RANKINGS_FILE_ID, EARNINGS_RANKINGS_SHEET_ID)
display(df)
+--------------+--------------+-------+------------------------------------+
|quarter_start |quarter_end |... |narrative_ranking |
+--------------+--------------+-------+------------------------------------+
|2024-01-01 |2024-03-31 |... |Green |
+--------------+--------------+-------+------------------------------------+
"""
df = read_gsheet(gsheet_file_id, sheet_id).select(*EARNINGS_RANKING_COLUMNS)
return df