A module (“analyses”)#

The analyses module in the etl_toolkit is imported as A. It contains functions that accept a dataframe, apply complex transformations, and return new dataframes. Use these functions for the larger or routine components of pipelines whenever possible. The transformations include deduping, complex filtering, enrichment, and aggregations.

Tip

It is highly recommended to use these functions to cut down on large pieces of repetitive logic. Steps like parsing, deduplication, and generating KPIs can be greatly simplified with them, and they make the intention of a transformation clearer to a teammate or reviewer.

Tip

Many analysis functions that perform filtering or aggregation have a QA mode. When enabled, the output dataframe includes additional columns and/or records that make it easier to investigate the transformation or the underlying data. Functions that support QA mode include a qa boolean argument; see each function’s documentation for whether it is supported.
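Enabling QA mode is just a matter of passing the flag. A minimal sketch, assuming df is an input dataframe and parsers is a list of A.parser configurations (see the parser examples later on this page):

from etl_toolkit import A

# Assumes df and a list of A.parser configurations named parsers already exist.
# With qa=True the output includes diagnostic columns such as the matched
# inclusion/exclusion case for each record.
qa_df = A.parse_records(df, configs=parsers, qa=True)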

Calculation Analyses#

Suite of functions that perform common calculations to enrich existing dataframes, including lagged values or percent of total. It is recommended to use these functions rather than implementing the calculations in native pyspark, as they apply special techniques to stay performant and are easier to read.
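For reference, the native pyspark equivalent of a lagged value and a percent-of-total column typically looks like the sketch below, shown only to illustrate the window boilerplate these functions encapsulate (the company/date/value column names are hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Native pyspark version of a lagged value and a percent-of-total metric,
# using hypothetical company/date/value columns on an existing df.
w = Window.partitionBy("company").orderBy("date")
enriched_df = (
    df
    .withColumn("value_lag_1", F.lag("value", 1).over(w))
    .withColumn(
        "pct_of_total",
        F.col("value") / F.sum("value").over(Window.partitionBy("company")),
    )
)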

Card Analyses#

Suite of functions that perform common adjustments on dataframes derived from card datasets (Skywalker, Yoda, etc.), including lag adjustments, weighting, and paneling. It is recommended to use these functions rather than implementing the adjustments manually, as they are common enhancements that normalize the data trends typically found in these datasets.

E-Receipt Analyses#

Dedupe Analyses#

Suite of functions to handle deduplication operations on dataframes. The returned output is a dataframe with rows that are unique based on the specified condition. These functions can be easier to read and write than handling deduplication in native pyspark.
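For comparison, the native pyspark pattern these functions typically replace, keeping the latest record per key, looks roughly like the following sketch (the order_id/updated_at columns are hypothetical):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Native pyspark dedup: keep the most recent record per order_id
# (hypothetical key and timestamp columns on an existing df).
w = Window.partitionBy("order_id").orderBy(F.col("updated_at").desc())
deduped_df = (
    df
    .withColumn("_rn", F.row_number().over(w))
    .where(F.col("_rn") == 1)
    .drop("_rn")
)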

Index Analyses#

Suite of functions to generate index metrics that can reveal the trajectory of a company KPI without directly benchmarking to it. These index columns can be tricky to calculate in native pyspark, so these functions help standardize how they are calculated.
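Conceptually, an index column expresses each value relative to a base period. A rough native pyspark sketch of the idea (hypothetical columns, with the base period rebased to 100):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Conceptual index: each value relative to the first observation per company,
# rebased so the first period equals 100 (hypothetical columns on an existing df).
w = Window.partitionBy("company").orderBy("date")
indexed_df = df.withColumn(
    "kpi_index",
    100 * F.col("value") / F.first("value").over(w),
)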

Investor Standard Analyses#

Suite of functions to generate common analyses used for investor research. It is recommended to use these functions as they follow specific standards for research reports and integrate with standard visualizations for charts to simplify publishing workflows.

Investor Reporting Analyses#

Suite of functions specific to the investor reporting process. It is recommended to use these functions as they follow specific standards for customer-facing assets and reduce time spent on otherwise manual processes.

etl_toolkit.A.backtest_configuration(df, primary_dataset, panel_used, methodology_notes, secondary_dataset=None, metric_type='backtest')#

Use this function to define backtest configurations that are used in the investor reporting process. A list of backtest_configurations can then be added to earnings results data using A.earnings_results_with_backtests.

Note

This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to A.earnings_results_with_backtests.

Parameters:
  • df (DataFrame) – A backtest dataframe that contains the results of the backtest for each prior publishing quarter. This can be obtained by filtering a quarterly_accuracy table for metric_type="backtest" and status="active".

  • primary_dataset (str) – The primary dataset used for this KPI backtest. For ex: skywalker or yoda.

  • panel_used (str) – The corresponding Panel ID for the datasets used in this KPI backtest. For ex: 202101_100.

  • methodology_notes (str) – Any relevant information about the methodology for this backtest, examples include adjustments, coverage changes, weights, etc.

  • secondary_dataset (Optional[str], default: None) – An optional secondary dataset used for this KPI backtest. For ex: skywalker or yoda. Leave as None if not used. Default is None.

  • metric_type (Literal['backtest', 'qa'], default: 'backtest') – The type of KPI Backtest, can be backtest or qa. Default is backtest.

Return type:

None

Examples#

Create a list of backtest_configurations for use in A.earnings_results_with_backtests.#
from etl_toolkit import A, E, F

sample_backtest_df = (
    spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy")
    .where(F.col("metric_type") == 'backtest')
    .where(F.col("status") == 'active')
)

backtests = [
    A.backtest_configuration(
        df=sample_backtest_df,
        primary_dataset="skywalker",
        secondary_dataset=None,
        panel_used="202101_100",
        methodology_notes="Uses geo weights and card type weights, yoy coverage change",
        metric_type="backtest",
    ),
]
etl_toolkit.A.add_unified_consensus_column(df, calculate_va_yy_growth=False, yy_growth_tickers=None)#

Helper function to add a va_consensus_value column to the input dataframe. The column represents the quarterly VA estimate and is used in reporting processes.

The column is introduced via a join that resolves differences in how VA reports quarterly estimates. The join logic handles the following adjustments:

  • Differing quarter ends between the company’s actual fiscal periods and what VA provides

  • Normalization of growth rates represented as a percentage between 0 and 100

  • Growth rate calculations when the reported metric is a growth rate while VA publishes nominal values

Parameters:
  • df (DataFrame) – Input DataFrame of quarterly earnings data. Must have a va_metric_id column and a quarter_end column to perform the join.

  • calculate_va_yy_growth (bool, default: False) – Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.

  • yy_growth_tickers (list[str], default: None) – List of tickers that, when calculate_va_yy_growth=True, will use the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.
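A minimal usage sketch, assuming df is a quarterly earnings dataframe that already contains the required va_metric_id and quarter_end columns (the ticker list is hypothetical):

from etl_toolkit import A

# df is assumed to hold quarterly earnings data with va_metric_id and
# quarter_end columns; the ticker below is only for illustration.
earnings_with_consensus = A.add_unified_consensus_column(
    df,
    calculate_va_yy_growth=True,
    yy_growth_tickers=["TGT"],
)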

Ordering Analyses#

Suite of functions to handle rearranging dataframe columns and rows for common scenarios. These are helpful because they are optimized for performance and/or avoid duplicating columns.

Parser Analyses#

Suite of functions to filter and extract important values from string columns to enrich dataframes. These are powerful functions for simplifying and organizing the complex regex or conditional logic that is usually the critical first step of product pipelines to filter out unrelated data.

etl_toolkit.A.parser(parser_id, include_any=<factory>, exclude_any=<factory>, with_columns=<factory>, metadata=<factory>)#

Use this function to define parsing configurations that are applied in the A.parse_records function. These parsers can control inclusion and exclusion filtering logic via include_any and exclude_any and enrichment columns through with_columns.

Tip

Encapsulating logic in parsers like this makes transformation logic modular and more easily understood. For example, if covering multiple companies, each company’s filtering logic can be defined as a distinct parser.

Note

This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to A.parse_records.

Parameters:
  • parser_id (str) – Unique ID of the parser, used to populate the parser_id column in the output of A.parse_records. This ID must be unique within the list of configurations used in A.parse_records.

  • include_any (list[Column] | dict[str, Column], default: <factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluates to True for a record, the parser is considered a match (assuming exclude_any does not also have a match). If a dict is provided instead of a list, each key is a unique ID for the condition and each value is the boolean expression.

  • exclude_any (list[Column] | dict[str, Column], default: <factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluates to True for a record, the record is not included in the output of A.parse_records. If a dict is provided instead of a list, each key is a unique ID for the condition and each value is the boolean expression.

  • with_columns (dict[str, str | Column], default: <factory>) – A dict of key/value pairs where the key is a column name to add and the value is a Column expression. These columns are added to a record in the output of A.parse_records when the parser matches.

  • metadata (dict[str, str | int | date | datetime | bool | float], default: <factory>) – Generally not used outside of library development. This dict can include arbitrary key/value information about the parser. These values are not added to the outputs of A.parse_records.

Return type:

None

Examples#

Simple parser example with A.parse_records.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red"
                ],
            ),
            A.parser(
                parser_id="blue_parser",
                include_any=[
                    F.col("color") == "blue"
                ],
            ),
        ],
    )
)

color  date        value  parser_id
red    2024-01-01  100    red_parser
blue   2024-01-02  50     blue_parser

Parser example with A.parse_records when with_columns is used. Notice how the columns are added to the output dataframe. If the set of column keys is not consistent between parsers, the function resolves the missing columns as nulls.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red"
                ],
                with_columns={
                    "enriched_red": F.lit(True),
                    "enriched_red_2": F.lit(1),
                },
            ),
            A.parser(
                parser_id="blue_parser",
                include_any=[
                    F.col("color") == "blue"
                ],
                with_columns={
                    "enriched_blue": F.lit(True),
                    "enriched_blue_2": F.lit(1),
                },
            ),
        ],
    )
)

color  date        value  parser_id    enriched_red  enriched_red_2  enriched_blue  enriched_blue_2
red    2024-01-01  100    red_parser   true          1               null           null
blue   2024-01-02  50     blue_parser  null          null            true           1

Parser example with include_any and exclude_any expressed as dicts. The keys are present in the output dataframe when QA mode is used for A.parse_records and the condition is satisfied for the record.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any={
                    "color_match": F.col("color") == "red",
                },
            ),
            A.parser(
                parser_id="blue_parser",
                include_any={
                    "color_match": F.col("color") == "blue",
                },
                exclude_any={
                    "date_stale": F.col("date") < date(2024, 1, 3),
                },
            ),
        ],
        qa=True,
    )
)

color  date        value  parser_id    parser_exclusion_case  parser_inclusion_case
red    2024-01-01  100    red_parser   null                   color_match
blue   2024-01-02  50     blue_parser  date_stale             color_match

Scalar Analyses#

Suite of functions to generate scalars (i.e. python literal values) for common pyspark operations. These can be useful to retrieve values into python from dataframes and re-use them in pipeline code.

Note

Unlike other analyses functions in the toolkit, these scalar functions return python literal values (ex: int, float, etc.) instead of dataframes.
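For context, retrieving a scalar in native pyspark usually looks like the sketch below; the scalar analyses standardize this kind of round trip (df and its date column are hypothetical):

from pyspark.sql import functions as F

# Native pyspark pattern for pulling a literal value back into python
# from an existing df (hypothetical date column).
max_date = df.agg(F.max("date")).collect()[0][0]

# The python literal can then be reused directly in pipeline code.
recent_df = df.where(F.col("date") == max_date)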

Time Analyses#

Suite of functions to manage date and timestamp operations. Use these functions for filling date ranges and grossing up data to higher periodicities.
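For context, filling a continuous daily date range per key in native pyspark typically requires building a date spine and joining the data back onto it, roughly as sketched below (the company/date columns are hypothetical):

from pyspark.sql import functions as F

# Build a daily date spine per company and left join the original data onto it
# (hypothetical company/date columns on an existing df).
date_spine = (
    df.groupBy("company")
    .agg(F.min("date").alias("start"), F.max("date").alias("end"))
    .withColumn("date", F.explode(F.sequence("start", "end", F.expr("interval 1 day"))))
    .select("company", "date")
)
filled_df = date_spine.join(df, ["company", "date"], "left")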

Comparison Analyses#

Suite of functions to compare DataFrames by schema or row content. These are useful for data validation, testing pipeline outputs, and ensuring data quality. The functions help identify differences between two DataFrames either at the schema level or by comparing individual rows.
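The underlying native pyspark checks are roughly of the following shape; a sketch, assuming two dataframes df_expected and df_actual with the same intended schema:

# Schema-level comparison between the two dataframes.
schemas_match = df_expected.schema == df_actual.schema

# Row-level diff: rows present in one dataframe but not the other
# (duplicate rows are respected by exceptAll).
missing_rows = df_expected.exceptAll(df_actual)
extra_rows = df_actual.exceptAll(df_expected)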

Investor Standard Metrics Analyses#

Suite of functions specific to the standard metrics experience. These are used to generate unified KPI analyses and downstream dashboard and feed tables for a metric.

Calendar Analyses#

Suite of functions and classes to generate and manage calendar data with support for custom fiscal periods, holidays, and business day calculations. These calendar utilities standardize period definitions and holiday handling across an organization.