A module (“analyses”)#

The analyses module in the etl_toolkit is imported as A. This module contains several functions that accept a dataframe, apply complex transformations, and return new dataframes. These functions should be used for larger or routine pipeline components whenever possible. The transformations include deduping, complex filtering, enrichment, and aggregations.

Tip

It is highly recommended to use these functions to cut down on large pieces of repetitive logic. Steps like parsing, deduplication, and generating KPIs can be greatly simplified using these functions. Using these functions also makes the intention of a transformation clearer to a teammate or reviewer.

Tip

Many analysis functions that perform filtering or aggregation have a QA mode; when enabled, the output dataframe will include additional columns and/or records to make investigation of the transformation or data easier. Functions that have a QA mode include a qa boolean argument. See each function’s documentation for details on whether it supports QA mode.

Calculation Analyses#

Suite of functions that perform common calculations to enrich existing dataframes, including lagged values or percent of total. It is recommended these functions are used rather than implementing the calculations in native pyspark, as they apply special techniques to be performant and are easier to read.

etl_toolkit.A.add_lag_columns(df, value_columns, date_column, slice_columns=None, steps=1, step_unit='DAY')[source]#

Adds additional columns to the input dataframe that are lagged versions of the value_columns specified. The lag is calculated based on date_column and the specified interval (steps * step_unit). Lags can be performed within each slice if the slice_columns are specified.

The added lag columns are named in a standard way based on the interval, ex: (“revenue”, 1, “DAY”) -> “revenue_lag_1_day”

Caution

The lag is performed via a self-join to match a date with the corresponding (date - interval) and within any slice(s). It’s important to account for missing dates in the data to ensure accurate calculations. The A.fill_periods function can be useful in these scenarios.
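
For illustration, a minimal sketch of this pitfall, reusing the toy data pattern from the examples below: with 2024-01-02 missing, the 1-day lag for 2024-01-03 has no row to match against, so its value_lag_1_day comes back null even though an earlier value exists.
from etl_toolkit import A

from datetime import date

# 2024-01-02 is absent, so the self-join for a 1-day lag finds no match for 2024-01-03.
gap_df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 3)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_lag_columns(
        gap_df,
        value_columns=["value"],
        date_column="date",
        steps=1,
        step_unit="DAY",
    )
)
# Filling or otherwise accounting for the missing 2024-01-02 period
# (see A.fill_periods) avoids this silent gap.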

Parameters:
  • df (pyspark.sql.DataFrame) – The input Dataframe to add lag calculations to

  • value_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to base the lag calculations on. Each column will have a corresponding lag column added. If strings are provided, they are resolved as Columns.

  • date_column (str | pyspark.sql.Column) – A Column or str that should be of date or timestamp type. This column is used to determine the lagged period for lag calculations. If a string is provided, it is resolved as a Column.

  • slice_columns (list[str | pyspark.sql.Column]) – An optional list of Columns or strings to define the slices of the dataframe. Within each slice a lag calculation will be generated based on the date_column + interval. If strings are provided, they are resolved as Columns.

  • steps (int) – The number of step_units that define a lag interval.

  • step_unit (Literal['DAY', 'WEEK', 'MONTH', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The unit of length of the lag period, and when combined with steps, equals the lag interval.

Return type:

pyspark.sql.DataFrame

Examples#

Basic example of a 1-day lag of the value column based on the date column.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="DAY",
        steps=1,
    )
)

color | date | value | value_lag_1_day
--- | --- | --- | ---
red | 2024-01-02 | 100 | 50
red | 2024-01-01 | 50 | null

Example of a 2-day lag of the value column based on the date column. Notice the lag column name changed to reflect the interval.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 250, "color": "red", "date": date(2024, 1, 5)},
    {"value": 200, "color": "red", "date": date(2024, 1, 4)},
    {"value": 150, "color": "red", "date": date(2024, 1, 3)},
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="DAY",
        steps=2,
    )
)

color | date | value | value_lag_2_day
--- | --- | --- | ---
red | 2024-01-05 | 250 | 150
red | 2024-01-04 | 200 | 100
red | 2024-01-03 | 150 | 50
red | 2024-01-02 | 100 | null
red | 2024-01-01 | 50 | null

Example of a 2-week lag of the value column based on the date column. Notice how the dataframe has a weekly periodicity, and the data is still lagged correctly because a self-join is performed in the operation.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 250, "color": "red", "date": date(2024, 1, 29)},
    {"value": 200, "color": "red", "date": date(2024, 1, 22)},
    {"value": 150, "color": "red", "date": date(2024, 1, 15)},
    {"value": 100, "color": "red", "date": date(2024, 1, 8)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="WEEK",
        steps=2,
    )
)

color | date | value | value_lag_2_week
--- | --- | --- | ---
red | 2024-01-29 | 250 | 150
red | 2024-01-22 | 200 | 100
red | 2024-01-15 | 150 | 50
red | 2024-01-08 | 100 | null
red | 2024-01-01 | 50 | null

Example of a 1-day lag of the value column using the color column for slices. Notice that the lag is applied within each unique slice of the dataframe.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
    {"value": 150, "color": "blue", "date": date(2024, 1, 2)},
    {"value": 75, "color": "blue", "date": date(2024, 1, 1)},
])

display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        slice_columns=["color"],
        step_unit="DAY",
        steps=1,
    )
)

color | date | value | value_lag_1_day
--- | --- | --- | ---
red | 2024-01-02 | 100 | 50
red | 2024-01-01 | 50 | null
blue | 2024-01-02 | 150 | 75
blue | 2024-01-01 | 75 | null

etl_toolkit.A.add_percent_of_total_columns(df, value_columns, total_grouping_columns, suffix='percent')[source]#

Adds additional column(s) to the dataframe equal to each row’s percent of the value_columns relative to the sum of the values across the total_grouping_columns.

Each percent column added will have a standard naming convention that is the <value_column>_<suffix> (ex: “gmv” -> “gmv_percent”). The default suffix is “percent”, but this can be adjusted.

Tip

It is recommended this function is used whenever a percent of total operation is needed. The implementation uses a group by + join under the hood, which is more performant than a window expression.
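
For context, a rough sketch of that group by + join pattern in native pyspark (not the library’s actual implementation; it assumes the toy value and color columns used in the examples below):
from etl_toolkit import F

# Aggregate the total per group, join it back, then divide; this mirrors the
# general shape of the pattern described above for a single column.
totals_df = df.groupBy("color").agg(F.sum("value").alias("value_total"))

manual_percent_df = (
    df.join(totals_df, on="color", how="left")
    .withColumn("value_percent", F.col("value") / F.col("value_total"))
    .drop("value_total")
)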

Parameters:
  • df (pyspark.sql.DataFrame) – The dataframe to add percent of total columns to

  • value_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to generate percent of total columns. Each column specified will have a percent of total column added to the output dataframe. If strings are provided, they will be resolved as Columns.

  • total_grouping_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to define the slices of the dataframe. The sum of the value column for each unique combination of values in this list will be used as the denominator for the percentage column. If strings are provided, they will be resolved as Columns.

  • suffix (str) – The suffix that is added to each percent column generated for the output dataframe.

Return type:

pyspark.sql.DataFrame

Examples#

Example of generating percent of total columns based on the color column for groupings.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value"],
        total_grouping_columns=["color"],
    )
)

color | date | value | value_percent
--- | --- | --- | ---
red | 2024-01-02 | 100 | 0.666666
red | 2024-01-01 | 50 | 0.333333

Example of modifying the suffix argument to adjust the new column names.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])

display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value"],
        total_grouping_columns=["color"],
        suffix="percent_of_total",
    )
)

color | date | value | value_percent_of_total
--- | --- | --- | ---
red | 2024-01-02 | 100 | 0.666666
red | 2024-01-01 | 50 | 0.333333

Example of generating multiple percent of total columns based on the date column for groupings.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1), "count": 5},
    {"value": 50, "color": "blue", "date": date(2024, 1, 1), "count": 10},
])

display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value", "count"],
        total_grouping_columns=["date"],
    )
)

color | date | value | count | count_percent | value_percent
--- | --- | --- | --- | --- | ---
red | 2024-01-01 | 100 | 5 | 0.333333 | 0.666666
blue | 2024-01-01 | 50 | 10 | 0.666666 | 0.333333

Card Analyses#

Suite of functions that perform common adjustments on dataframes derived from card datasets (Skywalker, Yoda, etc.), including lag adjustments, weighting, and paneling. It is recommended these functions are used rather than implementing the adjustments manually, as they are common enhancements that normalize the data trends typically found in these datasets.

etl_toolkit.A.add_card_date_adjustment(df, date_column_name='date', group_column_names=None, adjustment_type='revenue', correct_jan_24=True, dataset='skywalker', source_country_list=['uk_de_at', 'fr'], amount_column='amount_usd')[source]#

Modifies the date_column_name specified for the input dataframe (df) to account for delays in processing transactions. For Skywalker data, this is based on the yipit_cobrand_id and card source. For Mando data, this is based on the source_country and card_type. The date adjustment is determined by the adjustment_type, which indicates either the count of transactions or total revenue for the transactions observed during each period. This adjustment is necessary given delays in processing transactions due to holidays or non-business days that can distort the actual date a transaction occurred.

The additional or modified columns added to the input df for this adjustment include:

  • <date_column_name>: This column will be modified to reflect the corrected date based on the expected lag. Date type.

  • <date_column_name>_raw: A new column added that indicates the original, non-adjusted date for the transaction. Date type.

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table from either Skywalker or Mando and parsed for specific merchant(s).

  • date_column_name (str) – The date column name of the df to adjust. This function will correct the dates of transactions within this column. Defaults to date.

  • group_column_names (Optional[list[str]]) – An optional list of grouping columns to generate lag adjustments within. This can be used if the input dataframe spans multiple slices where the date behavior can be different. Defaults to None, which indicates there are no slices to group by for this adjustment.

  • adjustment_type (Literal['revenue', 'count']) – Indicates whether the date adjustment should be calculated based on revenue or transaction count. Defaults to revenue.

  • correct_jan_24 (bool) – Optional flag to control if a subset of 2024-01-23 transactions should be assigned to 2024-01-22 via a deterministic sample. Defaults to True.

  • dataset (Literal['skywalker', 'mando']) – The source dataset that the input df is derived from. Can be either skywalker or mando. Defaults to skywalker.

  • source_country_list (List) – For Mando data only. List of countries to process. Must be one or both of uk_de_at and fr. Defaults to ["uk_de_at", "fr"].

  • amount_column (str) – For Mando data only. The name of the amount column to use for revenue adjustments. Defaults to amount_usd.

Return type:

pyspark.sql.DataFrame

Examples#

Example of applying the date adjustment to a Skywalker dataset. Note how the date column values are changed and a new date_raw column is added with the original values.#
from etl_toolkit import E, F, A

cmg_txns = spark.table('yd_production.cmg_silver.txns_paneled')

display(
    A.add_card_date_adjustment(cmg_txns)
)

source | yipit_cobrand_id | date_raw | date
--- | --- | --- | ---
bank | 1 | 2019-02-19 | 2019-02-16
bank | 1 | 2019-02-19 | 2019-02-16

Example of applying the date adjustment to a Mando dataset for specific countries.#
from etl_toolkit import E, F, A

eu_txns = spark.table('yd_production.eu_silver.txns_paneled')

display(
    A.add_card_date_adjustment(
        eu_txns,
        dataset="mando",
        source_country_list=["uk_de_at"],
        amount_column="amount_eur"
    )
)

card_type | source_country | date_raw | date
--- | --- | --- | ---
DEBIT | uk_de_at | 2019-02-19 | 2019-02-16
CREDIT | uk_de_at | 2019-02-19 | 2019-02-19

etl_toolkit.A.add_card_day_of_week_lag_adjustment(df, threshold=84, dataset='yoda', transaction_type='debit', panel_ids=None, group_column_names=None, max_adjustment=5.0, use_central_adjustment=False, enable_paneling=False)[source]#

Add additional column(s) to an input dataframe, df, of card data that calculates adjustments to account for the merchant-specific lag of when transactions (txns) are processed. The lag addressed in this function is due to the day-of-week (“DOW”) a transaction falls on.

The additional columns added to the input df for this adjustment include:

  • txns: The number of transactions found within each grouping of the group_column_names, the dataset’s card type, the DOW of the transaction, and the number of days (lag) between the transaction and the max file date of the df. Int column type.

  • txns_percent_fill: Percentage of txns that have been observed by the above grouping, ordered by the lag calculation. Double column type.

  • lag_adjustment: Equal to inverse of the txns_percent_fill, this is the adjustment factor that should be multiplied to the transaction amount to account for the DOW lag behavior. Defaults to 1 if no lag is observed. Double column type.

For further context, certain merchants may have lag patterns that are different from the lag pattern observed when looking at the whole card dataset. For these merchants, using this lag adjustment function allows you to calculate a more accurate lag adjustment.

  • For example, in Skywalker we have noticed that transactions from merchants like Toast and Starbucks arrive faster than typical Skywalker transactions, which causes any central lag adjustment to consistently overestimate for these merchants.

  • This trend is not as prominent in Yoda; however, it is still preferred to use a merchant-specific lag adjustment.

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table in Skywalker or Yoda and parsed for specific merchant(s).

  • threshold (int) – An integer indicating the number of days prior to the maximum file date to calculate the lag adjustment for each cobrand, card type, and any other grouping columns. This value must be between 63 and 126 days.

  • dataset (Literal['skywalker', 'yoda', 'mando']) – The source dataset that the input df is derived from. Can be either yoda or skywalker. Defaults to yoda.

  • transaction_type (Literal['debit', 'credit']) – The type of transactions that should be used to calculate the adjustment. Can be either credit or debit and is only applied when dataset="skywalker". Defaults to debit.

  • panel_ids (Optional[list[str]]) – A list of strings indicating the Panel ID(s) (panel table names) that this adjustment should account for. Defaults to panel fixed_201901_100 for Yoda and fixed_201901_111_ex for Skywalker.

  • group_column_names (Optional[list[str]]) – An optional list of grouping columns to generate lag adjustments within. This can be used if the input dataframe spans multiple slices where the lag behavior can be different. Defaults to ["cobrid"] for Yoda and ["yipit_cobrand_id"] for Skywalker.

  • max_adjustment (float) – An upper bound value to apply for the final lag_adjustment column. Defaults to 5.0, which indicates the adjustment value cannot be a factor greater than 5.

  • use_central_adjustment (bool) – Deprecated feature. An optional boolean to use the central lag adjustment value from the source dataframe. Using merchant-specific DOW adjustments is preferred. Defaults to False.

  • enable_paneling (bool)

Return type:

pyspark.sql.DataFrame

Tip

One of the arguments for this function is threshold; a couple of notes on this:

  • Lag behavior can change over time; if your merchant’s lag behavior has changed in the last 126 days, you may want to limit the threshold to a lower number.

  • It may be beneficial to alter the threshold; however, if you do, you should test several different thresholds and QA the results.

Examples#

Example of applying day of week adjustments to a Yoda dataset.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_paneled")

display(
    A.add_card_day_of_week_lag_adjustment(
        tost_txns,
        dataset="yoda",
    )
)

cobrid | cardtype | trans_dow | txns | txns_percent_fill | lag_adjustment
--- | --- | --- | --- | --- | ---
175 | DEBIT | 4 | 1154536 | 0.890645002553501 | 1.1227818009790382
174 | DEBIT | 5 | null | null | 1

Example of applying day of week adjustments to a Yoda dataset while setting a max_adjustment. Note how it establishes a ceiling for the lag_adjustment column.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_paneled")

display(
    A.add_card_day_of_week_lag_adjustment(
        tost_txns,
        dataset="yoda",
        max_adjustment=1.1,
    )
)

cobrid | cardtype | trans_dow | txns | txns_percent_fill | lag_adjustment
--- | --- | --- | --- | --- | ---
175 | DEBIT | 4 | 1154536 | 0.890645002553501 | 1.1
174 | DEBIT | 5 | null | null | 1

etl_toolkit.A.add_card_paneling(df, dataset='yoda', panel_ids=None, add_geo_weights=True, add_income_weights=True, add_card_type_weights=True, qa=False)[source]#

QA Mode support

Applies card paneling on the input dataframe (df) using the specified panel_ids (panel table names) and dataset (Skywalker, Yoda, or Mando). Transactions where the user is not a member of the panel will be filtered out unless qa=True. In addition, various weight columns will be added based on the panel to account for geography, income, and card type biases in the dataset. Currently only one panel ID can be provided.

Note

For Mando datasets, only basic user paneling is supported. Income weights, card type weights, and geography weights are not yet supported for Mando data.

A fully adjusted transaction amount should multiply the geography, income, and card type weights, along with any other adjustments (ex: lag_adjustment), by the transaction amount; a sketch follows the column list below.

The additional columns added to the input df for this adjustment include:

  • is_in_panel: This column indicates if the transaction’s user is a member of any of the Panel ID(s) specified. Boolean type.

  • geo_weight: This column includes the geo weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.

  • income_weight: This column includes the income weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.

  • card_type_weight: This column includes the card type weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.
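
A hedged sketch of that full adjustment; the paneled_df variable and the amount and lag_adjustment column names are illustrative, while the weight columns are those listed above:
from etl_toolkit import F

# Multiply the panel weights and any other adjustment factors into the raw amount.
fully_adjusted_df = paneled_df.withColumn(
    "adjusted_amount",
    F.col("amount")
    * F.col("geo_weight")
    * F.col("income_weight")
    * F.col("card_type_weight")
    * F.col("lag_adjustment"),
)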

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table in Skywalker, Yoda, or Mando and parsed for specific merchant(s).

  • dataset (Literal['skywalker', 'yoda', 'mando']) – The source dataset that the input df is derived from. Can be either yoda, skywalker, or mando. Defaults to yoda.

  • panel_ids (Optional[list[str]]) – A list of strings indicating the Panel ID(s) (panel table names) that this adjustment should account for. Defaults to panel fixed_201901_100 for Yoda, fixed_201901_111_ex for Skywalker, and appropriate geographic panels for Mando.

  • add_geo_weights (bool) – Boolean flag to control if a geo_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.

  • add_income_weights (bool) – Boolean flag to control if an income_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.

  • add_card_type_weights (bool) – Boolean flag to control if a card_type_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.

  • qa (bool) – Boolean flag to control QA mode for this function. When True, then all rows are preserved and the is_in_panel column indicates if the transaction was in any of the Panel IDs specified. Defaults to False, where non-panel transactions are filtered out.

Return type:

pyspark.sql.DataFrame

Examples#

Example of applying card paneling to a Yoda dataset. Note the default Yoda panel is used here.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")

display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
    )
)

cobrid | cardtype | trans_dow | is_in_panel | geo_weight | income_weight | card_type_weight
--- | --- | --- | --- | --- | --- | ---
175 | DEBIT | 4 | True | 0.6403014580204711 | 0.6190908069398377 | 2.9107223403194861
174 | DEBIT | 5 | True | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861

Example of applying card paneling to a Yoda dataset when QA mode is enabled. Note that out of panel transactions are included.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")

display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
        qa=True,
    )
)

cobrid | cardtype | trans_dow | is_in_panel | geo_weight | income_weight | card_type_weight
--- | --- | --- | --- | --- | --- | ---
175 | DEBIT | 4 | True | 0.6403014580204711 | 0.6190908069398377 | 2.9107223403194861
174 | DEBIT | 5 | True | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861
174 | DEBIT | 5 | False | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861

Example of applying card paneling to a Yoda dataset with a specific Panel ID and not including card type weights.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")

display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
        panel_ids=["fixed_201901_333_cbs_green_red_teal"],
        add_card_type_weights=False,
    )
)

cobrid | cardtype | trans_dow | is_in_panel | geo_weight | income_weight
--- | --- | --- | --- | --- | ---
175 | DEBIT | 4 | True | 0.6403014580204711 | 0.6190908069398377
174 | DEBIT | 5 | True | 0.4155116960839209 | 0.6190908069398377

Example of applying card paneling to a Mando dataset. Note how only the is_in_panel column is added.#
from etl_toolkit import E, F, A

mando_txns = spark.table("yd_3p_mando.mando_gold.txns_parsed")

display(
    A.add_card_paneling(
        mando_txns,
        dataset="mando",
    )
)
etl_toolkit.A.add_card_paneling_reweighted(df, dataset='yoda', base_panel_id=None, weighting_panel_id=None, panel_overlap_start_date=date(2021, 1, 1), panel_overlap_end_date=date(2021, 12, 31), reweight_min=0.5, reweight_max=2.0)[source]#

Applies a base panel using A.add_card_paneling and then uses a weighted panel to generate weights for shifting merchant volumes by merchant. This combines two panels - a base panel for the main paneling and weights, and a weighting panel used to determine merchant reweighting factors.

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe of transactions data

  • dataset (Literal['skywalker', 'mando', 'yoda']) – The source dataset the df belongs to

  • base_panel_id (Optional[str]) – The base panel ID for the dataset. If not provided the default panel is used based on the dataset.

  • weighting_panel_id (Optional[str]) – The weighting panel ID for the dataset. If not provided the default weighting panel is used based on the dataset.

  • panel_overlap_start_date (datetime.date) – The minimum date the provided panel IDs overlap, used to generate a fixed window to determine reweighting

  • panel_overlap_end_date (datetime.date) – The maximum date the provided panel IDs overlap, used to generate a fixed window to determine reweighting

  • reweight_min (float) – Overall minimum value for the reweight column

  • reweight_max (float) – Overall maximum value for the reweight column

Returns:

A dataframe of transactions for the given dataset, with paneling applied and a reweight column added to indicate the adjustment.

Return type:

pyspark.sql.DataFrame

Examples#

Example of applying card paneling reweighting with default panels#
from etl_toolkit import A

df = spark.table("yd_production.tost_silver.txns_parsed")

panel_df = A.add_card_paneling_reweighted(
    df,
    dataset="yoda",
)
Example of applying card paneling reweighting with custom panel IDs#
from etl_toolkit import A
from datetime import date

df = spark.table("yd_production.tost_silver.txns_parsed")

panel_df = A.add_card_paneling_reweighted(
    df,
    dataset="yoda",
    base_panel_id="fixed_201901_202203_100",
    weighting_panel_id="fixed_202101_202403_100",
    panel_overlap_start_date=date(2021,1,1),
    panel_overlap_end_date=date(2021,12,31)
)
etl_toolkit.A.get_card_panels(dataset='yoda')[source]#

Utility function to list all available Panel IDs for a given dataset. The Panel IDs can be used in the panel_ids argument for ETL toolkit card analyses.

Parameters:

dataset (Literal['skywalker', 'yoda']) – The dataset to retrieve available Panel IDs from. Can be either yoda or skywalker. Defaults to yoda.

Return type:

pyspark.sql.DataFrame

Examples#

Example of retrieving all Yoda Panel IDs#
from etl_toolkit import E, F, A

display(
    A.get_card_panels(
        dataset="yoda",
    )
)

panel_id
---
fixed_201701_100
fixed_201701_100_card_weights

Example of retrieving all Skywalker Panel IDs#
from etl_toolkit import E, F, A

display(
    A.get_card_panels(
        dataset="skywalker",
    )
)

panel_id
---
fixed_201701_333_ex
fixed_201701_333_ex_card_weights
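
Hypothetical sketch of feeding a Panel ID from get_card_panels into A.add_card_paneling; the selected panel and table name are illustrative.#
from etl_toolkit import A

# List the available Yoda panels and pick one to use for paneling.
panel_ids = [row["panel_id"] for row in A.get_card_panels(dataset="yoda").collect()]

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")

paneled_df = A.add_card_paneling(
    tost_txns,
    dataset="yoda",
    panel_ids=[panel_ids[0]],  # only a single panel ID is currently supported
)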

etl_toolkit.A.source_card_transactions(dataset, start_date=None, end_date=None, transaction_types=None, group_column_names=None)[source]#

Returns a DataFrame of card transactions from the specified source dataset, with optional filtering for date ranges and transaction types. This function provides a standardized way to access card transaction data across different datasets while handling their unique schemas and storage patterns.

The function supports both single-table datasets (like Mando) where transaction types are distinguished by a column value, and multi-table datasets (like Skywalker/Yoda) where different transaction types are stored in separate tables.

Tip

When working with card transaction data, it’s recommended to use this function rather than directly accessing the source tables to ensure consistent filtering and proper handling of dataset-specific schemas.

Parameters:
  • dataset (Literal['skywalker', 'mando', 'yoda']) – The source dataset to retrieve transactions from. Must be one of “skywalker”, “mando”, or “yoda”.

  • start_date (Optional[datetime.date]) – Optional start date to filter transactions. If provided, only includes transactions on or after this date.

  • end_date (Optional[datetime.date]) – Optional end date to filter transactions. If provided, only includes transactions on or before this date.

  • transaction_types (Optional[List[str | CardTransactionType]]) – Optional list of transaction types to include. Can use either string values or CardTransactionType enum. Valid values are “transactions”, “deposits”, or “all”. Defaults to [“transactions”] if not specified. If “all” is included in the list, other types are ignored.

  • group_column_names (Optional[List[str]]) – Optional list of grouping columns to use for this dataset, overriding defaults

Returns:

A DataFrame containing the filtered card transactions from the specified dataset.

Raises:

InvalidInputException – If any validation checks fail

Return type:

pyspark.sql.DataFrame

Examples#

Basic usage to get recent Mando transactions#
from datetime import date
from etl_toolkit import A

# Get Mando transactions for January 2024
transactions_df = A.source_card_transactions(
    dataset="mando",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 1, 31)
)
Get both transactions and deposits from Skywalker#
from etl_toolkit import A
from etl_toolkit.analyses.card.source import CardTransactionType

# Get all Skywalker transaction types using enum
all_activity_df = A.source_card_transactions(
    dataset="skywalker",
    transaction_types=[
        CardTransactionType.TRANSACTIONS,
        CardTransactionType.DEPOSITS
    ]
)

# Alternative using string values
all_activity_df = A.source_card_transactions(
    dataset="skywalker",
    transaction_types=["transactions", "deposits"]
)
Get all transaction types using the ALL type#
from etl_toolkit import A
from etl_toolkit.analyses.card.source import CardTransactionType

# Get all transaction types from Mando
all_df = A.source_card_transactions(
    dataset="mando",
    transaction_types=[CardTransactionType.ALL]
)

# Alternative using string value
all_df = A.source_card_transactions(
    dataset="mando",
    transaction_types=["all"]
)
Get only deposits from a specific date range#
from datetime import date
from etl_toolkit import A

# Get Yoda deposits for Q1 2024
deposits_df = A.source_card_transactions(
    dataset="yoda",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 3, 31),
    transaction_types=["deposits"]
)
etl_toolkit.A.card_coverage_metrics(dataframe, metric_definitions, periodicity='QUARTER', time_period_grouping_columns=None, additional_grouping_columns=['ticker'], transaction_date_column='trans_date', time_period_ordering_column=None, normalize_by_days=False)[source]#

Calculates coverage ratios by comparing observed values to reported metrics across time periods. Coverage metrics help assess data completeness by measuring the difference between transactions captured in card data versus those reported in company filings.

The function adds coverage calculations and lag-based estimates:

  • Basic coverage ratios (observed/reported)

  • Period-over-period coverage changes

  • Lag-adjusted metrics accounting for historical patterns

  • Seasonal adjustments (for quarterly data)

  • Recent trend indicators (for daily data)

Parameters:
  • dataframe (pyspark.sql.DataFrame) – DataFrame containing transaction and reported metric data

  • metric_definitions (list[coverage_metric]) – List of coverage_metric objects defining metrics to calculate

  • periodicity (Literal['QUARTER', 'DAY']) – Analysis granularity, either “QUARTER” or “DAY”. Defaults to “QUARTER”

  • time_period_grouping_columns (Optional[list[Union[str, pyspark.sql.Column]]]) – Columns to group timestamps. Defaults to [“quarter”, “start_date”, “end_date”] for quarters, [“date”] for days

  • additional_grouping_columns (list[Union[str, pyspark.sql.Column]]) – Additional grouping dimensions (e.g. ticker). Defaults to [“ticker”]

  • transaction_date_column (Union[str, pyspark.sql.Column]) – Column containing transaction dates. Defaults to “trans_date”

  • time_period_ordering_column (Optional[Union[str, pyspark.sql.Column]]) – Column for ordering periods. Defaults to “start_date” or “date” based on periodicity

  • normalize_by_days (bool) – Whether to adjust for partial data periods. Defaults to False

Return type:

pyspark.sql.DataFrame

Examples#

Calculate quarterly coverage metrics with base metrics#
from etl_toolkit import F, A

# Define metrics to analyze
metrics = [
    A.coverage_metric(
        name='revenue_total',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_total'
    )
]

quarterly_coverage = A.card_coverage_metrics(
    transactions_df,
    metric_definitions=metrics,
    periodicity='QUARTER'
)

display(quarterly_coverage)
Calculate daily coverage with normalization and channel splits#
from etl_toolkit import F, A

metrics = [
    A.coverage_metric(
        name='revenue_total',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_total'
    ),
    A.coverage_metric(
        name='revenue_online',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_online',
        filter=F.col('channel')=='ONLINE'
    )
]

daily_coverage = A.card_coverage_metrics(
    transactions_df,
    metric_definitions=metrics,
    periodicity='DAY',
    normalize_by_days=True
)

display(daily_coverage)
etl_toolkit.A.coverage_metric()[source]#

Defines a metric for calculating coverage ratios between observed and reported values.

Parameters:
  • name – Name of the metric (must be alphanumeric/lowercase/underscore)

  • observed_column_name – Column containing the observed values to measure

  • reported_column_name – Column containing the reported values to compare against

  • filter – Optional filter condition to apply before aggregation

Examples#

from etl_toolkit import F, A

# Basic metric definition
revenue_metric = A.coverage_metric(
    name='revenue_total',
    observed_column_name='adj_trans_amount',
    reported_column_name='reported_total'
)

# Metric with filter condition
online_revenue = A.coverage_metric(
    name='revenue_online',
    observed_column_name='adj_trans_amount',
    reported_column_name='reported_online',
    filter=F.col('channel')=='ONLINE'
)

E-Receipt Analyses#

etl_toolkit.A.source_ereceipts(receipt_type, vendor_list=None, country=None, include_duplicates=False, granularity=None, add_app_id=False, add_demo_columns=False)[source]#

Fetch a DataFrame of Edison e-receipts with filtering and transformations. This function standardizes e-receipts access, ensuring consistency across modules.

Tip

Use this function instead of directly accessing source tables for consistent filtering and schema handling.

Parameters:
  • receipt_type (Literal['clean_headers', 'purchase', 'flights', 'hotels', 'rental_cars', 'rides', 'subscriptions', 'walmart']) – Type of receipt data to retrieve. Supported types: clean_headers, purchase, subscriptions, rental_cars, hotels, flights, rides, walmart.

  • vendor_list (list[str]) – List of vendors to filter. Must be non-empty.

  • country (Optional[Literal['us', 'intl', 'ca', 'all', 'US', 'INTL', 'CA', 'ALL']]) – Optional. Valid values: ALL, US, CA, INTL. Default varies by type.

  • include_duplicates (Optional[bool]) – Include duplicate entries. Default: False. Some types disallow duplicates.

  • granularity (Literal['orders', 'items']) – Specify granularity level. Valid: items, orders. Default depends on receipt type.

  • add_app_id (Optional[bool]) – Include application ID. Default: False. Supported only for certain receipt types.

  • add_demo_columns (Optional[bool]) – Add demographic columns. Default: False.

Returns:

Filtered e-receipts DataFrame.

Raises:

InvalidInputException – On validation failure.

Return type:

pyspark.sql.DataFrame

Examples#

Clean Headers: retrieve receipts for specific vendors in the US.#
from etl_toolkit import A

clean_headers_df = A.source_ereceipts(
    receipt_type="clean_headers",
    vendor_list=["Uber", "Walmart"], # at least 1 vendor required
    country="ALL" # defaults to ALL for clean headers
)

Including Duplicates: allow duplicate entries (if supported).#
from etl_toolkit import A

purchase_df = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["Walmart"],
    country="US", # defaults to US for purchase
    include_duplicates=True # defaults to False for all receipt types
)

Adjusting Granularity: retrieve order-level data.#
from etl_toolkit import A

purchase_orders_df = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["UberEats"],
    granularity="orders" # defaults to "items" for purchase
)

Adding Extra Columns: include app IDs and user demographic columns.#
from etl_toolkit import A

purchase_with_details = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["Walmart"],
    add_app_id=True,
    add_demo_columns=True
)

Validation Rules#

Receipt Type-Specific Behavior#

+-------------------+-----------------+------------+-----------------+--------+--------------+
| Receipt Type      | country         | duplicates | granularity     | app ID | demo columns |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| clean_headers     | ALL             | No         | N/A             | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| purchase          | US              | Yes        | items, orders   | Yes    | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| subscriptions     | ALL             | No         | items, orders   | Yes    | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| rental_cars       | ALL             | Yes        | orders          | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| hotels            | ALL             | Yes        | orders          | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| flights           | ALL             | Yes        | items           | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| rides             | ALL             | No         | items, orders   | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+
| walmart           | ALL             | Yes        | items, orders   | No     | Yes          |
+-------------------+-----------------+------------+-----------------+--------+--------------+

Dedupe Analyses#

Suite of functions to handle deduplication operations on dataframes. The returned output will be a dataframe with unique rows based on some condition. These functions can be easier to read/write than trying to handle deduping in native pyspark.

etl_toolkit.analyses.dedupe.dedupe_by_condition(df, condition, qa=False)[source]#

QA Mode support

Function that applies a generic deduplication transformation where rows will be excluded if the supplied condition does not equal the expected value of 1. The expected value is 1 because a common use case is to dedupe based on a row number over a custom window expression.

If qa=True, then all rows will be preserved, and a dedupe_index column matching the condition will be added to the returned dataframe. This can be useful when investigating the behavior of deduplication for QA purposes.

Parameters:
  • df (pyspark.sql.DataFrame) – DataFrame to deduplicate

  • condition (str | pyspark.sql.Column) – A pyspark Column or string that should return a 1 if the row is meant to be preserved. If a string is provided, it is resolved as a Column.

  • qa (bool) – Boolean flag to control QA mode for this function. When True, all rows are preserved and a dedupe_index column is added to the dataframe that stores the value of the condition. When False, rows with a condition value that does not equal 1 are dropped.

Return type:

pyspark.sql.DataFrame

Examples#

Dedupe by taking the first value of rows for each color based on the date column. Note that we use a Window expression to define the condition.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

window = W.partitionBy("color").orderBy("date")

display(
    A.dedupe_by_condition(
        df,
        condition=F.row_number().over(window),
    )
)

color | date | value
--- | --- | ---
blue | 2023-01-01 | 50
red | 2023-01-01 | 1000

When using QA mode, the function will include all rows and a dedupe_index indicating the value of the condition.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

window = W.partitionBy("color").orderBy("date")

display(
    A.dedupe_by_condition(
        df,
        condition=F.row_number().over(window),
        qa=True,
    )
)

color | date | value | dedupe_index
--- | --- | --- | ---
blue | 2023-01-01 | 50 | 1
red | 2023-01-01 | 1000 | 1
red | 2024-01-01 | 100 | 2

etl_toolkit.analyses.dedupe.dedupe_by_row_number(df, dedupe_columns, order_columns, qa=False)[source]#

QA Mode support

Function that dedupes rows from the supplied df by sorting rows within each window partition and dropping rows whose row number != 1. The window partitions are defined by the dedupe_columns and rows are sorted in order of order_columns (i.e. the columns produce a window expression F.row_number().over(W.partitionBy(*dedupe_columns).orderBy(*order_columns))).

If qa=True, then all rows will be preserved, and a dedupe_index column indicating the row number will be added to the returned dataframe. This can be useful when investigating the behavior of deduplication for QA purposes.

Caution

The deduplication logic in this function uses F.row_number from pyspark. If a different ranking function is preferred, for ex. F.rank or F.dense_rank, use A.dedupe_by_condition with a custom window expression.
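
A minimal sketch of that alternative, assuming the toy dataframe used in the surrounding examples and swapping in F.dense_rank so that tied rows are all kept:
from etl_toolkit import A, F, W

# Keep every row that ties for the earliest date within each color by using
# dense_rank (ties share rank 1) instead of row_number.
window = W.partitionBy("color").orderBy("date")

deduped_df = A.dedupe_by_condition(
    df,
    condition=F.dense_rank().over(window),
)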

Parameters:
  • df (pyspark.sql.DataFrame) – DataFrame to deduplicate

  • dedupe_columns (list[str | pyspark.sql.Column]) – A list of pyspark columns or strings that define the window partitions of the dataframe to sort the rows by. In most cases, this will be the columns that define a row’s uniqueness. If strings are provided, they will be resolved as Columns.

  • order_columns (list[str | pyspark.sql.Column]) – A list of pyspark columns or strings that sorts rows within each window partition of the dataframe. This sorting will determine the row_number. Be explicit in sorting ascending or descending when specifying these columns. If strings are passed they are resolved as Columns.

  • qa (bool) – Boolean flag to control QA mode for this function. When True, then all rows are preserved and a dedupe_index column is added to the dataframe that stores the row_number. If False, rows with row numbers that do not equal 1 are dropped.

Return type:

pyspark.sql.DataFrame

Examples#

Dedupe by taking the first value of rows for each color based on the date column#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=["date"],
    )
)

color | date | value
--- | --- | ---
blue | 2023-01-01 | 50
red | 2023-01-01 | 1000

When using the QA mode all rows are included and a dedupe_index column is added that indicates the row number.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=["date"],
        qa=True,
    )
)

color | date | value | dedupe_index
--- | --- | --- | ---
blue | 2023-01-01 | 50 | 1
red | 2023-01-01 | 1000 | 1
red | 2024-01-01 | 100 | 2

Dedupe by taking the last value of rows for each color based on the date column. Note that descending order is implemented via the F.desc function in pyspark.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=[F.desc("date")],
    )
)

color | date | value
--- | --- | ---
blue | 2023-01-01 | 50
red | 2024-01-01 | 100

Index Analyses#

Suite of functions to generate index metrics that can reveal the trajectory of a company KPI without directly benchmarking to it. These index columns can be tricky to calculate in native pyspark, so these functions help standardize how they are calculated.

etl_toolkit.A.index_from_rolling_panel(input_df, start_date, date_column, metrics, slices=None, panel_periodicity='MONTH', index_periodicity='DAY', panel_end_date_column='panel_end_date', user_create_date_column='create_time', spark=None)[source]#

Returns a dataframe that maps the input dataframe to a series of indices for each metric provided. The indices are calculated through an aggregate function, a base value, and a rolling panel methodology, where the panel is fixed for 2 consecutive periods. The period length can be configured using panel_periodicity.

Indices can be broken down across various slices to visualize trends on key segments of the dataset.

Multiple metrics can be specified; these will be available as different columns in the output dataframe.

Note

This function is expected to work with Edison (e-receipt) derived dataframes. The rolling panel methodology is designed around the fluctuating behavior of the Edison user panel.

Parameters:
  • input_df (pyspark.sql.DataFrame) – The input dataframe to use to generate a series of indices.

  • start_date (datetime.datetime | datetime.date) – The minimum start date of the input_df; all metrics will be filtered to on or after this date.

  • date_column (pyspark.sql.Column | str) – A Column or string that indicates the date column of the input_df to use in paneling and index logic. If a string is used, it is referenced as a Column.

  • metrics (dict[str, dict]) – A dict of various metric configurations to generate indices. An index will be created for each metric defined in this dictionary. Each metric config must have an aggregation Column and a start_value defined for the index.

  • slices (list[str | pyspark.sql.Column]) – Optional list of strings or Columns that define the slices within the dataframe. If slices are used, the indices generated will be for each slice within the input_df. If strings are used in the list they are resolved as Columns.

  • panel_periodicity (Literal['WEEK', 'MONTH', 'QUARTER']) – The unit of length for the interval of each paneling window.

  • index_periodicity (Literal['DAY', 'WEEK', 'MONTH', 'QUARTER']) – The unit of length for the interval of each indexing window. A row will be generated for each period based on this argument.

  • panel_end_date_column (pyspark.sql.Column | str) – The Column to use when determining the exit date of a user from the panel in the rolling panel methodology.

  • user_create_date_column (pyspark.sql.Column | str) – The Column to use when determining the start date of a user in the panel in the rolling panel methodology.

  • spark (pyspark.sql.SparkSession) – Optional argument to pass a spark session into this function. Normally this is not needed, as a spark session is generated automatically; it is mainly used by library developers.

Return type:

pyspark.sql.DataFrame

Examples#

Example of a daily index creation for Lyft. This creates two indices, rides and total_charges based on different aggregations. The paneling is a rolling-panel based on a weekly interval.#
from etl_toolkit import E, A, F

from datetime import date

source_df = spark.table(f"{DESTINATION_CATALOG}.{SILVER_DATABASE}.deduped_receipts")

index_df = A.index_from_rolling_panel(
    source_df,
    start_date=date(2017, 1, 2),
    date_column="adjusted_order_date",
    metrics={
        "rides": {
            "calculation": F.sum("geo_weight"),
            "start_value": 13.471719625795382,
        },
        "total_charges": {
            "calculation": F.sum("total_charges"),
            "start_value": 13.471719625795382,
        },
    },
    panel_periodicity="WEEK",
    index_periodicity="DAY",
)

Investor Standard Analyses#

Suite of functions to generate common analyses used for investor research. It is recommended to use these functions as they follow specific standards for research reports and integrate with standard visualizations for charts to simplify publishing workflows.

etl_toolkit.A.revenue_mix_by_income_bucket(df, revenue_column, date_column, income_bucket_column='yd_income_bucket1', income_weight_column=None, apply_income_weighting=False, periodicity='MONTH', trailing_period=None, interval_period=None, include_growth_rates=False, start_date=None, start_day_of_week='SUNDAY', end_day_of_week=None, calendar=None, spark=None)[source]#

Returns a dataframe of revenue total and percent of total calculated by income_bucket_column and the specified periodicity. Can optionally include trailing period calculations for revenue by income bucket and/or 1-year growth rates for revenue metrics. The input data can also be weighted for income if that was not already applied.

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe that should be used to generate revenue calculations. The dataframe must include the remaining column arguments for this function.

  • revenue_column (str | pyspark.sql.Column) – Numeric type column on the df to use to calculate revenue totals by income bucket.

  • date_column (str | pyspark.sql.Column) – Date type column on the df to use to aggregate revenue by the relevant periodicity.

  • income_bucket_column (str | pyspark.sql.Column) – Column on the df that indicates the income bucket for the given row. Revenue mix will be based on grouping by this column.

  • income_weight_column (Optional[str | pyspark.sql.Column]) – Optional column on the df that indicates the income weighting for a panelized dataset. If specified, the income weight will be used to adjust the revenue_column before aggregating by period and income bucket.

  • apply_income_weighting (bool) – When True, the income_weight_column is used to adjust the revenue_column before aggregating by period and income bucket. When False, the revenue_column is directly aggregated by period and income bucket. Default is False.

  • periodicity (Literal['WEEK', 'MONTH', 'QUARTER']) – The time resolution to group revenue data on. When specified, the date_column will be rounded to the nearest matching period to aggregate revenue on that period + income bucket.

  • trailing_period (Optional[int]) – Optionally specified as an integer. When specified, additional revenue metrics will be added to the output dataframe that equal the trailing n-period average revenue by period and income bucket. The additional columns will be prefixed by the trailing period used, ex: t3m_revenue for trailing 3-month aggregations.

  • interval_period (Optional[int]) – Optionally specified as an integer. When specified, the first time period and every nth time period after will be included in the output dataframe. This can be used to display data more effectively for trailing periods, where every nth month is desired in the output (see the sketch after the examples below).

  • start_date (Optional[datetime.date | datetime.datetime]) – Optional start date to filter the date_column on before aggregating revenue. The start_date should be consistent with the start_day_of_week.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When periodicity='WEEK', this parameter will ensure periods are aggregated on 7-day intervals that start on this specified day of the week. This parameter should not be specified if end_day_of_week is specified as it is duplicative.

  • end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When periodicity='WEEK', this parameter will ensure periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified if start_day_of_week is specified as it is duplicative.

  • spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.

  • include_growth_rates (bool)

Return type:

pyspark.sql.DataFrame

Examples#

Generate revenue mix by week and income bucket. Notice how the time_period column indicates the end of the week.#
from etl_toolkit import E, A, F

from datetime import date

# cmg_df would be the yoda transactions for CMG
df = A.revenue_mix_by_income_bucket(
    cmg_df,
    revenue_column="rev",
    date_column="date",
    start_date=date(2021, 1, 3),
    periodicity="WEEK",
    start_day_of_week="SUNDAY",
    income_bucket_column="yd_income_bucket2",
)

display(df)

time_period | yd_income_bucket | revenue | income_bucket_revenue_percent
--- | --- | --- | ---
2021-01-09 | High ($150k+) | 22127.71 | 0.2071751921632703
2021-01-09 | Low ($0-60k) | 38568.29 | 0.3611035438424529
2021-01-09 | Lower-Middle ($60-100k) | 25451.31 | 0.2382930843839744
2021-01-09 | Upper-Middle ($100-150k) | 20659.43 | 0.1934281796103024
2022-01-16 | High ($150k+) | 22187.53 | 0.2092824865506127
2022-01-16 | Low ($0-60k) | 38352.40 | 0.3617565842338732
2022-01-16 | Lower-Middle ($60-100k) | 24908.72 | 0.2349499211492873
2022-01-16 | Upper-Middle ($100-150k) | 20568.49 | 0.1940110080662268

Aggregate by monthly periods and income bucket, while also including trailing 3-month metrics. Trailing month metrics will be null for earlier periods where not enough periods have elapsed to make the calculation. This also includes weighting the revenue data by income weight by specifying the income_weight_column argument.#
from etl_toolkit import E, A, F

from datetime import date

# cmg_df would be the yoda transactions for CMG
df = A.revenue_mix_by_income_bucket(
    cmg_df,
    revenue_column="rev",
    date_column="date",
    start_date=date(2021, 1, 1),
    periodicity="MONTH",
    income_bucket_column="yd_income_bucket2",
    income_weight_column="income_weight",
    apply_income_weighting=True,
    trailing_period=3,
)

display(df)

time_period | yd_income_bucket | revenue | income_bucket_revenue_percent | trailing_revenue | trailing_income_bucket_revenue_percent
--- | --- | --- | --- | --- | ---
2021-01-01 | High ($150k+) | 95083.00 | 0.2085046527102458 | null | null
2021-01-01 | Low ($0-60k) | 163845.34 | 0.3592915276434441 | null | null
2021-01-01 | Lower-Middle ($60-100k) | 108469.64 | 0.2378598260263688 | null | null
2021-01-01 | Upper-Middle ($100-150k) | 88625.41 | 0.1943439936199410 | null | null
2021-02-01 | High ($150k+) | 83032.50 | 0.2146970928471732 | null | null
2021-02-01 | Low ($0-60k) | 136581.27 | 0.3531580923512764 | null | null
2021-02-01 | Lower-Middle ($60-100k) | 92097.17 | 0.2381355876897015 | null | null
2021-02-01 | Upper-Middle ($100-150k) | 75031.62 | 0.1940092271118489 | null | null
2021-03-01 | High ($150k+) | 104205.85 | 0.2085999518166622 | 94107.12 | 0.2103242607554216
2021-03-01 | Low ($0-60k) | 180829.90 | 0.3619864599887388 | 160418.84 | 0.3585273177006854
2021-03-01 | Lower-Middle ($60-100k) | 118999.91 | 0.2382147830825141 | 106522.24 | 0.23807137636607104
2021-03-01 | Upper-Middle ($100-150k) | 95513.13 | 0.1911988051120848 | 86390.05 | 0.19307704517782195
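
Hypothetical sketch combining trailing_period with interval_period so that only every third monthly period is returned; the arguments shown are illustrative.#
from etl_toolkit import E, A, F

from datetime import date

# cmg_df would be the yoda transactions for CMG
df = A.revenue_mix_by_income_bucket(
    cmg_df,
    revenue_column="rev",
    date_column="date",
    start_date=date(2021, 1, 1),
    periodicity="MONTH",
    income_bucket_column="yd_income_bucket2",
    trailing_period=3,
    interval_period=3,
)

display(df)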

Investor Reporting Analyses#

Suite of functions specific to the investor reporting process. It is recommended to use these functions as they follow specific standards for customer-facing assets and reduce time spent on otherwise manual processes.

etl_toolkit.A.earnings_results_from_gsheet(gsheet_file_id, sheet_id, additional_columns=None, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#

Returns a dataframe of Earnings Results data retrieved from the specified Gsheet. The dataframe follows the standard Earnings Results schema for Investor reporting.

Warning

When using this function, the Gsheet needs to be permissioned correctly so it can be read from databricks, and it must have the correct sheet columns and values to be compatible with the standard schema.

Parameters:
  • gsheet_file_id (str) – The Gsheet file ID that contains earnings results. Can be found in the Gsheet URL.

  • sheet_id (int) – The Gsheet sheet (tab) ID that contains earnings results. Can be found in the Gsheet URL.

  • additional_columns (Optional[list[str | pyspark.sql.Column]]) – Additional columns to add to the end of the Earnings results schema. Not typically used, and the default is for no additional columns to be added.

  • calculate_va_yy_growth (bool) – Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.

  • yy_growth_tickers (list[str]) – List of tickers that, when calculate_va_yy_growth=True, will use the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.

Return type:

pyspark.sql.DataFrame

Examples#

Generating an Earnings Results dataframe#
from etl_toolkit import E, A, F

EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_SHEET_ID = 1232299054

df = A.earnings_results_from_gsheet(EARNINGS_FILE_ID, EARNINGS_SHEET_ID)

display(df)

quarter_start

quarter_end

last_updated

2024-01-01

2024-03-31

2024-09-18

etl_toolkit.A.earnings_rankings_from_gsheet(gsheet_file_id, sheet_id)[source]#

Returns a dataframe of Earnings Rankings data retrieved from the specified Gsheet. The dataframe follows the standard Earnings Rankings schema for Investor reporting.

In addition, it will automatically add a va_consensus_value column based on the values of the va_metric_id and quarter_end columns of the input dataframe.

Warning

When using this function, the Gsheet needs to be permissioned correctly so it can be read from databricks, and it must have the correct sheet columns and values to be compatible with the standard schema.

Parameters:
  • gsheet_file_id (str) – The Gsheet file ID that contains the narrative rankings data. Can be found in the Gsheet URL.

  • sheet_id (int) – The Gsheet sheet (tab) ID that contains the narrative rankings data. Can be found in the Gsheet URL.

Return type:

pyspark.sql.DataFrame

Examples#

Generating an Earnings Rankings dataframe#
from etl_toolkit import E, A, F

EARNINGS_RANKINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_RANKINGS_SHEET_ID = 2006888117

df = A.earnings_rankings_from_gsheet(EARNINGS_RANKINGS_FILE_ID, EARNINGS_RANKINGS_SHEET_ID)

display(df)

quarter_start

quarter_end

narrative_ranking

2024-01-01

2024-03-31

Green

etl_toolkit.A.earnings_results_with_backtests(df, backtest_configurations=None, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#

Returns a dataframe combining a summary of earnings results with backtests for KPIs. The dataframe follows the standard Backtest + Earnings Results schema for Investor reporting.

Parameters:
  • df (pyspark.sql.DataFrame) – The Earnings Results Dataframe that the backtests correspond to with respect to product, ticker, and KPI names. It is recommended that this is a dataframe of the table created by A.earnings_results_from_gsheet.

  • backtest_configurations (Optional[list[BackTestConfiguration]]) – An optional list of backtest configurations that will populate the output dataframe. The configurations should be created via A.backtest_configuration. While typically backtests are specified, if no backtests are provided, the output table will pass through the earnings results dataframe with the standard schema for backtests.

  • calculate_va_yy_growth (bool)

  • yy_growth_tickers (list[str])

Return type:

pyspark.sql.DataFrame

Examples#

Generating an Earnings Results with Backtests dataframe. Note that this also uses the A.backtest_configuration function.#
from etl_toolkit import A, E, F

# Earnings results from Gsheets
EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_SHEET_ID = 1232299054

# Define backtests to include in the output DataFrame
# sample_backtest_df is a backtest dataframe (see the A.backtest_configuration example below)
BACKTESTS = [
    A.backtest_configuration(
        df=sample_backtest_df,
        primary_dataset="skywalker",
        secondary_dataset=None,
        panel_used="202101_100",
        methodology_notes="Uses geo weights and card type weights, yoy coverage change",
        metric_type="backtest",
    ),
    A.backtest_configuration(
        df=sample_backtest_df,
        primary_dataset="yoda",
        secondary_dataset=None,
        panel_used="202101_100",
        methodology_notes="Uses geo weights and card type weights, yoy coverage change",
        metric_type="qa",
    ),
]

df = A.earnings_results_with_backtests(
    spark.table("yd_production.tgt_gold.earnings_results"),
    BACKTESTS
)
display(df)

quarter_start

quarter_end

metric_type

2024-01-01

2024-03-31

published

2023-10-01

2023-12-31

backtest

2023-10-01

2023-12-31

qa

etl_toolkit.A.backtest_configuration(df, primary_dataset, panel_used, methodology_notes, secondary_dataset=None, metric_type='backtest')[source]#

Use this function to define backtest configurations that are used in the investor reporting process. A list of backtest_configurations can then be added to earnings results data using A.earnings_results_with_backtests.

Note

This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to A.earnings_results_with_backtests.

Parameters:
  • df (DataFrame) – A backtest dataframe that contains the results of the backtest for each prior publishing quarter. This can be obtained by filtering a quarterly_accuracy table for metric_type="backtest" and status="active".

  • primary_dataset (str) – The primary dataset used for this KPI backtest. For ex: skywalker or yoda.

  • panel_used (str) – The corresponding Panel ID for the datasets used in this KPI backtest. For ex: 202101_100.

  • methodology_notes (str) – Any relevant information about the methodology for this backtest, examples include adjustments, coverage changes, weights, etc.

  • secondary_dataset (Optional[str], default: None) – An optional, secondary dataset used for this KPI backtest. For ex: skywalker or yoda. Can leave None if not used. Default is None.

  • metric_type (Literal['backtest', 'qa'], default: 'backtest') – The type of KPI Backtest, can be backtest or qa. Default is backtest.

Return type:

None

Examples#

Create a list of backtest_configurations for use in A.earnings_results_with_backtests.#
from etl_toolkit import A, E, F

sample_backtest_df = (
    spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy")
    .where(F.col("metric_type") == 'backtest')
    .where(F.col("status") == 'active')
)

backtests = [
    A.backtest_configuration(
        df=sample_backtest_df,
        primary_dataset="skywalker",
        secondary_dataset=None,
        panel_used="202101_100",
        methodology_notes="Uses geo weights and card type weights, yoy coverage change",
        metric_type="backtest",
    ),
]
etl_toolkit.A.add_unified_consensus_column(df, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#

Helper function to add va_consensus_value column to the input dataframe that represents the quarterly VA estimate and is used in reporting processes.

The column is introduced via a join so that differences in how VA reports quarterly estimates are resolved. The join logic handles the following adjustments:

  • Differing quarter ends between the company’s actual fiscal periods and what VA provides

  • Normalization of growth rates represented as a percentage between 0 and 100

  • Growth rate calculations when the reported metric is a growth rate while VA publishes nominal values

Parameters:
  • df (DataFrame) – Input DataFrame of quarterly earnings data. Must have a va_metric_id column and a quarter_end column to perform the join.

  • calculate_va_yy_growth (bool, default: False) – Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.

  • yy_growth_tickers (list[str], default: None) – List of tickers that, when calculate_va_yy_growth=True, will use the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.
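
A minimal sketch of how this helper might be applied, assuming a quarterly earnings dataframe that already has va_metric_id and quarter_end columns (the table name below is reused from the A.earnings_results_with_backtests example and is only illustrative):

from etl_toolkit import A

# Assumed input: quarterly earnings data with va_metric_id and quarter_end columns
quarterly_df = spark.table("yd_production.tgt_gold.earnings_results")

# Adds the va_consensus_value column via the join logic described above
enriched_df = A.add_unified_consensus_column(
    quarterly_df,
    calculate_va_yy_growth=False,
)
display(enriched_df)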

Ordering Analyses#

Suite of functions to handle re-arranging dataframes columns and rows for common scenarios. These are helpful to use as they will be optimized for performance and/or avoid duplicating columns.

etl_toolkit.A.shift_columns(df, priority_columns)[source]#

Utility transformation that shifts any specified columns of the dataframe to the beginning of the schema. This can be useful for performance optimization (ex: improving join performance on key columns and clustering).

The re-ordered columns can include new columns that don’t already exist on the dataframe. These will be added in addition to any existing columns, in the order of the priority_columns list.

Parameters:
  • df (pyspark.sql.DataFrame) – The dataframe to reorder columns on

  • priority_columns (list[str | pyspark.sql.Column]) – A list of Column(s) or string(s) to shift to the beginning of the schema. If strings are provided, they will be resolved as Columns.

Return type:

pyspark.sql.DataFrame

Examples#

Re-order columns by specifying which columns should come at the beginning of the schema. All remaining columns will preserve the original order of the dataframe.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"color": "red", "value": 1000, "date": date(2023, 1, 1)},
    {"color": "blue", "value": 50, "date": date(2023, 1, 1)},
])

display(
    A.shift_columns(
        df,
        priority_columns=["date"],
    )
)

date

color

value

2023-01-01

red

1000

2023-01-01

blue

50

New derived columns can be introduced in the priority_columns list. These must be derived using other columns of the provided dataframe.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"color": "red", "value": 1000, "date": date(2023, 1, 1)},
    {"color": "blue", "value": 50, "date": date(2023, 1, 1)},
])

display(
    A.shift_columns(
        df,
        priority_columns=["date", F.upper("color").alias("label")],
    )
)

date

label

color

value

2023-01-01

RED

red

1000

2023-01-01

BLUE

blue

50

Parser Analyses#

Suite of functions to filter and extract important values from string columns to enrich dataframes. These are powerful functions that simplify and organize the complex regex or conditional logic that usually makes up the critical first steps of product pipelines to filter out unrelated data.

etl_toolkit.A.parse_records(df, configs, qa=False, include_non_matches=False)[source]#

QA Mode support

Takes the input df and returns an output dataframe filtered by the provided configs; each config is mapped to a parser_id value in a column added to the output dataframe. Rows are kept if:

  • they match the inclusion cases of any parser

  • they do not match the exclusion case of the corresponding matched parser.

If qa=True, then the exclude_any argument for each parser will not drop records. Instead, those records will be included, with a parser_exclusion_case column added to flag which exclude criterion matches each record.

Additionally, if qa=True, then the include_any argument for each parser will be used to populate the parser_inclusion_case column added to the output df. However, this is in addition to the include_any option filtering records.

If include_non_matches=True, the parser will not filter out any records that were not able to match.

If qa=True and include_non_matches=True, the function allows multiple parser matches per record. In this case:

  • The parser_id and tag columns become arrays, potentially containing multiple parser IDs.

  • The parser_inclusion_case and parser_exclusion_case columns become arrays of structs, each containing the parser_id and the corresponding inclusion or exclusion case.

Parameters:
  • df (pyspark.sql.DataFrame) – Input dataframe that is filtered and enriched by the specified parser configs.

  • configs (list[parser]) – List of A.parser configs to use in parsing the df. Parsers are evaluated in order and the first matching parser is used for a given record.

  • qa (bool) – Boolean flag to control QA mode for this function. When True, then all rows are preserved and parser_inclusion_case + parser_exclusion_case columns are added to the dataframe that indicate the specific case that caused the parser to match. If False, any rows that don’t match any of the inclusion cases or match any of the exclusion cases are removed.

  • include_non_matches (bool) – Boolean flag to control if unmatched records are excluded from the output dataframe. Unmatched records are included when this flag is True.

Return type:

pyspark.sql.DataFrame

Examples#

Example of using A.parse_records to filter data based on a list of A.parsers and their conditions. Notice how only matched records are included and the parser_id column indicates which parser was matched.#
from etl_toolkit import E, F, A, W
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red",
                ],
            )
        ]
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

Example of using A.parse_records to filter data on both inclusion and exclusion criteria.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("date") >= date(2024, 1, 1),
                ],
                exclude_any=[
                    F.col("color") == "blue"
                ]
            )
        ]
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

Example of how multiple parsers can be used. Notice how if a record satisfies any of the included parsers it will be included, and the associated parser_id is present in the output row.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red"
                ],
            ),
            A.parser(
                parser_id="blue_parser",
                include_any=[
                    F.col("color") == "blue"
                ],
            ),
        ],
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

blue

2024-01-02

50

blue_parser

Example of how QA mode works. Notice how the inclusion and exclusion cases apply to each matched parser_id. The values of these additional columns match the positional index of the include_any or exclude_any arguments for the matching parser if they are lists. If dicts are used for these arguments, then the matching key will be used as the value. These columns pinpoint the logic responsible for a record being included in or excluded from the dataframe.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("date") >= date(2024, 1, 1),
                ],
                exclude_any=[
                    F.col("color") == "blue"
                ]
            )
        ],
        qa=True,
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

Example of how the include_non_matches flag works. Notice how unmatched records are preserved in the dataframe with a parser_id column value of NULL.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("date") >= date(2024, 1, 1),
                ],
                exclude_any=[
                    F.col("color") == "blue"
                ]
            )
        ],
        include_non_matches=True,
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

blue

2024-01-02

50

null

Example of how the QA mode works with include_non_matches. Notice how records can match multiple parsers and show multiple inclusion/exclusion cases.#
from etl_toolkit import E, F, A, W
from datetime import date

df = spark.createDataFrame([
    {"id": 1, "shape": "square", "color": "red"},
    {"id": 2, "shape": "square", "color": "blue"},
    {"id": 3, "shape": "triangle", "color": "blue"},
    {"id": 4, "shape": "circle", "color": "red"},
])

result_df = A.parse_records(
    df,
    configs=[
        A.parser(
            parser_id="red_square_parser",
            include_any=[F.col("shape") == "square"],
            exclude_any=[F.col("color").isin(["blue", "green"])],
        ),
        A.parser(
            parser_id="blue_parser",
            include_any=[F.col("color") == "blue"],
        ),
    ],
    qa=True,
    include_non_matches=True,
)

display(result_df)

In this example:

  • The square red shape (id=1) matches only the “red_square_parser” and is listed as the primary match since it’s the only match.

  • The square blue shape (id=2) initially matches both parsers but after exclusions only matches “blue_parser” (shown in matched_parser_ids). The parser_id shows all initial matches before exclusions.

  • The triangle blue shape (id=3) matches only the “blue_parser” and thus has it as primary_match.

  • The circle red shape (id=4) doesn’t match any parser, but is included due to include_non_matches=True.

The columns:

  • parser_id: Array showing all initial parser matches before exclusions

  • matched_parser_ids: Array showing final parser matches after applying exclusions

  • primary_match: Single parser ID when there’s exactly one match after exclusions, otherwise null

  • parser_inclusion_case and parser_exclusion_case: Arrays of structs containing the parser_id and corresponding inclusion or exclusion case that matched

etl_toolkit.A.parser(parser_id, include_any=<factory>, exclude_any=<factory>, with_columns=<factory>, metadata=<factory>)[source]#

Use this function to define parsing configurations that are applied in the A.parse_records function. These parsers can control inclusion and exclusion filtering logic via include_any and exclude_any and enrichment columns through with_columns.

Tip

Encapsulating logic in parsers like this makes transformation logic modular and more easily understood. For example, if covering multiple companies, each company’s filtering logic can be defined as a distinct parser.

Note

This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to A.parse_records.

Parameters:
  • parser_id (str) – Unique ID of the parser object that is used in the parser_id column included in the A.parse_records function. This ID must be unique within the list of configurations used in A.parse_records

  • include_any (list[Column] | dict[str, Column], default: <factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluate to True for a record, the parser is considered a match (assuming exclude_any also does not have a match). If a dict is provided instead of a list, then key/value pairs are supplied where the key is a unique ID for the condition and the value is the boolean expression.

  • exclude_any (list[Column] | dict[str, Column], default: <factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluate to True for a record, the record is not included in the output of A.parse_records. If a dict is provided instead of a list, then key/value pairs are supplied where the key is a unique ID for the condition and the value is the boolean expression.

  • with_columns (dict[str, str | Column], default: <factory>) – A dict of key/value pairs where the key is a column name to add and the value is a Column expression. These columns are included on a record of A.parse_records should the parser match.

  • metadata (dict[str, str | int | date | datetime | bool | float], default: <factory>) – Generally not used outside of library developers. These dicts can include arbitrary key/value information about the parser. These values are not added to the outputs of A.parse_records

Return type:

None

Examples#

Simple parser example with A.parse_records.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red"
                ],
            ),
            A.parser(
                parser_id="blue_parser",
                include_any=[
                    F.col("color") == "blue"
                ],
            ),
        ],
    )
)

color

date

value

parser_id

red

2024-01-01

100

red_parser

blue

2024-01-02

50

blue_parser

Parser example with A.parse_records when with_columns is used. Notice how the columns are added to the output dataframe. Notice that if the set of keys of columns to add is not consistent between parsers, the function will resolve the missing columns as nulls.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any=[
                    F.col("color") == "red"
                ],
                with_columns={
                    "enriched_red": F.lit(True),
                    "enriched_red_2": F.lit(1),
                },
            ),
            A.parser(
                parser_id="blue_parser",
                include_any=[
                    F.col("color") == "blue"
                ],
                with_columns={
                    "enriched_blue": F.lit(True),
                    "enriched_blue_2": F.lit(1),
                },
            ),
        ],
    )
)

color

date

value

parser_id

enriched_red

enriched_red_2

enriched_blue

enriched_blue_2

red

2024-01-01

100

red_parser

true

1

null

null

blue

2024-01-02

50

blue_parser

null

null

true

1

Parser example with include_any and exclude_any expressed as dicts. The keys are present in the output dataframe when QA mode is used for A.parse_records and the condition is satisfied for the record.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 2)},
])

display(
    A.parse_records(
        df,
        configs=[
            A.parser(
                parser_id="red_parser",
                include_any={
                    "color_match": F.col("color") == "red",
                },
            ),
            A.parser(
                parser_id="blue_parser",
                include_any={
                    "color_match": F.col("color") == "blue",
                },
                exclude_any={
                    "date_stale": F.col("date") < date(2024, 1, 3),
                },
            ),
        ],
        qa=True,
    )
)

color

date

value

parser_id

parser_exclusion_case

parser_inclusion_case

red

2024-01-01

100

red_parser

null

color_match

blue

2024-01-02

50

blue_parser

date_stale

color_match

Scalar Analyses#

Suite of functions to generate scalars (i.e. python literal values) for common pyspark operations. These can be useful to retrieve values into python from dataframes and re-use them in pipeline code.

Note

Unlike other analyses functions in the toolkit, these scalar functions return python literal values, (ex: int, float, etc.) instead of dataframes.

etl_toolkit.A.get_aggregates(df, value_column, aggregate_functions)[source]#

Returns a dictionary of aggregate values based on the value_column, df, and aggregate_functions specified. This is used to extract the min, max, etc. values from a pyspark dataframe into python and use them in subsequent code. The output is a dictionary with key/value pairs for each aggregate calculation specified.

Tip

Certain pipeline logic can benefit from calculating these simple aggregates in a dedicated step and then referenced in subsequent transformations as literals - this simplifies the spark query plan by cutting down on window expressions and/or joins. A common example of this is calculating the percentage of a column across the total for the entire dataset. This would otherwise be an expensive window operation without the use of this function.

Parameters:
  • df (pyspark.sql.DataFrame) – Dataframe used to calculate aggregate values

  • value_column (str | pyspark.sql.Column) – Column or string to use when calculating aggregate values. If a string is provided, it is referenced as a Column.

  • aggregate_functions (list[Literal['min', 'max', 'count', 'avg', 'sum', 'count_distinct']]) – A list of aggregate function names. For each function name, a separate aggregate calculation will be returned in the output dict.

Return type:

dict[str, Any]

Examples#

Example of generating aggregate values. Notice the output is a dictionary with keys representing the aggregate functions specified.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 1)},
])

print(
    A.get_aggregates(
        df,
        value_column="value",
        aggregate_functions=["min", "max", "sum"],
    )
)

# out: {'min': 50, 'max': 100, 'sum': 150}
Example of using aggregate values to avoid an expensive window operation. Notice how the aggregates are python values that can be used as literals in subsequent spark operations.#
from etl_toolkit import E, F, A, W

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 1)},
])

# This costly window operation can be reimplemented using get_aggregates
enriched_df = df.withColumn(
    "value_percent",
    F.col("value") / F.sum("value").over(W.partitionBy(F.lit(1)))
)

# Using get_aggregates, we can accomplish the same logic:
aggregates = A.get_aggregates(
    df,
    value_column="value",
    aggregate_functions=["sum"]
)

display(
    df
    .withColumn("value_percent", F.col("value") / F.lit(aggregates["sum"]))
)

color

date

value

value_percent

red

2024-01-01

100

0.666666

blue

2024-01-01

50

0.333333

Time Analyses#

Suite of functions to manage date and timestamp operations. Use these functions for filling date ranges and grossing up data to higher periodicities.

etl_toolkit.A.fill_periods(df, date_column, slice_columns=None, steps=1, step_unit='DAY', start_day_of_week='MONDAY', end_day_of_week=None, start=None, end=None, calendar=None, spark=None)[source]#

Function to transform the provided df so that it includes any missing periods as blank rows, given the specified date_column and the interval defined as (steps * step_unit). This is a useful function to normalize data when there are gaps due to missing time periods, which is important to address when calculating growth rates or other lagged metrics. The periods are filled between the min/max range of the dataframe's date_column.

Additionally, this function can fill records based on slice_columns. When specified, the rows will be added per permutation of slice columns in addition to the defined interval.

In the returned dataframe, the filled date and slices (if any specified) will be provided. Other columns in the dataframe will be NULL.

Tip

This is a recommended replacement for the fill_periods function in yipit_databricks_utils, which is deprecated. It handles other use cases including variable intervals and slices, while being a more performant spark operation.

Parameters:
  • df (pyspark.sql.DataFrame) – The dataframe to transform to fill in missing periods.

  • date_column (str | pyspark.sql.Column) – A date or timestamp Column or string that is used to evaluate the min/max range of the fill operation. In addition the filled records will include the date value under this column. If a string is specified, it is normalized as a Column.

  • slice_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings, that can be used to evaluate the records to fill within each slice. If not specified, only the date column is inspected for gaps in values. If string(s) are specified, it is normalized as a Column.

  • steps (int) – The number of step_units that, combined with step_unit, defines the fill interval between consecutive periods.

  • step_unit (Literal['DAY', 'WEEK', 'MONTH', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The unit of the fill interval; combined with steps, it defines the spacing between consecutive filled periods.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Only used when step_unit="WEEK". Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The input data should also have rows starting on the same day of the week, or an InvalidInputException will be raised. By default, weeks start on Monday.

  • end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When step_unit='WEEK', this parameter will ensure periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified if start_day_of_week is specified as it is duplicative.

  • start (datetime.datetime | datetime.date) – The start date or timestamp (datetime) of the earliest period to fill. If a datetime object is supplied the resulting columns will be of timestamp type. If not specified, the min value of the date_column of the input df is used.

  • end (datetime.datetime | datetime.date) – The end date or timestamp (datetime) of the latest period to fill. If a datetime object is supplied the resulting columns will be of timestamp type. If not specified, the max value of the date_column of the input df is used.

  • spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.

Return type:

pyspark.sql.DataFrame

Examples#

Fill missing dates with 1 day intervals between rows of the dataframe. Notice that the dates are filled based on the overall min/max date of the dataframe.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2023, 1, 5)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

display(
    A.fill_periods(
        df,
        date_column="date",
    )
)

date

color

value

2023-01-01

red

1000

2023-01-01

blue

50

2023-01-02

null

null

2023-01-03

null

null

2023-01-04

null

null

2023-01-05

red

100

Fill missing dates with 1 day intervals while using the slice_columns feature. Notice that the dates are filled based on the overall min/max date of the dataframe for each unique slice in the dataframe.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2023, 1, 3)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])

display(
    A.fill_periods(
        df,
        date_column="date",
        slice_columns=["color"],
    )
)

date

color

value

2023-01-01

blue

50

2023-01-02

blue

null

2023-01-03

blue

null

2023-01-01

red

1000

2023-01-02

red

null

2023-01-03

red

100

Fill missing dates with 1 week intervals between rows of the dataframe, with weeks starting on Saturdays.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 6)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 20)},
])

display(
    A.fill_periods(
        df,
        date_column="date",
        step_unit="WEEK",
        start_day_of_week="SATURDAY",
    )
)

date

color

value

2024-01-06

red

100

2024-01-13

null

null

2024-01-20

blue

50

Fill missing dates with 1 week intervals between rows of the dataframe, but extend the filled range back to Jan. 1, 2024 using the start parameter.#
from etl_toolkit import E, F, A

from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 8)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 22)},
])

display(
    A.fill_periods(
        df,
        date_column="date",
        step_unit="WEEK",
        start=date(2024, 1, 1),
    )
)

date

color

value

2024-01-01

null

null

2024-01-08

red

100

2024-01-15

null

null

2024-01-22

blue

50

etl_toolkit.A.periods(start, end, steps=1, step_unit='DAY', start_day_of_week='MONDAY', calendar=None, spark=None)[source]#

Generate a dataframe dynamically based on the start and end values provided, with 1 row per interval, where each interval spans (steps * step_unit) of elapsed time since the previous row. This function can be useful to generate a series of expected dates or timestamps to join to another dataframe.

The output dataframe also includes larger periodicities to round each period up to (ex: week, month, quarter, etc.)

Parameters:
  • start (datetime.datetime | datetime.date) – The start date or timestamp (datetime) of the dataframe to create. If a datetime object is supplied the resulting columns will be of timestamp type.

  • end (datetime.datetime | datetime.date) – The end date or timestamp (datetime) of the dataframe to create. If a datetime object is supplied the resulting columns will be of timestamp type.

  • steps (int) – The number of step_units to use between each row’s period_start and period_end

  • step_unit (Literal['DAY', 'WEEK', 'MONTH', 'QUARTER', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The interval of each step to use between each row’s period_start and period_end. A standard calendar is used for quarters.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Only used when step_unit="WEEK". Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The start value should have the same day of week, otherwise an InvalidInputException will be raised. By default, weeks start on Monday.

  • spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.

Return type:

pyspark.sql.DataFrame

Examples#

Generate a simple date range with 1 day per row between the specified start and end.#
from etl_toolkit import E, F, A

from datetime import date

display(
    A.periods(
        start=date(2024, 1, 1),
        end=date(2024, 1, 5),
    )
)

period_start

period_end

2024-01-01

2024-01-01

2024-01-02

2024-01-02

2024-01-03

2024-01-03

2024-01-04

2024-01-04

2024-01-05

2024-01-05

Generate a date range based on a weekly periodicity. Notice how the start and end dates reflect a 7-day duration.#
from etl_toolkit import E, F, A

from datetime import date

display(
    A.periods(
        start=date(2024, 1, 1),
        end=date(2024, 1, 14),
        step_unit="WEEK",
    )
)

period_start

period_end

2024-01-01

2024-01-07

2024-01-08

2024-01-14

Generate a date range based on a weekly periodicity with weeks starting on Saturdays. Notice how the start and end dates reflect a 7-day duration.#
from etl_toolkit import E, A, F
from datetime import date, datetime

display(
    A.periods(
        start=date(2024, 1, 6),
        end=date(2024, 1, 19),
        step_unit="WEEK",
        start_day_of_week="SATURDAY",
    )
)

period_start

period_end

2024-01-06

2024-01-12

2024-01-13

2024-01-19

Generate a date range based on a two-month periodicity. This is accomplished by specifying steps as 2 and a “MONTH” step_unit.#
from etl_toolkit import E, F, A

from datetime import date

display(
    A.periods(
        start=date(2024, 1, 1),
        end=date(2024, 6, 30),
        step_unit="MONTH",
        steps=2,
    )
)

period_start

period_end

2024-01-01

2024-02-29

2024-03-01

2024-04-30

2024-05-01

2024-06-30

Comparison Analyses#

Suite of functions to compare DataFrames by schema or row content. These are useful for data validation, testing pipeline outputs, and ensuring data quality. The functions help identify differences between two DataFrames either at the schema level or by comparing individual rows.

etl_toolkit.A.get_schema_comparison(df1, df2, diff_only=False)[source]#

Compares schemas from two DataFrames and returns a DataFrame with the differences.

For the schema comparison, all columns from both DataFrames are compared.

Parameters:
  • df1 (pyspark.sql.DataFrame) – The first DataFrame to compare.

  • df2 (pyspark.sql.DataFrame) – The second DataFrame to compare.

  • diff_only (bool) – If True, only columns that are different will be returned in schema comparison dataframe.

Returns:

DataFrame with the schema comparison.

Return type:

pyspark.sql.DataFrame

Examples#
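A minimal sketch of calling A.get_schema_comparison, assuming two small inline dataframes chosen to roughly reproduce the sample output table below (the inline data is an illustrative assumption, and Spark may infer slightly different types, e.g. LongType rather than IntegerType for Python ints).#
from etl_toolkit import A

from datetime import date, datetime

df1 = spark.createDataFrame([{
    "name": "a",
    "date": date(2024, 1, 1),
    "datetime": datetime(2024, 1, 1, 0, 0),
    "is_active": True,
    "id": 1,
}])
df2 = spark.createDataFrame([{
    "name": "a",
    "datetime": "2024-01-01 00:00:00",
    "is_active": "true",
    "id": "1",
    "_timestamp_ingested": datetime(2024, 1, 1, 0, 0),
}])

# Columns present on only one side show NULL for the other side's data type
display(A.get_schema_comparison(df1, df2))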

column_name

df1_data_type

df2_data_type

is_diff

name

StringType()

StringType()

false

date

DateType()

NULL

true

datetime

TimestampType()

StringType()

true

is_active

BooleanType()

StringType()

true

id

IntegerType()

StringType()

true

_timestamp_ingested

NULL

TimestampType()

true

etl_toolkit.A.get_rows_comparison(df1, df2, columns=None, sample_size=None, checksum_algorithm='hash')[source]#

Compares rows from both DataFrames and returns two DataFrames with the differences.

It compares rows by computing a checksum for both dataframes using either the intersection of columns from both dataframes or the columns specified in the columns parameter. Both dataframes are left-anti joined twice on the checksum column to identify differences.

For large datasets, consider using the sample_size parameter to limit the comparison to a subset of rows.

Parameters:
  • df1 (pyspark.sql.DataFrame) – The first DataFrame to compare.

  • df2 (pyspark.sql.DataFrame) – The second DataFrame to compare.

  • columns (Optional[List[str]]) – A list of columns to compare. If not specified, intersection of the columns from both dataframes will be used.

  • sample_size (Optional[int]) – Optional limit on the number of rows to include in the comparison. Useful for comparing large datasets.

  • checksum_algorithm (str) – Algorithm to use for checksum calculation, options are ‘uuid5’ or ‘hash’ (default). For large datasets, ‘hash’ is generally more efficient.

Returns:

A tuple with two DataFrames, the first one with the rows that are in df1 but not in df2 and the second one with the rows that are in df2 but not in df1.

Return type:

Tuple[pyspark.sql.DataFrame, pyspark.sql.DataFrame]

Examples#
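A minimal sketch of comparing rows between two small dataframes (the inline data here is an illustrative assumption); the function returns a tuple of two dataframes.#
from etl_toolkit import A

df1 = spark.createDataFrame([
    {"id": 1, "color": "red"},
    {"id": 2, "color": "blue"},
])
df2 = spark.createDataFrame([
    {"id": 1, "color": "red"},
    {"id": 3, "color": "green"},
])

# Returns a tuple: (rows only in df1, rows only in df2)
only_in_df1, only_in_df2 = A.get_rows_comparison(df1, df2, columns=["id", "color"])

display(only_in_df1)  # the id=2 row
display(only_in_df2)  # the id=3 row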

Investor Standard Metrics Analyses#

Suite of functions specific to the standard metrics experience. These are used to generate unified KPI analyses and downstream dashboard and feed tables for a metric.

etl_toolkit.A.standard_metric_unified_kpi(df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df=None)[source]#

Generates a comprehensive dataframe containing all unified KPI analyses for a metric.

This function performs a series of transformations to analyze a metric across multiple time periods and calculation types. The process includes:

  1. Data Preparation:

     • Validates and extracts configuration parameters

     • Sets up calendar information for time-based analysis

     • Applies any source table filters

  2. Analysis Generation:

     • Creates analysis groups based on periodicity (QUARTER, MONTH, etc.)

     • Calculates simple aggregates (SUM, AVG) for each period

     • Computes growth rates (YoY, QoQ) if configured

     • Handles period-to-date comparisons

     • Processes data slices (e.g., by product, region)

  3. Data Enrichment:

     • Adds entity information (company name, ticker)

     • Includes metric metadata (currency, divisor)

     • Attaches calendar metadata (period labels, dates)

  4. Special Handling:

     • Supports custom functions (e.g., PCT_OF_TOTAL)

     • Handles leap year adjustments

     • Manages trailing period calculations

     • Normalizes values across different period lengths

Parameters:
  • df (pyspark.sql.DataFrame) – Source DataFrame containing the metric data

  • entity_configuration (standard_metric_unified_kpi.entity_configuration) – A.entity_configuration for configuring entity details

  • standard_metric_metadata (standard_metric_unified_kpi.standard_metric_metadata) – A.standard_metric_metadata for configuring metric metadata

  • standard_metric_configuration (standard_metric_unified_kpi.standard_metric_configuration) – A.standard_metric_configuration for configuring metric calculations

  • calendar_df (Optional[pyspark.sql.DataFrame]) – Optional calendar DataFrame (uses standard calendar if not provided)

Returns:

DataFrame containing comprehensive metric unified KPI analyses

Return type:

pyspark.sql.DataFrame

Examples#

Generating unified KPI analyses for a metric. Example output is a subset of the actual output.#
from etl_toolkit import A

input_df = spark.table("yd_production.afrm_live_reported.afrm_gmv_sliced")
calendar_df = spark.table("yd_fp_investor_audit.afrm_xnas_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Affirm", top_level_entity_ticker="AFRM:XNAS"
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="GMV",
    company_comparable_kpi=True,
    display_period_granularity="DAY",
    report_period_granularity="QUARTER",
    currency="USD",
    value_divisor=1000000,
    visible_alpha_id=14081329,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gmv",
    source_input_date_column="date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    max_relevant_years=4,
    calendar_type="EXACT_N_YEARS",
    slice_columns=["merchant"],
    trailing_period_length=7,
    trailing_period_aggregate_function="AVG",
)

df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)
display(df)

metric_name

slice_name_1

value

aggregation_type

calculation_type

trailing_period

periodicity

period_start

GMV

null

80536050.596995

SUM

SIMPLE_AGGREGATE

null

DAY

2024-10-01

GMV

merchant

35380357.162450

SUM

SIMPLE_AGGREGATE

null

DAY

2024-10-01

GMV

null

77256191.770087

AVG

SIMPLE_AGGREGATE

7

DAY

2024-10-01

GMV

merchant

41700742.409985

AVG

SIMPLE_AGGREGATE

7

DAY

2024-10-01

etl_toolkit.A.standard_metric_unified_kpi_derived(unified_kpi_df_1, unified_kpi_df_2, standard_metric_metadata, standard_metric_configuration, operator='DIVISION')[source]#

Generates a dataframe containing unified KPI analyses for a derived standard metric based on the unified KPI dataframes of two input metrics.

Note

The mathematical operation preserves the order of the input metrics’ unified KPI tables.

If the operator is DIVISION, for example, the metric from unified_kpi_df_1 will be used as the numerator and the metric from unified_kpi_df_2 will be used as the denominator.

Parameters:
  • unified_kpi_df_1 (pyspark.sql.DataFrame) – A dataframe containing unified KPI analyses of one metric

  • unified_kpi_df_2 (pyspark.sql.DataFrame) – A dataframe containing unified KPI analyses of another metric

  • standard_metric_metadata (standard_metric_unified_kpi_derived.standard_metric_metadata) – A.standard_metric_metadata configurations

  • standard_metric_configuration (standard_metric_unified_kpi_derived.standard_metric_configuration) – A.standard_metric_configuration configurations

  • operator (Literal['DIVISION', 'ADDITION', 'MULTIPLICATION', 'SUBTRACTION']) – mathematical operation between the two standard metrics to get the derived metric

Returns:

A dataframe containing unified KPI analyses of the derived metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating unified KPI analyses for a derived metric. Example output is a subset of the actual output.#
from etl_toolkit import A

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
calendar_df = spark.table(
    "yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000"
)

standard_metric_metadata_1 = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration_1 = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)
input_df_1 = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
unified_kpi_df_1 = A.standard_metric_unified_kpi(
    input_df_1,
    entity_configuration,
    standard_metric_metadata_1,
    standard_metric_configuration_1,
    calendar_df
)

standard_metric_metadata_2 = A.standard_metric_metadata(
    metric_name="Orders - Order Date",
    company_comparable_kpi=False,
)
standard_metric_configuration_2 = A.standard_metric_configuration(
    source_input_column="order_date_orders_index",
    source_input_date_column="date",
    calendar_type="52_WEEK",
    trailing_period_length=None,
    trailing_period_aggregate_function=None,
)
input_df_2 = spark.table("yd_production.chwy_reported.edison_daily_sales")
unified_kpi_df_2 = A.standard_metric_unified_kpi(
    input_df_2,
    entity_configuration,
    standard_metric_metadata_2,
    standard_metric_configuration_2,
    calendar_df
)

derived_standard_metric_metadata = A.standard_metric_metadata(
    metric_name="AOV - Order Date",
    company_comparable_kpi=False,
)
derived_standard_metric_configuration = A.standard_metric_configuration(
    max_relevant_years=2,
    growth_rate_type="CAGR",
    calendar_type="52_WEEK",
    trailing_period_length=7,
    trailing_period_aggregate_function="AVG",
)
df = A.standard_metric_unified_kpi_derived(
    unified_kpi_df_1,
    unified_kpi_df_2,
    derived_standard_metric_metadata,
    derived_standard_metric_configuration,
    operator="DIVISION"
)

display(df)

metric_name

slice_name_1

value

aggregation_type

calculation_type

trailing_period

periodicity

period_start

AOV - Order Date

null

34136.196960

SUM

SIMPLE_AGGREGATE

null

QUARTER

2021-02-01

AOV - Order Date

null

40176.041644

SUM

SIMPLE_AGGREGATE

null

QUARTER

2023-01-30

AOV - Order Date

null

29545.027237

SUM

SIMPLE_AGGREGATE

null

QUARTER

2019-11-04

AOV - Order Date

null

37124.360009

SUM

SIMPLE_AGGREGATE

null

QUARTER

2022-05-02

etl_toolkit.A.standard_metric_data_download(df, entity_configuration)[source]#

Transforms the unified KPI table to generate a dataframe containing analyses for the data download.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • entity_configuration (standard_metric_data_download.entity_configuration) – A.entity_configuration configurations

Returns:

DataFrame of data download analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating data download analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_data_download(unified_kpi_df, entity_configuration)
display(df)

date

top_level_entity_ticker

top_level_entity_name

internal_dashboard_analysis_name

analysis_label_name

metric_name

slice_name_1

slice_value_1

value

2025-02-02

CHWY:XNYS

Chewy

day_1y_growth_rate_trailing_day

T7D SUM 1Y Growth

Net Sales - Order Date

null

null

0.019454

2025-02-02

CHWY:XNYS

Chewy

day_2y_growth_rate_trailing_day

T7D SUM 2Y Growth

Net Sales - Order Date

null

null

0.019231

2025-02-02

CHWY:XNYS

Chewy

day_3y_growth_rate_trailing_day

T7D SUM 3Y Growth

Net Sales - Order Date

null

null

0.110622

2025-02-02

CHWY:XNYS

Chewy

day_4y_growth_rate_trailing_day

T7D SUM 4Y Growth

Net Sales - Order Date

null

null

0.104899

2025-02-02

CHWY:XNYS

Chewy

day_simple_aggregate

Nominal Value.

Net Sales - Order Date

null

null

32434644.148619

2025-02-02

CHWY:XNYS

Chewy

day_simple_aggregate_trailing_day

T7D SUM

Net Sales - Order Date

null

null

237115829.752783

etl_toolkit.A.standard_metric_live_feed(df, entity_configuration, spark=None)[source]#

Transforms the unified KPI table to generate a dataframe containing YDL feed analyses. This function will be deprecated alongside YDL Feeds in July 2025.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • entity_configuration (standard_metric_live_feed.entity_configuration) – A.entity_configuration configurations

  • spark (pyspark.sql.SparkSession)

Returns:

DataFrame of live feed analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating live feed analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_live_feed(unified_kpi_df, entity_configuration)
display(df)
etl_toolkit.A.standard_metric_feed(df, entity_configuration, spark=None)[source]#

Transforms the unified KPI table to generate a dataframe containing core metric feed analyses.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • entity_configuration (standard_metric_feed.entity_configuration) – A.entity_configuration configurations

  • spark (pyspark.sql.SparkSession)

Returns:

DataFrame of core metric feed analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating core metric feed analyses for a metric.#
from etl_toolkit import A, F

input_df = spark.table("yd_production.aap_reported.aap_gmv")
calendar_df = spark.table("yd_fp_investor_audit.aap_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Advance Auto Parts",
    top_level_entity_ticker="AAP:XNYS",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales",
    currency="USD",
    value_divisor=1000000,
    visible_alpha_id=5598475,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="value",
    source_input_date_column="date",
    max_relevant_years=5,
    calendar_type="52_WEEK",
    source_table_filter_conditions=[F.col("metric")=="Net Sales"],
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df,
)

df = A.standard_metric_feed(unified_kpi_df, entity_configuration)
display(df)

yd_product_type

yd_product

metric_name

analysis_name

periodicity

slice_names

slice_values

value

period_start

period_end

currency

figi

Core Metrics

AAP

Net Sales

Net Sales - Total

daily

null

null

18512030.489125

2025-01-01

2025-01-01

USD

null

Core Metrics

AAP

Net Sales

Net Sales - Total

monthly

null

null

747219529.371808

2025-01-01

2025-01-31

USD

null

Core Metrics

AAP

Net Sales

Net Sales - Total

quarterly

null

null

2026418754.661688

2024-10-06

2024-12-28

USD

null

Core Metrics

AAP

Net Sales

Net Sales - Total

MTD Y-0

null

null

29447541.625361

2025-02-01

2025-02-01

USD

null

Core Metrics

AAP

Net Sales

Net Sales - Total

QTD Y-0

null

null

868300454.561080

2024-12-09

2025-02-01

USD

null

etl_toolkit.A.standard_metric_daily_growth(df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing daily growth analyses.

Parameters:

df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

Returns:

DataFrame of daily growth analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating daily growth analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_daily_growth(unified_kpi_df)
display(df)

| top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | dashboard_analysis_name | day_1y_growth_rate_trailing_day | day_2y_growth_rate_trailing_day |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CHWY:XNYS | Chewy | 2025-02-01 | 2025-02-01 | Net Sales - Order Date | Daily Growth | 0.012224 | 0.017250 |
| CHWY:XNYS | Chewy | 2025-02-02 | 2025-02-02 | Net Sales - Order Date | Daily Growth | 0.014802 | 0.016585 |
| CHWY:XNYS | Chewy | 2025-02-03 | 2025-02-03 | Net Sales - Order Date | Daily Growth | 0.040221 | 0.024634 |
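
Because the output contains one row per day, the most recent growth readings can be inspected with a plain pyspark sort; the snippet below is an illustrative sketch continuing from the example above, not part of A.standard_metric_daily_growth itself.

from etl_toolkit import F

# Most recent 7 days of daily growth rates, newest first
latest_growth_df = df.orderBy(F.col("period_end").desc()).limit(7)
display(latest_growth_df)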

etl_toolkit.A.standard_metric_quarter_month_pivot(df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing MTD and QTD analyses.

Parameters:

df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

Returns:

DataFrame of MTD and QTD analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating MTD and QTD analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_quarter_month_pivot(unified_kpi_df)
display(df)

| top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | periodicity | analysis_label | value |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | QUARTER | Nominal | 3252.308829 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | QUARTER | Y/Y Growth | 0.152283 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | QUARTER | 2Y CAGR | 0.094586 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | QUARTER | 3Y CAGR | 0.106234 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | QUARTER | 4Y CAGR | 0.123719 |

etl_toolkit.A.standard_metric_trailing_day_pivot(df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing trailing day analyses.

Parameters:

df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

Returns:

DataFrame of trailing day analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating trailing day analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_trailing_day_pivot(unified_kpi_df)
display(df)

| top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | analysis_label | value | previous_week_txd_value |
| --- | --- | --- | --- | --- | --- | --- | --- |
| CHWY:XNYS | Chewy | 2025-01-28 | 2025-02-03 | Net Sales - Order Date | Nominal | 233.494012 | 230.214723 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | Y/Y Growth | 0.040221 | 0.075195 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | 2Y CAGR | 0.024634 | 0.041169 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | 3Y CAGR | 0.116819 | 0.043802 |
| CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | 4Y CAGR | 0.105718 | 0.085727 |
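
Each output row carries both the current trailing-day value and the corresponding value from one week earlier (previous_week_txd_value), so a week-over-week delta can be derived directly; the snippet below is an illustrative sketch continuing from the example above, not part of the pivot itself.

from etl_toolkit import F

# Week-over-week change for each trailing-day analysis
wow_df = df.withColumn(
    "week_over_week_change",
    F.col("value") - F.col("previous_week_txd_value"),
)
display(wow_df)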

etl_toolkit.A.standard_metric_net_adds(df, calendar_df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing net add analyses.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • calendar_df (pyspark.sql.DataFrame) – Calendar dataframe

Returns:

DataFrame of net add analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating net add analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Toast",
    top_level_entity_ticker="TOST:XNYS",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Location Net Adds",
    company_comparable_kpi=True,
    uses_va_for_actuals=False,
    display_period_granularity="DAY",
    report_period_granularity="QUARTER",
    currency=None,
    value_divisor=1,
    visible_alpha_id=14282041,
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="locations",
    source_input_date_column="date",
    source_table_granularity="DAY",
    aggregate_function=None,
    max_relevant_years=2,
    growth_rate_type=None,
    calendar_type=None,
    source_table_filter_conditions=None,
    slice_columns=None,
    trailing_period_length=None,
    trailing_period_aggregate_function=None,
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_net_adds(unified_kpi_df, calendar_df)
display(df)

| period_start | period_end | quarter_start | quarter_end | value | daily_net_add_qtd_value | value_previous_1_week | daily_net_add_qtd_value_previous_1_week |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 2024-10-01 | 2024-10-01 | 2024-10-01 | 2024-12-31 | 127066.757493 | 66.757493 | 126534.726587 | 6534.726587 |
| 2024-10-02 | 2024-10-02 | 2024-10-01 | 2024-12-31 | 127121.858916 | 121.858916 | 126597.423004 | 6597.423004 |
| 2024-10-03 | 2024-10-03 | 2024-10-01 | 2024-12-31 | 127182.258553 | 182.258553 | 126653.519799 | 6653.519799 |

etl_toolkit.A.standard_metric_weekly_qtd_progress(df, calendar_df, spark=None)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing weekly analyses through the quarter and the comparable weekly analyses in prior years.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • calendar_df (pyspark.sql.DataFrame) – Calendar dataframe

  • spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)

Returns:

DataFrame of weekly QTD analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating weekly quarter-to-date progress analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.cmg_reported.dash_daily_rev")
calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chipotle",
    top_level_entity_ticker="CMG:XNYS",
    exchange=None,
    entity_name=None,
    figi=None,
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Total Revenue",
    company_comparable_kpi=True,
    uses_va_for_actuals=False,
    display_period_granularity="WEEK",
    report_period_granularity="QUARTER",
    currency="USD",
    value_divisor=1000,
    visible_alpha_id=6639756,
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="revenue",
    source_input_date_column="date",
    source_table_granularity="DAY",
    aggregate_function="SUM",
    max_relevant_years=2,
    growth_rate_type="CAGR",
    calendar_type="EXACT_N_YEARS",
    source_table_filter_conditions=None,
    slice_columns=None,
    trailing_period_length=7,
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_weekly_qtd_progress(unified_kpi_df, calendar_df)
display(df)
etl_toolkit.A.standard_metric_monthly_qtd_progress(df, calendar_df, spark=None)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing monthly analyses through the quarter and the comparable monthly analyses in prior years.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • calendar_df (pyspark.sql.DataFrame) – Calendar dataframe

  • spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)

Returns:

DataFrame of monthly QTD analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating monthly quarter-to-date progress analyses for a metric.#
from etl_toolkit import A, F

input_df = spark.table("yd_production.cn_shortvid_reported.active_users")
calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="ByteDance",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name="Douyin",
    figi=None,
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Monthly Active Users",
    company_comparable_kpi=False,
    uses_va_for_actuals=False,
    display_period_granularity="MONTH",
    report_period_granularity="QUARTER",
    value_divisor=1000000,
    visible_alpha_id=None,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="mau",
    source_input_date_column="month",
    source_table_granularity="MONTH",
    max_relevant_years=2,
    aggregate_function="AVG",
    growth_rate_type="CAGR",
    calendar_type="EXACT_N_YEARS",
    source_table_filter_conditions=[F.col("appname") == "Douyin Core"],
    slice_columns=["appname"],
    trailing_period_length=None,
    trailing_period_aggregate_function=None,
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_monthly_qtd_progress(unified_kpi_df, calendar_df)
display(df)
etl_toolkit.A.standard_metric_daily_qtd_progress(df, calendar_df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing daily analyses through the quarter and the comparable daily analyses in prior years.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • calendar_df (pyspark.sql.DataFrame) – Calendar dataframe

Returns:

DataFrame of daily QTD analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating daily quarter-to-date progress analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date")
calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Chewy",
    top_level_entity_ticker="CHWY:XNYS",
    figi="BBG00P19DLQ4",
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Net Sales - Order Date",
    company_comparable_kpi=False,
    currency="USD",
    value_divisor=1000000,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="net_sales_order_date",
    source_input_date_column="date",
    max_relevant_years=4,
    calendar_type="52_WEEK",
    trailing_period_aggregate_function="SUM",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_daily_qtd_progress(unified_kpi_df, calendar_df)
display(df)
etl_toolkit.A.standard_metric_half_year_progress(df, calendar_df)[source]#

Transforms a metric’s unified KPI table to generate a dataframe containing daily analyses through the half year and the comparable daily analyses in prior years.

Parameters:
  • df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table

  • calendar_df (pyspark.sql.DataFrame) – Calendar dataframe

Returns:

DataFrame of daily half year progress analyses for a metric

Return type:

pyspark.sql.DataFrame

Examples#

Generating daily half year progress analyses for a metric.#
from etl_toolkit import A

input_df = spark.table("yd_production.itx_reported.itx_gmv_es")
calendar_df = spark.table("yd_fp_investor_audit.itx_xmad_deliverable_gold.custom_calendar__dmv__000")

entity_configuration = A.entity_configuration(
    top_level_entity_name="Inditex",
    top_level_entity_ticker="ITX:XMAD",
    exchange=None,
    entity_name=None,
    figi=None,
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Spain Net Sales",
    company_comparable_kpi=True,
    uses_va_for_actuals=False,
    display_period_granularity="DAY",
    report_period_granularity="HALF_YEAR",
    currency="EUR",
    value_divisor=1000000,
    visible_alpha_id=None,
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="value",
    source_input_date_column="date",
    source_table_granularity="DAY",
    aggregate_function="SUM",
    max_relevant_years=2,
    growth_rate_type="CAGR",
    calendar_type="EXACT_N_YEARS",
    source_table_filter_conditions=None,
    slice_columns=None,
    trailing_period_length=7,
    trailing_period_aggregate_function="AVG",
)

unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

df = A.standard_metric_half_year_progress(unified_kpi_df, calendar_df)
display(df)
etl_toolkit.A.standard_metric_ui_metadata(entity_configuration, standard_metric_metadata, standard_metric_configuration, standard_metric_dashboard_configuration, spark=None)[source]#

Generates a dataframe containing the UI metadata necessary for a standard metric’s dashboard configuration.

Parameters:
  • entity_configuration (standard_metric_ui_metadata.entity_configuration) – A.entity_configuration for configuring entity details

  • standard_metric_metadata (standard_metric_ui_metadata.standard_metric_metadata) – A.standard_metric_metadata for configuring metric metadata

  • standard_metric_configuration (standard_metric_ui_metadata.standard_metric_configuration) – A.standard_metric_configuration for configuring metric calculations

  • standard_metric_dashboard_configuration (standard_metric_ui_metadata.standard_metric_dashboard_configuration) – A.standard_metric_dashboard_configuration for configuring the dashboard

  • spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)

Returns:

DataFrame containing UI metadata for the standard metric dashboard

Return type:

pyspark.sql.DataFrame

Examples#

Generating a UI metadata dataframe for a metric#
from etl_toolkit import A

entity_configuration = A.entity_configuration(
    top_level_entity_name="Affirm", top_level_entity_ticker="AFRM:XNAS"
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Merchandise Volume",
    company_comparable_kpi=True,
    display_period_granularity="DAY",
    report_period_granularity="QUARTER",
    currency="USD",
    value_divisor=1000000,
    visible_alpha_id=14081329,
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gmv",
    source_input_date_column="date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    max_relevant_years=4,
    calendar_type="EXACT_N_YEARS",
    slice_columns=["merchant"],
    trailing_period_length=7,
    trailing_period_aggregate_function="AVG",
)
standard_metric_dashboard_configuration = A.standard_metric_dashboard_configuration(
    page_title="Gross Merchandise Volume",
    page_assignment="page_1_1",
    metric_short_name="GMV",
    data_lag="T-3",
    publishing_schedule="Weekly (Thu)",
)

df = A.standard_metric_ui_metadata(
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    standard_metric_dashboard_configuration
)
display(df)

Calendar Analyses#

Suite of functions and classes to generate and manage calendar data with support for custom fiscal periods, holidays, and business day calculations. These calendar utilities standardize period definitions and holiday handling across an organization.

etl_toolkit.A.calendar(calendar_id='standard', start_date=None, end_date=None, custom_holidays=None, country_holiday_codes=None, quarters=None, halves=None, fiscal_year_start_month=1, fiscal_year_start_day=1, qa=False, spark=None)[source]#

Generate a dataframe with periodicities (day, week, month, quarter, year) and calendar metadata like holidays and business days. The calendar can be customized with different fiscal year configurations, custom holidays, and period definitions.

The function returns calendar data formatted similarly to the Freeport calendar features with standardized fields for dates, periods, and calendar metadata.

Parameters:
  • calendar_id (str) – Identifier for the calendar configuration to use. Defaults to “standard” which uses a standard 365-day calendar.

  • start_date (Optional[datetime.date]) – Optional minimum date to include in the calendar. Defaults to 5 years before current date.

  • end_date (Optional[datetime.date]) – Optional maximum date to include in the calendar. Defaults to 5 years after current date.

  • custom_holidays (Optional[List[Holiday]]) – Optional list of Holiday objects defining custom holidays to include.

  • country_holiday_codes (Optional[List[str]]) – Optional list of two-letter country codes whose default holidays should be included.

  • quarters (Optional[List[Period]]) – Optional list of Period objects defining custom quarter date ranges.

  • halves (Optional[List[Period]]) – Optional list of Period objects defining custom half-year date ranges.

  • fiscal_year_start_month (int) – Month when fiscal year starts (1-12). Defaults to 1 (January).

  • fiscal_year_start_day (int) – Day of month when fiscal year starts (1-31). Defaults to 1.

  • qa (bool) – Enable QA mode to add additional columns for validation.

  • spark (pyspark.sql.SparkSession) – Optional SparkSession. If not provided, an attempt is made to retrieve the session from yipit_databricks_utils. This is generally not needed, as the session is created automatically in Databricks; it is mainly used by library developers.

Returns:

DataFrame: Calendar data with the following columns:
  • day (date): The calendar date
  • calendar_type (string): Type of calendar (fiscal/standard)
  • year_label (string): Year label (e.g. FY2024)
  • year_period_start (date): Start date of year period
  • year_period_end (date): End date of year period
  • quarter_period_start (date): Start date of quarter
  • quarter_period_end (date): End date of quarter
  • quarter_label (string): Quarter label (e.g. 1Q24)
  • month_period_start (date): Start date of month
  • month_period_end (date): End date of month
  • week_period_start (date): Start of week (Sunday)
  • week_period_end (date): End of week (Saturday)
  • half_year_period_start (date): Start of half-year period
  • half_year_period_end (date): End of half-year period
  • half_year_label (string): Half-year label (e.g. 1HY24)
  • is_business_day (boolean): Is this a business day
  • is_holiday (boolean): Is this a holiday
  • holiday_name (string): Name of holiday if applicable
  • days_in_week (int): Days elapsed in current week
  • days_in_month (int): Days elapsed in current month
  • days_in_quarter (int): Days elapsed in current quarter
  • days_in_year (int): Days elapsed in current year
  • days_in_half_year (int): Days elapsed in current half year

Additional columns for fiscal calendars:
  • custom_year_of_quarter (int): Custom year mapping for quarter
  • leap_year (boolean): Is this a leap year
  • leap_day (boolean): Is this a leap day (Feb 29)

Return type:

pyspark.sql.DataFrame

Examples#

Generate standard calendar for default date range#
from etl_toolkit import A
df = A.calendar()
Generate calendar with specific date range#
from etl_toolkit import A
from datetime import date

df = A.calendar(
    start_date=date(2024, 1, 1),
    end_date=date(2024, 12, 31)
)
Generate calendar with custom holidays#
from etl_toolkit import A
from datetime import date

df = A.calendar(
    custom_holidays=[
        A.holiday(
            name="Company Holiday",
            start_date=date(2024, 12, 24),
            end_date=date(2024, 12, 26)
        )
    ]
)
Generate calendar with custom fiscal quarters#
from etl_toolkit import A
from datetime import date

df = A.calendar(
    quarters=[
        A.period(
            start_date=date(2024, 10, 1),
            end_date=date(2024, 12, 31),
            type="quarter"
        ),
        A.period(
            start_date=date(2025, 1, 1),
            end_date=date(2025, 3, 31),
            type="quarter"
        ),
        A.period(
            start_date=date(2025, 4, 1),
            end_date=date(2025, 6, 30),
            type="quarter"
        ),
        A.period(
            start_date=date(2025, 7, 1),
            end_date=date(2025, 9, 30),
            type="quarter"
        )
    ]
)
Generate calendar with holidays from multiple countries#
from etl_toolkit import A
from datetime import date

df = A.calendar(
    start_date=date(2024, 1, 1),
    end_date=date(2024, 12, 31),
    country_holiday_codes=["US", "GB", "JP"]  # Include US, UK, and Japan holidays
)
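
The fiscal year start and QA mode parameters can also be combined. The sketch below assumes a fiscal year beginning February 1; the exact additional columns produced by qa=True depend on the calendar configuration.

Generate a fiscal calendar starting in February with QA mode enabled#
from etl_toolkit import A
from datetime import date

df = A.calendar(
    start_date=date(2024, 1, 1),
    end_date=date(2026, 12, 31),
    fiscal_year_start_month=2,  # fiscal year begins February 1
    fiscal_year_start_day=1,
    qa=True,  # add extra columns for validating the generated calendar
)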