A module (“analyses”)#
The analyses module in the etl_toolkit is imported as A.
This module contains several functions that accept a dataframe, apply complex transformations, and return new dataframes.
These functions should be used as larger, routine components of pipelines whenever possible. The transformations
include deduplication, complex filtering, enrichment, and aggregation.
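As a quick orientation, here is a minimal, hedged sketch of how the module is typically imported and composed in a pipeline; the table name and column names are illustrative placeholders, not real assets:
from etl_toolkit import A

df = spark.table("catalog.schema.daily_revenue")  # hypothetical source table
df = A.add_lag_columns(df, value_columns=["revenue"], date_column="date", steps=1, step_unit="DAY")
df = A.add_percent_of_total_columns(df, value_columns=["revenue"], total_grouping_columns=["date"])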
Tip
It is highly recommended to use these functions to cut down on large pieces of repetitive logic. Steps like parsing, deduplication, and generating KPIs can be greatly simplified with them, and the intention of a transformation is clearer to a teammate or reviewer when these functions are used.
Tip
Many analysis functions that perform filtering or aggregation have a QA mode. When enabled, the output dataframe
includes additional columns and/or records to make it easier to investigate the transformation or the underlying data. Functions
that have a QA mode include a qa boolean argument. See each function’s documentation for details on whether it supports QA mode.
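For example, a hedged sketch of toggling QA mode on one of the dedupe functions documented below; the dataframe and column names are illustrative:
from etl_toolkit import A, F

deduped = A.dedupe_by_row_number(df, dedupe_columns=["order_id"], order_columns=[F.desc("updated_at")])
# The same call with qa=True keeps all rows and adds a dedupe_index column for investigation
qa_view = A.dedupe_by_row_number(df, dedupe_columns=["order_id"], order_columns=[F.desc("updated_at")], qa=True)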
Calculation Analyses#
Suite of functions that perform common calculations to enrich existing dataframes, such as lagged values or percent of total. It is recommended to use these functions rather than implementing the calculations in native pyspark, as they apply special techniques to stay performant and are easier to read.
- etl_toolkit.A.add_lag_columns(df, value_columns, date_column, slice_columns=None, steps=1, step_unit='DAY')[source]#
Adds additional columns to the input dataframe that are lagged versions of the
value_columns specified. The lag is calculated based on date_column and the specified interval (steps * step_unit). Lags can be performed within each slice if the slice_columns are specified. The added lag columns are named in a standard way based on the interval, ex: (“revenue”, 1, “DAY”) -> “revenue_lag_1_day”
Caution
The lag is performed via a self-join that matches each date with the corresponding (date - interval) within any slice(s). It’s important to account for missing dates in the data to ensure accurate calculations. The
A.fill_periods function can be useful in these scenarios.
- Parameters:
df (pyspark.sql.DataFrame) – The input Dataframe to add lag calculations to
value_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to base the lag calculations on. Each column will have a corresponding lag column added. If strings are provided, they are resolved as Columns.
date_column (str | pyspark.sql.Column) – A Column or str that should be of date or timestamp type. This column is used to determine the lagged period for lag calculations. If a string is provided, it is resolved as a Column.
slice_columns (list[str | pyspark.sql.Column]) – An optional list of Columns or strings to define the slices of the dataframe. Within each slice a lag calculation will be generated based on the date_column + interval. If strings are provided, they are resolved as Columns.
steps (int) – The number of step_units that define a lag interval.
step_unit (Literal['DAY', 'WEEK', 'MONTH', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The unit of length of the lag period, and when combined with steps, equals the lag interval.
- Return type:
pyspark.sql.DataFrame
Examples#
Basic example of a 1-day lag of the value column based on the date column.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])
display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="DAY",
        steps=1,
    )
)
| color | date | value | value_lag_1_day |
| red | 2024-01-02 | 100 | 50 |
| red | 2024-01-01 | 50 | null |
Example of a 2-day lag of the value column based on the date column. Notice the lag column name changed to reflect the interval.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 250, "color": "red", "date": date(2024, 1, 5)},
    {"value": 200, "color": "red", "date": date(2024, 1, 4)},
    {"value": 150, "color": "red", "date": date(2024, 1, 3)},
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])
display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="DAY",
        steps=2,
    )
)
| color | date | value | value_lag_2_day |
| red | 2024-01-05 | 250 | 150 |
| red | 2024-01-04 | 200 | 100 |
| red | 2024-01-03 | 150 | 50 |
| red | 2024-01-02 | 100 | null |
| red | 2024-01-01 | 50 | null |
Example of a 2-week lag of the value column based on the date column. Notice how the dataframe has a weekly periodicity, and the data is still lagged correctly because a self-join is performed in the operation.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 250, "color": "red", "date": date(2024, 1, 29)},
    {"value": 200, "color": "red", "date": date(2024, 1, 22)},
    {"value": 150, "color": "red", "date": date(2024, 1, 15)},
    {"value": 100, "color": "red", "date": date(2024, 1, 8)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])
display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        step_unit="WEEK",
        steps=2,
    )
)
| color | date | value | value_lag_2_week |
| red | 2024-01-29 | 250 | 150 |
| red | 2024-01-22 | 200 | 100 |
| red | 2024-01-15 | 150 | 50 |
| red | 2024-01-08 | 100 | null |
| red | 2024-01-01 | 50 | null |
Example of a 1-day lag of the value column using the color column for slices. Notice that the lag is applied within each unique slice of the dataframe.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
    {"value": 150, "color": "blue", "date": date(2024, 1, 2)},
    {"value": 75, "color": "blue", "date": date(2024, 1, 1)},
])
display(
    A.add_lag_columns(
        df,
        value_columns=["value"],
        date_column="date",
        slice_columns=["color"],
        step_unit="DAY",
        steps=1,
    )
)
| color | date | value | value_lag_1_day |
| red | 2024-01-02 | 100 | 50 |
| red | 2024-01-01 | 50 | null |
| blue | 2024-01-02 | 150 | 75 |
| blue | 2024-01-01 | 75 | null |
- etl_toolkit.A.add_percent_of_total_columns(df, value_columns, total_grouping_columns, suffix='percent')[source]#
Add additional column(s) to the dataframe that equal the percent each row’s
value_columns represent of the sum of the values across the total_grouping_columns. Each percent column added follows a standard naming convention of <value_column>_<suffix> (ex: “gmv” -> “gmv_percent”). The default
suffix is “percent”, but this can be adjusted.
Tip
It is recommended to use this function whenever a percent of total operation is needed. The implementation uses a group by + join under the hood, which is more performant than a window expression.
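For intuition, a rough sketch of the group by + join pattern referenced above, written in native pyspark; this is not the library’s actual implementation, and the column names are illustrative:
from etl_toolkit import F

# Sum the value within each group, then join the totals back and divide
totals = df.groupBy("color").agg(F.sum("value").alias("value_total"))
percented = (
    df.join(totals, on="color", how="left")
    .withColumn("value_percent", F.col("value") / F.col("value_total"))
    .drop("value_total")
)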
- Parameters:
df (pyspark.sql.DataFrame) – The dataframe to add percent of total columns to
value_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to generate percent of total columns for. Each column specified will have a percent of total column added to the output dataframe. If strings are provided, they will be resolved as Columns.
total_grouping_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings to define the slices of the dataframe. The sum of the value column for each unique combination of values in this list will be used as the denominator for the percentage column. If strings are provided, they will be resolved as Columns.
suffix (str) – The suffix that is added to each percent column generated for the output dataframe.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of generating percent of total columns based on the color column for groupings.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])
display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value"],
        total_grouping_columns=["color"],
    )
)
| color | date | value | value_percent |
| red | 2024-01-02 | 100 | 0.666666 |
| red | 2024-01-01 | 50 | 0.333333 |
Example of modifying the suffix argument to adjust the new column names.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 2)},
    {"value": 50, "color": "red", "date": date(2024, 1, 1)},
])
display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value"],
        total_grouping_columns=["color"],
        suffix="percent_of_total",
    )
)
| color | date | value | value_percent_of_total |
| red | 2024-01-02 | 100 | 0.666666 |
| red | 2024-01-01 | 50 | 0.333333 |
Example of generating multiple percent of total columns based on the date column for groupings.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1), "count": 5},
    {"value": 50, "color": "blue", "date": date(2024, 1, 1), "count": 10},
])
display(
    A.add_percent_of_total_columns(
        df,
        value_columns=["value", "count"],
        total_grouping_columns=["date"],
    )
)
| color | date | value | count | count_percent | value_percent |
| red | 2024-01-01 | 100 | 5 | 0.333333 | 0.666666 |
| blue | 2024-01-01 | 50 | 10 | 0.666666 | 0.333333 |
Card Analyses#
Suite of functions that perform common adjustments on dataframes derived from card datasets (Skywalker, Yoda, etc.), including lag adjustments, weighting, and paneling. It is recommended to use these functions rather than implementing the adjustments manually, as they are common enhancements that normalize the data trends typically found in these datasets.
- etl_toolkit.A.add_card_date_adjustment(df, date_column_name='date', group_column_names=None, adjustment_type='revenue', correct_jan_24=True, dataset='skywalker', source_country_list=['uk_de_at', 'fr'], amount_column='amount_usd')[source]#
Modifies the date_column_name specified for the input dataframe (df) to account for delays in processing transactions. For Skywalker data, this is based on the yipit_cobrand_id and card source. For Mando data, this is based on the source_country and card_type. The date adjustment is determined by the adjustment_type, which indicates either the count of transactions or total revenue for the transactions observed during each period. This adjustment is necessary given delays in processing transactions due to holidays or non-business days that can distort the actual date a transaction occurred.
The additional or modified columns added to the input df for this adjustment include:
<date_column_name>: This column will be modified to reflect the corrected date based on the expected lag. Date type.
<date_column_name>_raw: A new column added that indicates the original, non-adjusted date for the transaction. Date type.
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table from either Skywalker or Mando and parsed for specific merchant(s).
date_column_name (str) – The date column name of the df to adjust. This function will correct the dates of transactions within this column. Defaults to date.
group_column_names (Optional[list[str]]) – An optional list of grouping columns to generate lag adjustments within. This can be used if the input dataframe spans multiple slices where the date behavior can be different. Defaults to None, which indicates there are no slices to group by for this adjustment.
adjustment_type (Literal['revenue', 'count']) – Indicates whether the date adjustment should be calculated based on revenue or transaction count. Defaults to revenue.
correct_jan_24 (bool) – Optional flag to control if a subset of 2024-01-23 transactions should be assigned to 2024-01-22 via a deterministic sample. Defaults to True.
dataset (Literal['skywalker', 'mando']) – The source dataset that the input df is derived from. Can be either skywalker or mando. Defaults to skywalker.
source_country_list (List) – For Mando data only. List of countries to process. Must be one or both of uk_de_at and fr. Defaults to ["uk_de_at", "fr"].
amount_column (str) – For Mando data only. The name of the amount column to use for revenue adjustments. Defaults to amount_usd.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of applying the date adjustment to a Skywalker dataset. Note how the date column values are changed and a new date_raw column is added with the original values.#
from etl_toolkit import E, F, A

cmg_txns = spark.table(f'yd_production.cmg_silver.txns_paneled')
display(
    A.add_card_date_adjustment(cmg_txns)
)
| source | yipit_cobrand_id | date_raw | … | date |
| bank | 1 | 2019-02-19 | … | 2019-02-16 |
| bank | 1 | 2019-02-19 | … | 2019-02-16 |
Example of applying the date adjustment to a Mando dataset for specific countries.#
from etl_toolkit import E, F, A

eu_txns = spark.table(f'yd_production.eu_silver.txns_paneled')
display(
    A.add_card_date_adjustment(
        eu_txns,
        dataset="mando",
        source_country_list=["uk_de_at"],
        amount_column="amount_eur"
    )
)
| card_type | source_country | date_raw | … | date |
| DEBIT | uk_de_at | 2019-02-19 | … | 2019-02-16 |
| CREDIT | uk_de_at | 2019-02-19 | … | 2019-02-19 |
- etl_toolkit.A.add_card_day_of_week_lag_adjustment(df, threshold=84, dataset='yoda', transaction_type='debit', panel_ids=None, group_column_names=None, max_adjustment=5.0, use_central_adjustment=False, enable_paneling=False)[source]#
Add additional column(s) to an input dataframe, df, of card data that calculate adjustments to account for the merchant-specific lag in when transactions (txns) are processed. The lag addressed in this function is due to the day-of-week (“DOW”) a transaction falls on.
The additional columns added to the input df for this adjustment include:
txns: The number of transactions found within each grouping of the group_column_names, the dataset’s card type, the DOW of the transaction, and the number of days (lag) between the transaction and the max file date of the df. Int column type.
txns_percent_fill: Percentage of txns that have been observed by the above grouping, ordered by the lag calculation. Double column type.
lag_adjustment: Equal to the inverse of the txns_percent_fill, this is the adjustment factor that should be multiplied with the transaction amount to account for the DOW lag behavior. Defaults to 1 if no lag is observed. Double column type.
For further context, certain merchants may have lag patterns that are different from the lag pattern observed when looking at the whole card dataset. For these merchants, using this lag adjustment function allows you to calculate a more accurate lag adjustment.
For example, in Skywalker we have noticed transactions from merchants like Toast and Starbucks arrive faster than typical Skywalker transactions, which causes any central lag adjustment to consistently overestimate for these merchants.
This trend is not as prominent in Yoda; however, it is still preferred to use a merchant-specific lag adjustment.
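A hedged sketch of applying the resulting lag_adjustment to a transaction amount; the input dataframe, the amount column, and the adjusted column name are assumptions:
from etl_toolkit import A, F

adjusted_txns = A.add_card_day_of_week_lag_adjustment(txns_df, dataset="yoda")
adjusted_txns = adjusted_txns.withColumn(
    "adj_amount",  # hypothetical output column name
    F.col("amount") * F.col("lag_adjustment"),  # "amount" is an assumed input column
)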
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table from Skywalker or Yoda table and parsed for specific merchant(s).
threshold (int) – An integer indicating the number of days prior to the maximum file date to calculate the lag adjustment for each cobrand, card type, and any other grouping columns. This value must be between 63 and 126 days.
dataset (Literal['skywalker', 'yoda', 'mando']) – The source dataset that the input df is derived from. Can be either yoda or skywalker. Defaults to yoda.
transaction_type (Literal['debit', 'credit']) – The type of transactions that should be used to calculate the adjustment. Can be either credit or debit and is only applied when dataset="skywalker". Defaults to debit.
panel_ids (Optional[list[str]]) – A list of strings indicating the Panel ID(s) (panel table names) that this adjustment should account for. Defaults to panel fixed_201901_100 for Yoda and fixed_201901_111_ex for Skywalker.
group_column_names (Optional[list[str]]) – An optional list of grouping columns to generate lag adjustments within. This can be used if the input dataframe spans multiple slices where the lag behavior can be different. Defaults to ["cobrid"] for Yoda and ["yipit_cobrand_id"] for Skywalker.
max_adjustment (float) – An upper bound value to apply for the final lag_adjustment column. Defaults to 5.0, which indicates the adjustment value cannot be a factor greater than 5.
use_central_adjustment (bool) – Deprecated. An optional boolean to use the central lag adjustment value from the source dataframe. Using merchant-specific DOW adjustments is preferred. Defaults to False.
enable_paneling (bool)
- Return type:
pyspark.sql.DataFrame
Tip
One of the arguments for this function is threshold; a couple of notes on this:
Lag behavior can change over time. If your merchant’s lag behavior has changed in the last 126 days, you may want to limit the threshold to a lower number.
It may be beneficial to alter the threshold; however, if you do, you should test several different thresholds and QA the results, as in the sketch below.
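A hedged sketch of the kind of threshold comparison the tip describes; the candidate values and the summary statistics shown are illustrative:
from etl_toolkit import A, F

tost_txns = spark.table("yd_production.tost_silver.txns_paneled")
for threshold in [63, 84, 126]:  # candidate values within the allowed 63-126 range
    adjusted = A.add_card_day_of_week_lag_adjustment(tost_txns, dataset="yoda", threshold=threshold)
    # Compare how the adjustment factors are distributed under each threshold
    display(adjusted.select(F.min("lag_adjustment"), F.avg("lag_adjustment"), F.max("lag_adjustment")))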
Examples#
Example of applying day of week adjustments to a Yoda dataset.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_paneled")
display(
    A.add_card_day_of_week_lag_adjustment(
        tost_txns,
        dataset="yoda",
    )
)
| cobrid | cardtype | trans_dow | … | txns | txns_percent_fill | lag_adjustment |
| 175 | DEBIT | 4 | … | 1154536 | 0.890645002553501 | 1.1227818009790382 |
| 174 | DEBIT | 5 | … | null | null | 1 |
Example of applying day of week adjustments to a Yoda dataset while setting a max_adjustment. Note how it establishes a ceiling for the lag_adjustment column.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_paneled")
display(
    A.add_card_day_of_week_lag_adjustment(
        tost_txns,
        dataset="yoda",
        max_adjustment=1.1,
    )
)
| cobrid | cardtype | trans_dow | … | txns | txns_percent_fill | lag_adjustment |
| 175 | DEBIT | 4 | … | 1154536 | 0.890645002553501 | 1.1 |
| 174 | DEBIT | 5 | … | null | null | 1 |
- etl_toolkit.A.add_card_paneling(df, dataset='yoda', panel_ids=None, add_geo_weights=True, add_income_weights=True, add_card_type_weights=True, qa=False)[source]#
QA Mode support
Applies card paneling on the input dataframe (df) using the specified panel_ids (panel table names) and dataset (Skywalker, Yoda, or Mando). Transactions where the user is not a member of the panel will be filtered out unless qa=True. In addition, various weight columns will be added based on the panel to account for geography, income, and card type biases in the dataset. Currently only one panel ID can be provided.
Note
For Mando datasets, only basic user paneling is supported. Income weights, card type weights, and geography weights are not yet supported for Mando data.
A fully adjusted transaction amount should multiply the geography, income, and card type weights, plus any other adjustments (ex: lag_adjustment), with the transaction amount, as sketched after this list.
The additional columns added to the input df for this adjustment include:
is_in_panel: This column indicates if the transaction’s user is a member of any of the Panel ID(s) specified. Boolean type.
geo_weight: This column includes the geo weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.
income_weight: This column includes the income weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.
card_type_weight: This column includes the card type weight value for a given transaction given its panelist and panel ID. Double type. Only available for Skywalker and Yoda datasets.
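A hedged sketch of combining the weight columns with a lag adjustment into a fully adjusted amount; the amount column name, the adjusted column name, and the chaining order are assumptions:
from etl_toolkit import A, F

txns = spark.table("yd_production.tost_silver.txns_parsed")
txns = A.add_card_paneling(txns, dataset="yoda")
txns = A.add_card_day_of_week_lag_adjustment(txns, dataset="yoda")
txns = txns.withColumn(
    "adj_amount",  # hypothetical output column name
    F.col("amount")  # "amount" is an assumed input column
    * F.col("geo_weight")
    * F.col("income_weight")
    * F.col("card_type_weight")
    * F.col("lag_adjustment"),
)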
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe to add adjustment columns to through this function. It should be generated from the txns_all table from Skywalker, Yoda or Mando table and parsed for specific merchant(s).
dataset (Literal['skywalker', 'yoda', 'mando']) – The source dataset that the input df is derived from. Can be either yoda, skywalker, or mando. Defaults to yoda.
panel_ids (Optional[list[str]]) – A list of strings indicating the Panel ID(s) (panel table names) that this adjustment should account for. Defaults to panel fixed_201901_100 for Yoda, fixed_201901_111_ex for Skywalker, and appropriate geographic panels for Mando.
add_geo_weights (bool) – Boolean flag to control if a geo_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.
add_income_weights (bool) – Boolean flag to control if an income_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.
add_card_type_weights (bool) – Boolean flag to control if a card_type_weight column should be added to the dataset. Only applicable for Skywalker/Yoda. Defaults to True.
qa (bool) – Boolean flag to control QA mode for this function. When True, all rows are preserved and the is_in_panel column indicates if the transaction was in any of the Panel IDs specified. Defaults to False, where non-panel transactions are filtered out.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of applying card paneling to a Yoda dataset. Note the default Yoda panel is used here.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")
display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
    )
)
| cobrid | cardtype | trans_dow | … | is_in_panel | geo_weight | income_weight | card_type_weight |
| 175 | DEBIT | 4 | … | True | 0.6403014580204711 | 0.6190908069398377 | 2.9107223403194861 |
| 174 | DEBIT | 5 | … | True | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861 |
Example of applying card paneling to a Yoda dataset when QA mode is enabled. Note that out-of-panel transactions are included.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")
display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
        qa=True,
    )
)
| cobrid | cardtype | trans_dow | … | is_in_panel | geo_weight | income_weight | card_type_weight |
| 175 | DEBIT | 4 | … | True | 0.6403014580204711 | 0.6190908069398377 | 2.9107223403194861 |
| 174 | DEBIT | 5 | … | True | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861 |
| 174 | DEBIT | 5 | … | False | 0.4155116960839209 | 0.6190908069398377 | 2.9107223403194861 |
Example of applying card paneling to a Yoda dataset with a specific Panel ID and not including card type weights.#
from etl_toolkit import E, F, A

tost_txns = spark.table("yd_production.tost_silver.txns_parsed")
display(
    A.add_card_paneling(
        tost_txns,
        dataset="yoda",
        panel_ids=["fixed_201901_333_cbs_green_red_teal"],
        add_card_type_weights=False,
    )
)
| cobrid | cardtype | trans_dow | … | is_in_panel | geo_weight | income_weight |
| 175 | DEBIT | 4 | … | True | 0.6403014580204711 | 0.6190908069398377 |
| 174 | DEBIT | 5 | … | True | 0.4155116960839209 | 0.6190908069398377 |
Example of applying card paneling to a Mando dataset. Note how only the is_in_panel column is added.#
from etl_toolkit import E, F, A

mando_txns = spark.table("yd_3p_mando.mando_gold.txns_parsed")
display(
    A.add_card_paneling(
        mando_txns,
        dataset="mando",
    )
)
- etl_toolkit.A.add_card_paneling_reweighted(df, dataset='yoda', base_panel_id=None, weighting_panel_id=None, panel_overlap_start_date=date(2021, 1, 1), panel_overlap_end_date=date(2021, 12, 31), reweight_min=0.5, reweight_max=2.0)[source]#
Applies a base panel using A.add_card_paneling and then uses a weighted panel to generate weights for shifting merchant volumes by merchant. This combines two panels - a base panel for the main paneling and weights, and a weighting panel used to determine merchant reweighting factors.
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe of transactions data
dataset (Literal['skywalker', 'mando', 'yoda']) – The source dataset the df belongs to
base_panel_id (Optional[str]) – The base panel ID for the dataset. If not provided the default panel is used based on the dataset.
weighting_panel_id (Optional[str]) – The weighting panel ID for the dataset. If not provided the default weighting panel is used based on the dataset.
panel_overlap_start_date (datetime.date) – The minimum date the provided panel IDs overlap, used to generate a fixed window to determine reweighting
panel_overlap_end_date (datetime.date) – The maximum date the provided panel IDs overlap, used to generate a fixed window to determine reweighting
reweight_min (float) – Overall minimum value for the reweight column
reweight_max (float) – Overall maximum value for the reweight column
- Returns:
A dataframe of transactions for the given dataset, with paneling applied and a reweight column added to indicate the adjustment.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of applying card paneling reweighting with default panels#
from etl_toolkit import A

df = spark.table("yd_production.tost_silver.txns_parsed")
panel_df = A.add_card_paneling_reweighted(
    df,
    dataset="yoda",
)
Example of applying card paneling reweighting with custom panel IDs#
from etl_toolkit import A
from datetime import date

df = spark.table("yd_production.tost_silver.txns_parsed")
panel_df = A.add_card_paneling_reweighted(
    df,
    dataset="yoda",
    base_panel_id="fixed_201901_202203_100",
    weighting_panel_id="fixed_202101_202403_100",
    panel_overlap_start_date=date(2021, 1, 1),
    panel_overlap_end_date=date(2021, 12, 31)
)
- etl_toolkit.A.get_card_panels(dataset='yoda')[source]#
Utility function to list all available Panel IDs for a given dataset. The Panel IDs can be used in the panel_ids argument for ETL toolkit card analyses.
- Parameters:
dataset (Literal['skywalker', 'yoda']) – The dataset to retrieve available Panel IDs from. Can be either yoda or skywalker. Defaults to yoda.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of retrieving all Yoda Panel IDs#
from etl_toolkit import E, F, A

display(
    A.get_card_panels(
        dataset="yoda",
    )
)
| panel_id |
| fixed_201701_100 |
| fixed_201701_100_card_weights |
| … |
Example of retrieving all Skywalker Panel IDs#
from etl_toolkit import E, F, A

display(
    A.get_card_panels(
        dataset="skywalker",
    )
)
| panel_id |
| fixed_201701_333_ex |
| fixed_201701_333_ex_card_weights |
| … |
- etl_toolkit.A.source_card_transactions(dataset, start_date=None, end_date=None, transaction_types=None, group_column_names=None)[source]#
Returns a DataFrame of card transactions from the specified source dataset, with optional filtering for date ranges and transaction types. This function provides a standardized way to access card transaction data across different datasets while handling their unique schemas and storage patterns.
The function supports both single-table datasets (like Mando) where transaction types are distinguished by a column value, and multi-table datasets (like Skywalker/Yoda) where different transaction types are stored in separate tables.
Tip
When working with card transaction data, it’s recommended to use this function rather than directly accessing the source tables to ensure consistent filtering and proper handling of dataset-specific schemas.
- Parameters:
dataset (Literal['skywalker', 'mando', 'yoda']) – The source dataset to retrieve transactions from. Must be one of “skywalker”, “mando”, or “yoda”.
start_date (Optional[datetime.date]) – Optional start date to filter transactions. If provided, only includes transactions on or after this date.
end_date (Optional[datetime.date]) – Optional end date to filter transactions. If provided, only includes transactions on or before this date.
transaction_types (Optional[List[str | CardTransactionType]]) – Optional list of transaction types to include. Can use either string values or CardTransactionType enum. Valid values are “transactions”, “deposits”, or “all”. Defaults to [“transactions”] if not specified. If “all” is included in the list, other types are ignored.
group_column_names (Optional[List[str]]) – Optional list of grouping columns to use for this dataset, overriding defaults
- Returns:
A DataFrame containing the filtered card transactions from the specified dataset.
- Raises:
InvalidInputException – If any validation checks fail
- Return type:
pyspark.sql.DataFrame
Examples#
Basic usage to get recent Mando transactions#
from datetime import date
from etl_toolkit import A

# Get Mando transactions for January 2024
transactions_df = A.source_card_transactions(
    dataset="mando",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 1, 31)
)
Get both transactions and deposits from Skywalker#
from etl_toolkit import A
from etl_toolkit.analyses.card.source import CardTransactionType

# Get all Skywalker transaction types using enum
all_activity_df = A.source_card_transactions(
    dataset="skywalker",
    transaction_types=[
        CardTransactionType.TRANSACTIONS,
        CardTransactionType.DEPOSITS
    ]
)

# Alternative using string values
all_activity_df = A.source_card_transactions(
    dataset="skywalker",
    transaction_types=["transactions", "deposits"]
)
Get all transaction types using the ALL type#
from etl_toolkit import A
from etl_toolkit.analyses.card.source import CardTransactionType

# Get all transaction types from Mando
all_df = A.source_card_transactions(
    dataset="mando",
    transaction_types=[CardTransactionType.ALL]
)

# Alternative using string value
all_df = A.source_card_transactions(
    dataset="mando",
    transaction_types=["all"]
)
Get only deposits from a specific date range#
from datetime import date
from etl_toolkit import A

# Get Yoda deposits for Q1 2024
deposits_df = A.source_card_transactions(
    dataset="yoda",
    start_date=date(2024, 1, 1),
    end_date=date(2024, 3, 31),
    transaction_types=["deposits"]
)
- etl_toolkit.A.card_coverage_metrics(dataframe, metric_definitions, periodicity='QUARTER', time_period_grouping_columns=None, additional_grouping_columns=['ticker'], transaction_date_column='trans_date', time_period_ordering_column=None, normalize_by_days=False)[source]#
Calculates coverage ratios by comparing observed values to reported metrics across time periods. Coverage metrics help assess data completeness by measuring the difference between transactions captured in card data versus those reported in company filings.
The function adds coverage calculations and lag-based estimates:
- Basic coverage ratios (observed/reported); see the worked number below
- Period-over-period coverage changes
- Lag-adjusted metrics accounting for historical patterns
- Seasonal adjustments (for quarterly data)
- Recent trend indicators (for daily data)
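For intuition on the basic ratio, a made-up worked number (not from any real dataset): observed card revenue of 80M against reported revenue of 100M gives
coverage = observed / reported = 80,000,000 / 100,000,000 = 0.80 for that period, and the period-over-period change compares consecutive coverage values.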
- Parameters:
dataframe (pyspark.sql.DataFrame) – DataFrame containing transaction and reported metric data
metric_definitions (list[coverage_metric]) – List of coverage_metric objects defining metrics to calculate
periodicity (Literal['QUARTER', 'DAY']) – Analysis granularity, either “QUARTER” or “DAY”. Defaults to “QUARTER”
time_period_grouping_columns (Optional[list[Union[str, pyspark.sql.Column]]]) – Columns to group timestamps. Defaults to [“quarter”, “start_date”, “end_date”] for quarters, [“date”] for days
additional_grouping_columns (list[Union[str, pyspark.sql.Column]]) – Additional grouping dimensions (e.g. ticker). Defaults to [“ticker”]
transaction_date_column (Union[str, pyspark.sql.Column]) – Column containing transaction dates. Defaults to “trans_date”
time_period_ordering_column (Optional[Union[str, pyspark.sql.Column]]) – Column for ordering periods. Defaults to “start_date” or “date” based on periodicity
normalize_by_days (bool) – Whether to adjust for partial data periods. Defaults to False
- Return type:
pyspark.sql.DataFrame
Examples#
Calculate quarterly coverage metrics with base metrics#
from etl_toolkit import F, A

# Define metrics to analyze
metrics = [
    A.coverage_metric(
        name='revenue_total',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_total'
    )
]
quarterly_coverage = A.card_coverage_metrics(
    transactions_df,
    metric_definitions=metrics,
    periodicity='QUARTER'
)
display(quarterly_coverage)
Calculate daily coverage with normalization and channel splits#
from etl_toolkit import F, A

metrics = [
    A.coverage_metric(
        name='revenue_total',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_total'
    ),
    A.coverage_metric(
        name='revenue_online',
        observed_column_name='adj_trans_amount',
        reported_column_name='reported_online',
        filter=F.col('channel')=='ONLINE'
    )
]
daily_coverage = A.card_coverage_metrics(
    transactions_df,
    metric_definitions=metrics,
    periodicity='DAY',
    normalize_by_days=True
)
display(daily_coverage)
- etl_toolkit.A.coverage_metric()[source]#
Defines a metric for calculating coverage ratios between observed and reported values.
- Parameters:
name – Name of the metric (must be alphanumeric/lowercase/underscore)
observed_column_name – Column containing the observed values to measure
reported_column_name – Column containing the reported values to compare against
filter – Optional filter condition to apply before aggregation
Examples#
from etl_toolkit import F, A

# Basic metric definition
revenue_metric = A.coverage_metric(
    name='revenue_total',
    observed_column_name='adj_trans_amount',
    reported_column_name='reported_total'
)

# Metric with filter condition
online_revenue = A.coverage_metric(
    name='revenue_online',
    observed_column_name='adj_trans_amount',
    reported_column_name='reported_online',
    filter=F.col('channel')=='ONLINE'
)
E-Receipt Analyses#
- etl_toolkit.A.source_ereceipts(receipt_type, vendor_list=None, country=None, include_duplicates=False, granularity=None, add_app_id=False, add_demo_columns=False)[source]#
Fetch a DataFrame of Edison e-receipts with filtering and transformations. This function standardizes e-receipts access, ensuring consistency across modules.
Tip
Use this function instead of directly accessing source tables for consistent filtering and schema handling.
- Parameters:
receipt_type (Literal['clean_headers', 'purchase', 'flights', 'hotels', 'rental_cars', 'rides', 'subscriptions', 'walmart']) – Type of receipt data to retrieve. Supported types: clean_headers, purchase, subscriptions, rental_cars, hotels, flights, rides, walmart.
vendor_list (list[str]) – List of vendors to filter. Must be non-empty.
country (Optional[Literal['us', 'intl', 'ca', 'all', 'US', 'INTL', 'CA', 'ALL']]) – Optional. Valid values: ALL, US, CA, INTL. Default varies by type.
include_duplicates (Optional[bool]) – Include duplicate entries. Default: False. Some types disallow duplicates.
granularity (Literal['orders', 'items']) – Specify granularity level. Valid: items, orders. Default depends on receipt type.
add_app_id (Optional[bool]) – Include application ID. Default: False. Supported only for certain receipt types.
add_demo_columns (Optional[bool]) – Add demographic columns. Default: False.
- Returns:
Filtered e-receipts DataFrame.
- Raises:
InvalidInputException – On validation failure.
- Return type:
pyspark.sql.DataFrame
Examples#
Clean Headers. Retrieve receipts for specific vendors in the US:
from etl_toolkit import A

clean_headers_df = A.source_ereceipts(
    receipt_type="clean_headers",
    vendor_list=["Uber", "Walmart"],  # at least 1 vendor required
    country="ALL"  # defaults to ALL for clean headers
)
Including Duplicates. Allow duplicates (if supported):
from etl_toolkit import A

purchase_df = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["Walmart"],
    country="US",  # defaults to US for purchase
    include_duplicates=True  # defaults to False for all receipt types
)
Adjusting Granularity. Retrieve order-level data:
from etl_toolkit import A

subscriptions_df = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["UberEats"],
    granularity="orders"  # defaults to "items" for purchase
)
Adding Extra Columns. Include app IDs and user demographic columns:
from etl_toolkit import A

purchase_with_details = A.source_ereceipts(
    receipt_type="purchase",
    vendor_list=["Walmart"],
    add_app_id=True,
    add_demo_columns=True
)
Validation Rules#
Receipt Type-Specific Behavior#
| Receipt Type | country | duplicates | granularity | app ID | demo columns |
| clean_headers | ALL | No | N/A | No | Yes |
| purchase | US | Yes | items, orders | Yes | Yes |
| subscriptions | ALL | No | items, orders | Yes | Yes |
| rental_cars | ALL | Yes | orders | No | Yes |
| hotels | ALL | Yes | orders | No | Yes |
| flights | ALL | Yes | items | No | Yes |
| rides | ALL | No | items, orders | No | Yes |
| walmart | ALL | Yes | items, orders | No | Yes |
Dedupe Analyses#
Suite of functions to handle deduplication operations on dataframes. The returned output will be a dataframe with unique rows based on some condition. These functions can be easier to read/write than trying to handle deduping in native pyspark.
- etl_toolkit.analyses.dedupe.dedupe_by_condition(df, condition, qa=False)[source]#
QA Mode support
Function that applies a generic deduplication transformation where rows are excluded if the supplied condition does not equal the expected value. The expected value is 1, as a common use case is to dedupe based on a row number over a custom window expression.
If qa=True, then all rows will be preserved, and a dedupe_index column matching the condition will be added to the returned dataframe. This can be useful when investigating the behavior of deduplication for QA purposes.
- Parameters:
df (pyspark.sql.DataFrame) – DataFrame to deduplicate
condition (str | pyspark.sql.Column) – A pyspark Column or string that should return a 1 if the row is meant to be preserved. If a string is provided, it is resolved as a Column.
qa (bool) – Boolean flag to control QA mode for this function. When True, all rows are preserved and a dedupe_index column is added to the dataframe that stores the value of the condition. If False, rows with a condition value that does not equal 1 are dropped.
- Return type:
pyspark.sql.DataFrame
Examples#
Dedupe by taking the first value of rows for each color based on the date column. Note that we use a Window expression to define the condition.#
from etl_toolkit import E, F, A, W
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy("date")
display(
    A.dedupe_by_condition(
        df,
        condition=F.row_number().over(window),
    )
)
| color | date | value |
| blue | 2023-01-01 | 50 |
| red | 2023-01-01 | 1000 |
When using QA mode, the function will include all rows and a dedupe_index indicating the value of the condition.#
from etl_toolkit import E, F, A, W
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
window = W.partitionBy("color").orderBy("date")
display(
    A.dedupe_by_condition(
        df,
        condition=F.row_number().over(window),
        qa=True,
    )
)
| color | date | value | dedupe_index |
| blue | 2023-01-01 | 50 | 1 |
| red | 2023-01-01 | 1000 | 1 |
| red | 2024-01-01 | 100 | 2 |
- etl_toolkit.analyses.dedupe.dedupe_by_row_number(df, dedupe_columns, order_columns, qa=False)[source]#
QA Mode support
Function that dedupes rows from the supplied df by dropping rows whose row number != 1 after sorting. The row ordering is handled within each window partition of the dedupe_columns and then sorted in order of order_columns (i.e. the columns produce a window expression F.row_number().over(W.partitionBy(*dedupe_columns).orderBy(*order_columns))).
If qa=True, then all rows will be preserved, and a dedupe_index column indicating the row number will be added to the returned dataframe. This can be useful when investigating the behavior of deduplication for QA purposes.
Caution
The deduplication logic in this function uses F.row_number from pyspark. If a different ranking function is preferred, for ex. F.rank or F.dense_rank, use A.dedupe_by_condition with a custom window expression.
- Parameters:
df (pyspark.sql.DataFrame) – DataFrame to deduplicate
dedupe_columns (list[str | pyspark.sql.Column]) – A list of pyspark columns or strings that define the window partitions of the dataframe to sort the rows by. In most cases, these will be the columns that define a row’s uniqueness. If strings are provided, they will be resolved as Columns.
order_columns (list[str | pyspark.sql.Column]) – A list of pyspark columns or strings that sorts rows within each window partition of the dataframe. This sorting will determine the row_number. Be explicit in sorting ascending or descending when specifying these columns. If strings are passed they are resolved as Columns.
qa (bool) – Boolean flag to control QA mode for this function. When True, all rows are preserved and a dedupe_index column is added to the dataframe that stores the row_number. If False, rows with row numbers that do not equal 1 are dropped.
- Return type:
pyspark.sql.DataFrame
Examples#
Dedupe by taking the first value of rows for each color based on the date column#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=["date"],
    )
)
| color | date | value |
| blue | 2023-01-01 | 50 |
| red | 2023-01-01 | 1000 |
When using QA mode, all rows are included and a dedupe_index column is added that indicates the row number.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=["date"],
        qa=True,
    )
)
| color | date | value | dedupe_index |
| blue | 2023-01-01 | 50 | 1 |
| red | 2023-01-01 | 1000 | 1 |
| red | 2024-01-01 | 100 | 2 |
Dedupe by taking the last value of rows for each color based on the date column. Note that descending order is implemented via the F.desc function in pyspark.#
from etl_toolkit import E, F, A
from datetime import date

df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 1)},
    {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
    {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
])
display(
    A.dedupe_by_row_number(
        df,
        dedupe_columns=["color"],
        order_columns=[F.desc("date")],
    )
)
| color | date | value |
| blue | 2023-01-01 | 50 |
| red | 2024-01-01 | 100 |
Index Analyses#
Suite of functions to generate index metrics that can reveal the trajectory of a company KPI without directly benchmarking to it. These index columns can be tricky to calculate in native pyspark, so these functions help standardize how they are calculated.
- etl_toolkit.A.index_from_rolling_panel(input_df, start_date, date_column, metrics, slices=None, panel_periodicity='MONTH', index_periodicity='DAY', panel_end_date_column='panel_end_date', user_create_date_column='create_time', spark=None)[source]#
Returns a dataframe that maps the input dataframe to a series of indices for each metric provided. The indices are calculated through an aggregate function, a base value, and a rolling panel methodology, where the panel is fixed for 2 consecutive periods. The period length can be configured using panel_periodicity.
Indices can be broken down across various slices to visualize trends on key segments of the dataset.
Multiple metrics can be specified; these will be available as different columns in the output dataframe.
Note
This function is expected to work with Edison (e-receipt) derived dataframes. The rolling panel methodology is designed around the fluctuating behavior of the Edison user panel.
- Parameters:
input_df (pyspark.sql.DataFrame) – The input dataframe to use to generate a series of indices.
start_date (datetime.datetime | datetime.date) – The minimum start date of the input_df; all metrics will be filtered on or after this date.
date_column (pyspark.sql.Column | str) – A Column or string that indicates the date column of the input_df to use in paneling and index logic. If a string is used, it is referenced as a Column.
metrics (dict[str, dict]) – A dict of various metric configurations to generate indices. An index will be created for each metric defined in this dictionary. Each metric config must have an aggregation Column and a start_value defined for the index.
slices (list[str | pyspark.sql.Column]) – Optional list of strings or columns that define the slices within the dataframe. If slices are used, the indices generated will be for each slice within the input_df. If strings are used in the list, they are resolved as Columns.
panel_periodicity (Literal['WEEK', 'MONTH', 'QUARTER']) – The unit of length for the interval of each paneling window.
index_periodicity (Literal['DAY', 'WEEK', 'MONTH', 'QUARTER']) – The unit of length for the interval of each indexing window. A row will be generated for each period based on this argument.
panel_end_date_column (pyspark.sql.Column | str) – The Column to use when determining the exit date of a user from the panel in the rolling panel methodology.
user_create_date_column (pyspark.sql.Column | str) – The Column to use when determining the start date of a user in the panel in the rolling panel methodology.
spark (pyspark.sql.SparkSession) – Optional argument to pass in a spark session to this function. Normally this is not needed and a spark session is generated automatically. This is usually used by library developers.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of a daily index creation for Lyft. This creates two indices, rides and total_charges, based on different aggregations. The paneling is a rolling panel based on a weekly interval.#
from etl_toolkit import E, A, F
from datetime import date

source_df = spark.table(f"{DESTINATION_CATALOG}.{SILVER_DATABASE}.deduped_receipts")
index_df = A.index_from_rolling_panel(
    source_df,
    start_date=date(2017, 1, 2),
    date_column="adjusted_order_date",
    metrics={
        "rides": {
            "calculation": F.sum("geo_weight"),
            "start_value": 13.471719625795382,
        },
        "total_charges": {
            "calculation": F.sum("total_charges"),
            "start_value": 13.471719625795382,
        },
    },
    panel_periodicity="WEEK",
    index_periodicity="DAY",
)
Investor Standard Analyses#
Suite of functions to generate common analyses used for investor research. It is recommended to use these functions as they follow specific standards for research reports and integrate with standard visualizations for charts to simplify publishing workflows.
- etl_toolkit.A.revenue_mix_by_income_bucket(df, revenue_column, date_column, income_bucket_column='yd_income_bucket1', income_weight_column=None, apply_income_weighting=False, periodicity='MONTH', trailing_period=None, interval_period=None, include_growth_rates=False, start_date=None, start_day_of_week='SUNDAY', end_day_of_week=None, calendar=None, spark=None)[source]#
Returns a dataframe of revenue total and percent of total calculated by income_bucket_column and the specified periodicity. Can optionally include trailing period calculations for revenue by income bucket and/or 1-year growth rates for revenue metrics. The input data can also be weighted for income if that was not already applied.
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe that should be used to generate revenue calculations. The dataframe must include the remaining column arguments for this function.
revenue_column (str | pyspark.sql.Column) – Numeric type column on the df to use to calculate revenue totals by income bucket.
date_column (str | pyspark.sql.Column) – Date type column on the df to use to aggregate revenue by the relevant periodicity.
income_bucket_column (str | pyspark.sql.Column) – Column on the df that indicates the income bucket for the given row. Revenue mix will be based on grouping by this column.
income_weight_column (Optional[str | pyspark.sql.Column]) – Optional column on the df that indicates the income weighting for a panelized dataset. If specified, the income weight will be used to adjust the revenue_column before aggregating by period and income bucket.
apply_income_weighting (bool) – When True, the income_weight_column is used to adjust the revenue_column before aggregating by period and income bucket. When False, the revenue_column is directly aggregated by period and income bucket. Default is False.
periodicity (Literal['WEEK', 'MONTH', 'QUARTER']) – The time resolution to group revenue data on. When specified, the date_column will be rounded to the nearest matching period to aggregate revenue on that period + income bucket.
trailing_period (Optional[int]) – Optionally specified as an integer. When specified, additional revenue metrics will be added to the output dataframe that equal the trailing n-period average revenue by period and income bucket. The additional columns will be prefixed by the trailing period used, ex: t3m_revenue for trailing 3-month aggregations.
interval_period (Optional[int]) – Optionally specified as an integer. When specified, the first time period and every n-th time period after will be included in the output dataframe. This can be used to display data more effectively for trailing periods, where every nth month is desired in the output.
start_date (Optional[datetime.date | datetime.datetime]) – Optional start date to filter the date_column on before aggregating revenue. The start_date should be consistent with the start_day_of_week.
start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When periodicity='WEEK', this parameter ensures periods are aggregated on 7-day intervals that start on this specified day of the week. This parameter should not be specified if end_day_of_week is specified, as it is duplicative.
end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When periodicity='WEEK', this parameter ensures periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified if start_day_of_week is specified, as it is duplicative.
spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.
include_growth_rates (bool)
- Return type:
pyspark.sql.DataFrame
Examples#
Generate revenue mix by week and income bucket. Notice how the time_period column indicates the end of the week.#
from etl_toolkit import E, A, F
from datetime import date

# cmg_df would be the yoda transactions for CMG
df = A.revenue_mix_by_income_bucket(
    cmg_df,
    revenue_column="rev",
    date_column="date",
    start_date=date(2021, 1, 3),
    periodicity="WEEK",
    start_day_of_week="SUNDAY",
    income_bucket_column="yd_income_bucket2",
)
display(df)
| time_period | yd_income_bucket | revenue | income_bucket_revenue_percent |
| 2021-01-09 | High ($150k+) | 22127.71 | 0.2071751921632703 |
| 2021-01-09 | Low ($0-60k) | 38568.29 | 0.3611035438424529 |
| 2021-01-09 | Lower-Middle ($60-100k) | 25451.31 | 0.2382930843839744 |
| 2021-01-09 | Upper-Middle ($100-150k) | 20659.43 | 0.1934281796103024 |
| 2022-01-16 | High ($150k+) | 22187.53 | 0.2092824865506127 |
| 2022-01-16 | Low ($0-60k) | 38352.40 | 0.3617565842338732 |
| 2022-01-16 | Lower-Middle ($60-100k) | 24908.72 | 0.2349499211492873 |
| 2022-01-16 | Upper-Middle ($100-150k) | 20568.49 | 0.1940110080662268 |
Aggregate by monthly periods and income bucket, while also including trailing 3-month metrics. Trailing month metrics will be null for earlier periods where not enough periods have elapsed to make the calculation. This also includes weighting the revenue data by income weight by specifying the income_weight_column argument.#
from etl_toolkit import E, A, F
from datetime import date

# cmg_df would be the yoda transactions for CMG
df = A.revenue_mix_by_income_bucket(
    cmg_df,
    revenue_column="rev",
    date_column="date",
    start_date=date(2021, 1, 1),
    periodicity="MONTH",
    income_bucket_column="yd_income_bucket2",
    income_weight_column="income_weight",
    apply_income_weighting=True,
    trailing_period=3,
)
display(df)
| time_period | yd_income_bucket | revenue | income_bucket_revenue_percent | trailing_revenue | trailing_income_bucket_revenue_percent |
| 2021-01-01 | High ($150k+) | 95083.00 | 0.2085046527102458 | null | null |
| 2021-01-01 | Low ($0-60k) | 163845.34 | 0.3592915276434441 | null | null |
| 2021-01-01 | Lower-Middle ($60-100k) | 108469.64 | 0.2378598260263688 | null | null |
| 2021-01-01 | Upper-Middle ($100-150k) | 88625.41 | 0.1943439936199410 | null | null |
| 2021-02-01 | High ($150k+) | 83032.50 | 0.2146970928471732 | null | null |
| 2021-02-01 | Low ($0-60k) | 136581.27 | 0.3531580923512764 | null | null |
| 2021-02-01 | Lower-Middle ($60-100k) | 92097.17 | 0.2381355876897015 | null | null |
| 2021-02-01 | Upper-Middle ($100-150k) | 75031.62 | 0.1940092271118489 | null | null |
| 2021-03-01 | High ($150k+) | 104205.85 | 0.2085999518166622 | 94107.12 | 0.2103242607554216 |
| 2021-03-01 | Low ($0-60k) | 180829.90 | 0.3619864599887388 | 160418.84 | 0.3585273177006854 |
| 2021-03-01 | Lower-Middle ($60-100k) | 118999.91 | 0.2382147830825141 | 106522.24 | 0.23807137636607104 |
| 2021-03-01 | Upper-Middle ($100-150k) | 95513.13 | 0.1911988051120848 | 86390.05 | 0.19307704517782195 |
Investor Reporting Analyses#
Suite of functions specific to the investor reporting process. It is recommended to use these functions as they follow specific standards for customer-facing assets and reduce time spent on otherwise manual processes.
- etl_toolkit.A.earnings_results_from_gsheet(gsheet_file_id, sheet_id, additional_columns=None, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#
Returns a dataframe of Earnings Results data retrieved from the specified Gsheet. The dataframe follows the standard Earnings Results schema for Investor reporting.
Warning
When using this function, the Gsheet will need to be permissioned correctly to be read from databricks and must have the correct sheet columns and values to be compatible with the standard schema.
- Parameters:
gsheet_file_id (str) – The Gsheet file ID that contains earnings results. Can be found in the Gsheet URL.
sheet_id (int) – The Gsheet sheet (tab) ID that contains earnings results. Can be found in the Gsheet URL.
additional_columns (Optional[list[str | pyspark.sql.Column]]) – Additional columns to add to the end of the Earnings results schema. Not typically used, and the default is for no additional columns to be added.
calculate_va_yy_growth (bool) – Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.
yy_growth_tickers (list[str]) – List of tickers that, when calculate_va_yy_growth=True, will use the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.
- Return type:
pyspark.sql.DataFrame
Examples#
Generating an Earnings Results dataframe#
from etl_toolkit import E, A, F

EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_SHEET_ID = 1232299054

df = A.earnings_results_from_gsheet(EARNINGS_FILE_ID, EARNINGS_SHEET_ID)
display(df)
| quarter_start | quarter_end | … | last_updated |
| 2024-01-01 | 2024-03-31 | … | 2024-09-18 |
- etl_toolkit.A.earnings_rankings_from_gsheet(gsheet_file_id, sheet_id)[source]#
Returns a dataframe of Earnings Rankings data retrieved from the specified Gsheet. The dataframe follows the standard Earnings Rankings schema for Investor reporting.
In addition, it will automatically add a va_consensus_value based on the values of the va_metric_id and quarter_end columns of the input dataframe.
Warning
When using this function, the Gsheet will need to be permissioned correctly to be read from databricks and must have the correct sheet columns and values to be compatible with the standard schema.
- Parameters:
gsheet_file_id (str) – The Gsheet file ID that contains the narrative rankings data. Can be found in the Gsheet URL.
sheet_id (int) – The Gsheet sheet (tab) ID that contains the narrative rankings data. Can be found in the Gsheet URL.
- Return type:
pyspark.sql.DataFrame
Examples#
Generating an Earnings Rankings dataframe#
from etl_toolkit import E, A, F

EARNINGS_RANKINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8"
EARNINGS_RANKINGS_SHEET_ID = 2006888117

df = A.earnings_rankings_from_gsheet(EARNINGS_RANKINGS_FILE_ID, EARNINGS_RANKINGS_SHEET_ID)
display(df)
| quarter_start | quarter_end | … | narrative_ranking |
| 2024-01-01 | 2024-03-31 | … | Green |
- etl_toolkit.A.earnings_results_with_backtests(df, backtest_configurations=None, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#
Returns a dataframe combining a summary of earnings results with backtests for KPIs. The dataframe follows the standard Backtest + Earnings Results schema for Investor reporting.
- Parameters:
df (pyspark.sql.DataFrame) – The Earnings Results Dataframe that the backtests correspond to w.r.t. Product, ticker, and KPI names. It is recommended that this is a dataframe of the table created from the
A.earnings_results_from_gsheet.backtest_configurations (Optional[list[BackTestConfiguration]]) – An optional list of backtest configurations that will populate the output dataframe. The configurations should be created via
A.backtest_configuration. While typically backtests are specified, if no backtests are provided, the output table will pass through the earnings results dataframe with the standard schema for backtests.calculate_va_yy_growth (bool)
yy_growth_tickers (list[str])
- Return type:
pyspark.sql.DataFrame
Examples#
Generating an Earnings Results with Backtests dataframe. Note that this also uses theA.backtest_configurationfunction.#from etl_toolkit import A, E, F # Earnings results from Gsheets EARNINGS_FILE_ID = "1w6cXoj6TaAt5WA8gm29DvYg4PNkiwyT4A219g6JS2p8" EARNINGS_SHEET_ID = 1232299054 # Define backtests to include in output Dataframe BACKTESTS = [ A.backtest_configuration( df=sample_backtest_df, primary_dataset="skywalker", secondary_dataset=None, panel_used="202101_100", methodology_notes="Uses geo weights and card type weights, yoy coverage change", metric_type="backtest", ), A.backtest_configuration( df=sample_backtest_df, primary_dataset="yoda", secondary_dataset=None, panel_used="202101_100", methodology_notes="Uses geo weights and card type weights, yoy coverage change", metric_type="qa", ), ] df = A.earnings_results_with_backtests( spark.table("yd_production.tgt_gold.earnings_results"), BACKTESTS ) display(df)
quarter_start | quarter_end | … | metric_type
2024-01-01 | 2024-03-31 | … | published
2023-10-01 | 2023-12-31 | … | backtest
2023-10-01 | 2023-12-31 | … | qa
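As noted above, backtest configurations are optional. A minimal sketch of the pass-through behavior when none are supplied, reusing the table from the example above:
from etl_toolkit import A

# With no backtest configurations, the earnings results are passed through
# in the standard backtest schema.
df = A.earnings_results_with_backtests(
    spark.table("yd_production.tgt_gold.earnings_results")
)
display(df)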
- etl_toolkit.A.backtest_configuration(df, primary_dataset, panel_used, methodology_notes, secondary_dataset=None, metric_type='backtest')[source]#
Use this function to define backtest configurations that are used in the investor reporting process. A list of
backtest_configurationscan then be added to earnings results data usingA.earnings_results_with_backtests.Note
This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to
A.earnings_results_with_backtests.- Parameters:
df (
DataFrame) – A backtest dataframe that contains the results of the back test for each prior publishing quarter. This can be obtained by filtering a quarterly_accuracy formetric_type="backtest"andstatus="active".primary_dataset (
str) – The primary dataset used for this KPI backtest. For ex:skywalkeroryoda.panel_used (
str) – The corresponding Panel ID for the datasets used in this KPI backtest. For ex: 202101_100.methodology_notes (
str) – Any relevant information about the methodology for this backtest, examples include adjustments, coverage changes, weights, etc.secondary_dataset (
Optional[str], default:None) – An optional, secondary dataset used for this KPI backtest. For ex:skywalkeroryoda. Can leaveNoneif not used. Default isNone.metric_type (
Literal['backtest','qa'], default:'backtest') – The type of KPI Backtest, can bebacktestorqa. Default isbacktest.
- Return type:
None
Examples#
Create a list of backtest_configurations for use in A.earnings_results_with_backtests.#from etl_toolkit import A, E, F sample_backtest_df = ( spark.table("yd_production.tgt_analysts.tgt_quarterly_accuracy") .where(F.col("metric_type") == 'backtest') .where(F.col("status") == 'active') ) backtests = [ A.backtest_configuration( df=sample_backtest_df, primary_dataset="skywalker", secondary_dataset=None, panel_used="202101_100", methodology_notes="Uses geo weights and card type weights, yoy coverage change", metric_type="backtest", ), ]
- etl_toolkit.A.add_unified_consensus_column(df, calculate_va_yy_growth=False, yy_growth_tickers=None)[source]#
Helper function to add
va_consensus_valuecolumn to the input dataframe that represents the quarterly VA estimate and is used in reporting processes.The column is introduced via a join that resolves differences in how VA reports quarterly estimates. The join logic handles the following adjustments:
- Differing quarter ends between the company’s actual fiscal periods and what VA provides
- Normalization of growth rates represented as a percentage between 0 and 100
- Growth rate calculations when the reported metric is a growth rate while VA publishes nominal values
- Parameters:
df (
DataFrame) – Input DataFrame of quarterly earnings data. Must have ava_metric_idcolumn and aquarter_endcolumn to perform the join.calculate_va_yy_growth (
bool, default:False) – Flag to control whether Y/Y growth rates should be calculated from VA nominal values. Used if the metric being reported is a growth rate while VA publishes actuals.yy_growth_tickers (
list[str], default:None) – List of tickers that, whencalculate_va_yy_growth=True, will use the calculated Y/Y growth value from VA nominals. All other tickers will use the VA value as-is.
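Examples#
A minimal sketch of adding the consensus column to a quarterly earnings dataframe. The table name is reused from the earlier examples and is assumed to contain the required va_metric_id and quarter_end columns:
from etl_toolkit import A

quarterly_df = spark.table("yd_production.tgt_gold.earnings_results")

# Join in the quarterly VA estimate as a va_consensus_value column.
df = A.add_unified_consensus_column(quarterly_df)
display(df)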
Ordering Analyses#
Suite of functions to handle re-arranging dataframe columns and rows for common scenarios. These are helpful to use as they are optimized for performance and/or avoid duplicating columns.
- etl_toolkit.A.shift_columns(df, priority_columns)[source]#
Utility transformation that shifts any specified columns of the dataframe to the beginning of the schema. This can be useful for performance optimization (ex: improving join performance on key columns and clustering).
The re-ordered columns can include new columns that don’t already exist on the dataframe. These will be added in addition to any existing columns in the order of the
priority_columnslist.- Parameters:
df (pyspark.sql.DataFrame) – The dataframe to reorder columns on
priority_columns (list[str | pyspark.sql.Column]) – A list of Column(s) or string(s) to shift to the beginning of the schema. If strings are provided, they will be resolved as Columns.
- Return type:
pyspark.sql.DataFrame
Examples#
Re-order columns by specifying which columns should come at the beginning of the schema. All remaining columns will preserve the original order of the dataframe.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"color": "red", "value": 1000, "date": date(2023, 1, 1)}, {"color": "blue", "value": 50, "date": date(2023, 1, 1)}, ]) display( A.shift_columns( df, priority_columns=["date"], ) )
date | color | value
2023-01-01 | red | 1000
2023-01-01 | blue | 50
New derived columns can be introduced in thepriority_columnslist. These must be derived using other columns of the provided dataframe.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"color": "red", "value": 1000, "date": date(2023, 1, 1)}, {"color": "blue", "value": 50, "date": date(2023, 1, 1)}, ]) display( A.shift_columns( df, priority_columns=["date", F.upper("color").alias("label")], ) )
date | label | color | value
2023-01-01 | RED | red | 1000
2023-01-01 | BLUE | blue | 50
Parser Analyses#
Suite of functions to filter and extract important values from string columns to enrich dataframes. These are powerful functions to simplify and organize the complex regex or conditional logic that usually forms the critical first steps of product pipelines to filter out unrelated data.
- etl_toolkit.A.parse_records(df, configs, qa=False, include_non_matches=False)[source]#
QA Mode support
Takes the input
dfand returns an output dataframe filtered by the provided configs, as each config is mapped to aparser_idcolumn that is added to the output dataframe. Rows are kept if:they match the inclusion cases of any parser
they do not match the exclusion case of the corresponding matched parser.
If
qa=True, then theexclude_anyargument for each parser will not drop records. Instead, those records will be included, with aparser_exclusion_casecolumn added to flag which exclusion criteria matched each record.Additionally, if
qa=True, then theinclude_anyargument for each parser will be used to populate theparser_inclusion_casecolumn added to the output df. However, this is in addition to theinclude_anyoption filtering records.If
include_non_matches=True, the parser will not filter out any records that were not able to match.If
qa=Trueandinclude_non_matches=True, the function allows multiple parser matches per record. In this case:The
parser_idandtagcolumns become arrays, potentially containing multiple parser IDs.The
parser_inclusion_caseandparser_exclusion_casecolumns become arrays of structs, each containing theparser_idand the corresponding inclusion or exclusion case.
- Parameters:
df (pyspark.sql.DataFrame) – Input dataframe that is filtered and enriched by the specified parser
configs.configs (list[parser]) – List of
A.parserconfigs to use in parsing thedf. Parsers are evaluated in order and the first matching parser is used for a given record.qa (bool) – Boolean flag to control QA mode for this function. When
True, then all rows are preserved andparser_inclusion_case+parser_exclusion_casecolumns are added to the dataframe that indicate the specific case that caused the parser to match. IfFalse, any rows that don’t match any of the inclusion cases or match any of the exclusion cases are removed.include_non_matches (bool) – Boolean flag to control if unmatched records are excluded from the output dataframe. Unmatched records are included when this flag is
True.
- Return type:
pyspark.sql.DataFrame
Examples#
Example of usingA.parse_recordsto filter data based on a list ofA.parsersand their conditions. Notice how only matched records are included and theparser_idcolumn indicates which parser was matched.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("color") == "red", ], ) ] ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
Example of using A.parse_records to filter data on both inclusion and exclusion criteria.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("date") >= date(2024, 1, 1), ], exclude_any=[ F.col("color") == "blue" ] ) ] ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
Example of how multiple parsers can be used. Notice how if a record satisfies any of the included parsers it will be included, and the associatedparser_idis present in the output row.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("color") == "red" ], ), A.parser( parser_id="blue_parser", include_any=[ F.col("color") == "blue" ], ), ], ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
blue | 2024-01-02 | 50 | blue_parser
Example of how QA mode works. Notice how the inclusion and exclusion cases apply to each matchedparser_id. The values of these additional columns match the positional index ofinclude_anyorexclude_anyarguments for the matching parser if they are lists. If dicts are used for these arguments, then the matching key will be used as the value. These columns pinpoint the logic responsible for a record being included/excluded from the dataframe.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("date") >= date(2024, 1, 1), ], exclude_any=[ F.col("color") == "blue" ] ) ], qa=True, ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
Example of how the include_non_matches flag works. Notice how unmatched records are preserved in the dataframe with aparser_idcolumn value of NULL.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("date") >= date(2024, 1, 1), ], exclude_any=[ F.col("color") == "blue" ] ) ], include_non_matches=True, ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
blue | 2024-01-02 | 50 | null
Example of how the QA mode works with include_non_matches. Notice how records can match multiple parsers and show multiple inclusion/exclusion cases.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"id": 1, "shape": "square", "color": "red"}, {"id": 2, "shape": "square", "color": "blue"}, {"id": 3, "shape": "triangle", "color": "blue"}, {"id": 4, "shape": "circle", "color": "red"}, ]) result_df = A.parse_records( df, configs=[ A.parser( parser_id="red_square_parser", include_any=[F.col("shape") == "square"], exclude_any=[F.col("color").isin(["blue", "green"])], ), A.parser( parser_id="blue_parser", include_any=[F.col("color") == "blue"], ), ], qa=True, include_non_matches=True, ) display(result_df)
In this example:
- The square red shape (id=1) matches only the “red_square_parser” and is listed as primary match since it’s the only match.
- The square blue shape (id=2) initially matches both parsers but after exclusions only matches “blue_parser” (shown in matched_parser_ids). The parser_id shows all initial matches before exclusions.
- The triangle blue shape (id=3) matches only the “blue_parser” and thus has it as primary_match.
- The circle red shape (id=4) doesn’t match any parser, but is included due to include_non_matches=True.
The columns:
- parser_id: Array showing all initial parser matches before exclusions
- matched_parser_ids: Array showing final parser matches after applying exclusions
- primary_match: Single parser ID when there’s exactly one match after exclusions, otherwise null
- parser_inclusion_case and parser_exclusion_case: Arrays of structs containing the parser_id and corresponding inclusion or exclusion case that matched
- etl_toolkit.A.parser(parser_id, include_any=<factory>, exclude_any=<factory>, with_columns=<factory>, metadata=<factory>)[source]#
Use this function to define parsing configurations that are applied in the
A.parse_recordsfunction. These parsers can control inclusion and exclusion filtering logic viainclude_anyandexclude_anyand enrichment columns throughwith_columns.Tip
Encapsulating logic in parsers like this makes transformation logic modular and more easily understood. For example, if covering multiple companies, each company’s filtering logic can be defined as a distinct parser.
Note
This function should not be used on its own. It’s not a standard analysis function that returns a dataframe. Instead, it defines a configuration that can be supplied to
A.parse_records.- Parameters:
parser_id (
str) – Unique ID of the parser object that is used in theparser_idcolumn included in theA.parse_recordsfunction. This ID must be unique within the list of configurations used inA.parse_recordsinclude_any (
list[Column] |dict[str,Column], default:<factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluate to True for a record, the parser is considered a match (assumingexclude_anyalso does not have a match). If a dict is provided instead of a list, then key/value pairs are supplied where the key is a unique ID for the condition and the value is the boolean expression.exclude_any (
list[Column] |dict[str,Column], default:<factory>) – A list or dict of boolean Column expressions. If any of these column expressions evaluate to True for a record, the record is not included in the output ofA.parse_records. If a dict is provided instead of a list, then key/value pairs are supplied where the key is a unique ID for the condition and the value is the boolean expression.with_columns (
dict[str,str|Column], default:<factory>) – A dict of key/value pairs where the key is a column name to add and the value is a Column expression. These columns are added to a record in the output ofA.parse_recordsshould the parser match.metadata (
dict[str,str|int|date|datetime|bool|float], default:<factory>) – Generally not used outside of library developers. These dicts can include arbitrary key/value information about the parser. These values are not added to the outputs ofA.parse_records
- Return type:
None
Examples#
Simple parser example withA.parse_records.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("color") == "red" ], ), A.parser( parser_id="blue_parser", include_any=[ F.col("color") == "blue" ], ), ], ) )
color | date | value | parser_id
red | 2024-01-01 | 100 | red_parser
blue | 2024-01-02 | 50 | blue_parser
Parser example withA.parse_recordswhenwith_columnsis used. Notice how the columns are added to the output dataframe. Notice that if the set of keys of columns to add is not consistent between parsers, the function will resolve the missing columns as nulls.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any=[ F.col("color") == "red" ], with_columns={ "enriched_red": F.lit(True), "enriched_red_2": F.lit(1), }, ), A.parser( parser_id="blue_parser", include_any=[ F.col("color") == "blue" ], with_columns={ "enriched_blue": F.lit(True), "enriched_blue_2": F.lit(1), }, ), ], ) )
color | date | value | parser_id | enriched_red | enriched_red_2 | enriched_blue | enriched_blue_2
red | 2024-01-01 | 100 | red_parser | true | 1 | null | null
blue | 2024-01-02 | 50 | blue_parser | null | null | true | 1
Parser example withinclude_anyandexclude_anyexpressed as dicts. The keys are present in the output dataframe when QA mode is used forA.parse_recordsand the condition is satisfied for the record.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 2)}, ]) display( A.parse_records( df, configs=[ A.parser( parser_id="red_parser", include_any={ "color_match": F.col("color") == "red", }, ), A.parser( parser_id="blue_parser", include_any={ "color_match": F.col("color") == "blue", }, exclude_any={ "date_stale": F.col("date") < date(2024, 1, 3), }, ), ], qa=True, ) )
color | date | value | parser_id | parser_exclusion_case | parser_inclusion_case
red | 2024-01-01 | 100 | red_parser | null | color_match
blue | 2024-01-02 | 50 | blue_parser | date_stale | color_match
Scalar Analyses#
Suite of functions to generate scalars (i.e. python literal values) for common pyspark operations. These can be useful to retrieve values into python from dataframes and re-use them in pipeline code.
Note
Unlike other analyses functions in the toolkit, these scalar functions return python literal values (ex: int, float, etc.) instead of dataframes.
- etl_toolkit.A.get_aggregates(df, value_column, aggregate_functions)[source]#
Returns a dictionary of aggregate values based on the
value_column,df, andaggregate_functionsspecified. This is used to extract the min, max, etc. values from a pyspark dataframe into python for use in subsequent code. The output is a dictionary with key/value pairs for each aggregate calculation specified.Tip
Certain pipeline logic can benefit from calculating these simple aggregates in a dedicated step and then referencing them in subsequent transformations as literals - this simplifies the spark query plan by cutting down on window expressions and/or joins. A common example of this is calculating the percentage of a column across the total for the entire dataset. This would otherwise be an expensive window operation without the use of this function.
- Parameters:
df (pyspark.sql.DataFrame) – Dataframe used to calculate aggregate values
value_column (str | pyspark.sql.Column) – Column or string to use when calculating aggregate values. If a string is provided, it is referenced as a Column.
aggregate_functions (list[Literal['min', 'max', 'count', 'avg', 'sum', 'count_distinct']]) – A list of aggregate function names. For each function name, a separate aggregate calculation will be returned in the output dict.
- Return type:
dict[str, Any]
Examples#
Example of generating aggregate values. Notice the output is a dictionary with keys representing the aggregate functions specified.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 1)}, ]) print( A.get_aggregates( df, value_column="value", aggregate_functions=["min", "max", "sum"], ) ) # out: {'min': 50, 'max': 100, 'sum': 150}
Example of using aggregate values to avoid an expensive window operation. Notice how the aggregates are python values that can be used as literals in subsequent spark operations.#from etl_toolkit import E, F, A, W from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 1)}, {"value": 50, "color": "blue", "date": date(2024, 1, 1)}, ]) # This costly window operation can be reimplemented using get_aggregates enriched_df = df.withColumn( "value_percent", F.col("value") / F.sum("value").over(W.partitionBy(F.lit(1))) ) # Using get_aggregates, we can accomplish the same logic: aggregates = A.get_aggregates( df, value_column="value", aggregate_functions=["sum"] ) display( df .withColumn("value_percent", F.col("value") / F.lit(aggregates["sum"])) )
color | date | value | value_percent
red | 2024-01-01 | 100 | 0.666666
blue | 2024-01-01 | 50 | 0.333333
Time Analyses#
Suite of functions to manage date and timestamp operations. Use these functions for filling date ranges and grossing up data to higher periodicities.
- etl_toolkit.A.fill_periods(df, date_column, slice_columns=None, steps=1, step_unit='DAY', start_day_of_week='MONDAY', end_day_of_week=None, start=None, end=None, calendar=None, spark=None)[source]#
Function to transform the provided df so that it includes any missing periods as blank rows given the specified date_column and interval defined as (
steps*step_unit). This is a useful function to normalize data when there are gaps due to missing time periods, which can be important to address when calculating growth rates or other lagged metrics. The periods are filled between the min/max range of the dataframe’s date_column.Additionally, this function can fill records based on
slice_columns. When specified, the rows will be added per permutation of slice columns in addition to the defined interval.In the returned dataframe, the filled date and slices (if any specified) will be provided. Other columns in the dataframe will be NULL.
Tip
This is a recommended replacement for the
fill_periodsfunction inyipit_databricks_utils, which is deprecated. It handles other use cases including variable intervals and slices, while being a more performant spark operation.- Parameters:
df (pyspark.sql.DataFrame) – The dataframe to transform to fill in missing periods.
date_column (str | pyspark.sql.Column) – A date or timestamp Column or string that is used to evaluate the min/max range of the fill operation. In addition the filled records will include the date value under this column. If a string is specified, it is normalized as a Column.
slice_columns (list[str | pyspark.sql.Column]) – A list of Columns or strings that can be used to evaluate the records to fill within each slice. If not specified, only the date column is inspected for gaps in values. If string(s) are specified, they are normalized as Columns.
steps (int) – The number of step_units to use between each row’s
period_startandperiod_endstep_unit (Literal['DAY', 'WEEK', 'MONTH', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The interval of each step to use between each row’s
period_startandperiod_end. A standard calendar is used for quarters.start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Only used when
step_unit="WEEK". Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The input data should also have rows starting on the same day of the week, or anInvalidInputExceptionwill be raised. By default, weeks start on Monday.end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – When
step_unit='WEEK', this parameter will ensure periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified ifstart_day_of_weekis specified as it is duplicative.start (datetime.datetime | datetime.date) – The start date or timestamp (datetime) of the earliest period to fill. If a
datetimeobject is supplied the resulting columns will be of timestamp type. If not specified, the min value of the date_column of the input df is used.end (datetime.datetime | datetime.date) – The end date or timestamp (datetime) of the latest period to fill. If a
datetimeobject is supplied the resulting columns will be of timestamp type. If not specified, the max value of the date_column of the input df is used.spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.
- Return type:
pyspark.sql.DataFrame
Examples#
Fill missing dates with 1 day intervals between rows of the dataframe. Notice that the dates are filled based on the overall min/max date range of the dataframe.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2023, 1, 5)}, {"value": 1000, "color": "red", "date": date(2023, 1, 1)}, {"value": 50, "color": "blue", "date": date(2023, 1, 1)}, ]) display( A.fill_periods( df, date_column="date", ) )
date | color | value
2023-01-01 | red | 1000
2023-01-01 | blue | 50
2023-01-02 | null | null
2023-01-03 | null | null
2023-01-04 | null | null
2023-01-05 | red | 100
Fill missing dates with 1 day intervals while using the slice_columns feature. Notice that the dates are filled based on the overall min/max date range of the dataframe for each unique slice in the dataframe.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2023, 1, 3)}, {"value": 1000, "color": "red", "date": date(2023, 1, 1)}, {"value": 50, "color": "blue", "date": date(2023, 1, 1)}, ]) display( A.fill_periods( df, date_column="date", slice_columns=["color"], ) )
date | color | value
2023-01-01 | blue | 50
2023-01-02 | blue | null
2023-01-03 | blue | null
2023-01-01 | red | 1000
2023-01-02 | red | null
2023-01-03 | red | 100
Fill missing dates with 1 week intervals between rows of the dataframe, with weeks starting on saturdays.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 6)}, {"value": 50, "color": "blue", "date": date(2024, 1, 20)}, ]) display( A.fill_periods( df, date_column="date", step_unit="WEEK", start_day_of_week="SATURDAY", ) )
date | color | value
2024-01-06 | red | 100
2024-01-13 | null | null
2024-01-20 | blue | 50
Fill missing dates with 1 week intervals between rows of the dataframe, but extend the filled range back through Jan. 1, 2024 using the start parameter.#from etl_toolkit import E, F, A from datetime import date df = spark.createDataFrame([ {"value": 100, "color": "red", "date": date(2024, 1, 8)}, {"value": 50, "color": "blue", "date": date(2024, 1, 22)}, ]) display( A.fill_periods( df, date_column="date", step_unit="WEEK", start=date(2024, 1, 1), ) )
date | color | value
2024-01-01 | null | null
2024-01-08 | red | 100
2024-01-15 | null | null
2024-01-22 | blue | 50
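The end_day_of_week parameter is an alternative way to anchor weekly periods. A minimal sketch, not from the original examples, that fills 7-day periods ending on Fridays; it assumes the input rows fall on the chosen week-ending day as described in the parameter documentation:
from etl_toolkit import E, F, A
from datetime import date

# Input rows fall on Fridays (2024-01-05 and 2024-01-19); the gap week of
# 2024-01-12 should be filled with a blank row.
df = spark.createDataFrame([
    {"value": 100, "color": "red", "date": date(2024, 1, 5)},
    {"value": 50, "color": "blue", "date": date(2024, 1, 19)},
])
display(
    A.fill_periods(
        df,
        date_column="date",
        step_unit="WEEK",
        end_day_of_week="FRIDAY",
    )
)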
- etl_toolkit.A.periods(start, end, steps=1, step_unit='DAY', start_day_of_week='MONDAY', calendar=None, spark=None)[source]#
Generate a dataframe dynamically based on the start and end values provided, with 1 row per interval that has elapsed (
steps*step_unit) time since the previous row. This function can be useful to generate a series of expected dates or timestamps to join to another dataframe.The output dataframe also includes larger periodicities to round the period up to (ex: week, month, quarter, etc.).
- Parameters:
start (datetime.datetime | datetime.date) – The start date or timestamp (datetime) of the dataframe to create. If a
datetimeobject is supplied the resulting columns will be of timestamp type.end (datetime.datetime | datetime.date) – The end date or timestamp (datetime) of the dataframe to create. If a
datetimeobject is supplied the resulting columns will be of timestamp type.steps (int) – The number of step_units to use between each row’s
period_startandperiod_endstep_unit (Literal['DAY', 'WEEK', 'MONTH', 'QUARTER', 'YEAR', 'HOUR', 'MINUTE', 'SECOND']) – The interval of each step to use between each row’s
period_startandperiod_end. A standard calendar is used for quarters.start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Only used when
step_unit="WEEK". Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The start value should have the same day of week, otherwise anInvalidInputExceptionwill be raised. By default, weeks start on Monday.spark (pyspark.sql.SparkSession) – Spark session to use. Generally, this is not needed as the session is automatically generated in databricks. It is used by library developers.
- Return type:
pyspark.sql.DataFrame
Examples#
Generate a simple date range with 1 day per row between the specifiedstartandend.#from etl_toolkit import E, F, A from datetime import date display( A.periods( start=date(2024, 1, 1), end=date(2024, 1, 5), ) )
period_start | period_end
2024-01-01 | 2024-01-01
2024-01-02 | 2024-01-02
2024-01-03 | 2024-01-03
2024-01-04 | 2024-01-04
2024-01-05 | 2024-01-05
Generate a date range based on a weekly periodicity. Notice how the start and end dates reflect a 7-day duration.#from etl_toolkit import E, F, A from datetime import date display( A.periods( start=date(2024, 1, 1), end=date(2024, 1, 14), step_unit="WEEK", ) )
period_start | period_end
2024-01-01 | 2024-01-07
2024-01-08 | 2024-01-14
Generate a date range based on a weekly periodicity with weeks starting on saturdays. Notice how the start and end dates reflect a 7-day duration.#from etl_toolkit import E, A, F from datetime import date, datetime display( A.periods( start=date(2024, 1, 6), end=date(2024, 1, 19), step_unit="WEEK", start_day_of_week="SATURDAY", ) )
period_start | period_end
2024-01-06 | 2024-01-12
2024-01-13 | 2024-01-19
Generate a date range based on a two-month periodicity. This is accomplished by specifyingstepsas 2 and a “MONTH”step_unit.#from etl_toolkit import E, F, A from datetime import date display( A.periods( start=date(2024, 1, 1), end=date(2024, 6, 30), step_unit="MONTH", steps=2, ) )
period_start | period_end
2024-01-01 | 2024-02-29
2024-03-01 | 2024-04-30
2024-05-01 | 2024-06-30
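As noted in the parameter descriptions, supplying datetime values produces timestamp-typed period columns. A minimal sketch generating hourly periods; the specific timestamps are illustrative:
from etl_toolkit import E, F, A
from datetime import datetime

# Hourly periods between two timestamps; period_start and period_end are
# returned as timestamps because datetime objects were supplied.
display(
    A.periods(
        start=datetime(2024, 1, 1, 0, 0),
        end=datetime(2024, 1, 1, 3, 0),
        step_unit="HOUR",
    )
)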
Comparison Analyses#
Suite of functions to compare DataFrames by schema or row content. These are useful for data validation, testing pipeline outputs, and ensuring data quality. The functions help identify differences between two DataFrames either at the schema level or by comparing individual rows.
- etl_toolkit.A.get_schema_comparison(df1, df2, diff_only=False)[source]#
Compare schemas from two DataFrames and return a DataFrame with the differences.
All columns of both DataFrames are compared.
- Parameters:
df1 (pyspark.sql.DataFrame) – The first DataFrame to compare.
df2 (pyspark.sql.DataFrame) – The second DataFrame to compare.
diff_only (bool) – If True, only columns that are different will be returned in the schema comparison dataframe.
- Returns:
DataFrame with the schema comparison.
- Return type:
pyspark.sql.DataFrame
Examples#
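A minimal sketch of comparing two schemas; the input dataframes here are illustrative and do not produce the sample comparison table below, which comes from a different pair of dataframes:
from etl_toolkit import A
from datetime import date

df1 = spark.createDataFrame([{"name": "a", "id": 1, "date": date(2024, 1, 1)}])
df2 = spark.createDataFrame([{"name": "a", "id": "1", "date": "2024-01-01"}])

# With diff_only=True, only columns whose data types differ are returned.
display(A.get_schema_comparison(df1, df2, diff_only=True))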
column_name | df1_data_type | df2_data_type | is_diff
name | StringType() | StringType() | false
date | DateType() | NULL | true
datetime | TimestampType() | StringType() | true
is_active | BooleanType() | StringType() | true
id | IntegerType() | StringType() | true
_timestamp_ingested | NULL | TimestampType() | true
- etl_toolkit.A.get_rows_comparison(df1, df2, columns=None, sample_size=None, checksum_algorithm='hash')[source]#
Compare rows from both DataFrames and return two DataFrames with the differences.
It compares rows by computing a checksum for both dataframes using either the intersection of columns from both dataframes or the columns specified in the columns parameter. Both dataframes are left-anti joined twice on the checksum column to identify differences.
For large datasets, consider using the sample_size parameter to limit the comparison to a subset of rows.
- Parameters:
df1 (pyspark.sql.DataFrame) – The first DataFrame to compare.
df2 (pyspark.sql.DataFrame) – The second DataFrame to compare.
columns (Optional[List[str]]) – A list of columns to compare. If not specified, intersection of the columns from both dataframes will be used.
sample_size (Optional[int]) – Optional limit on the number of rows to include in the comparison. Useful for comparing large datasets.
checksum_algorithm (str) – Algorithm to use for checksum calculation, options are ‘uuid5’ or ‘hash’ (default). For large datasets, ‘hash’ is generally more efficient.
- Returns:
A tuple with two DataFrames, the first one with the rows that are in df1 but not in df2 and the second one with the rows that are in df2 but not in df1.
- Return type:
Tuple[pyspark.sql.DataFrame, pyspark.sql.DataFrame]
Examples#
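A minimal sketch of comparing rows between two dataframes; the inputs are illustrative:
from etl_toolkit import A

df1 = spark.createDataFrame([{"id": 1, "value": 100}, {"id": 2, "value": 50}])
df2 = spark.createDataFrame([{"id": 1, "value": 100}, {"id": 2, "value": 75}])

# Returns rows in df1 but not df2, and rows in df2 but not df1, based on a
# checksum computed over the specified columns.
only_in_df1, only_in_df2 = A.get_rows_comparison(df1, df2, columns=["id", "value"])
display(only_in_df1)
display(only_in_df2)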
Investor Standard Metrics Analyses#
Suite of functions specific to the standard metrics experience. These are used to generate unified KPI analyses and downstream dashboard and feed tables for a metric.
- etl_toolkit.A.standard_metric_unified_kpi(df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df=None)[source]#
Generates a comprehensive dataframe containing all unified KPI analyses for a metric.
This function performs a series of transformations to analyze a metric across multiple time periods and calculation types. The process includes:
Data Preparation:
- Validates and extracts configuration parameters
- Sets up calendar information for time-based analysis
- Applies any source table filters
Analysis Generation:
- Creates analysis groups based on periodicity (QUARTER, MONTH, etc.)
- Calculates simple aggregates (SUM, AVG) for each period
- Computes growth rates (YoY, QoQ) if configured
- Handles period-to-date comparisons
- Processes data slices (e.g., by product, region)
Data Enrichment:
- Adds entity information (company name, ticker)
- Includes metric metadata (currency, divisor)
- Attaches calendar metadata (period labels, dates)
Special Handling:
- Supports custom functions (e.g., PCT_OF_TOTAL)
- Handles leap year adjustments
- Manages trailing period calculations
- Normalizes values across different period lengths
- Parameters:
df (pyspark.sql.DataFrame) – Source DataFrame containing the metric data
entity_configuration (standard_metric_unified_kpi.entity_configuration) – A.entity_configuration for configuring entity details
standard_metric_metadata (standard_metric_unified_kpi.standard_metric_metadata) – A.standard_metric_metadata for configuring metric metadata
standard_metric_configuration (standard_metric_unified_kpi.standard_metric_configuration) – A.standard_metric_configuration for configuring metric calculations
calendar_df (Optional[pyspark.sql.DataFrame]) – Optional calendar DataFrame (uses standard calendar if not provided)
- Returns:
DataFrame containing comprehensive metric unified KPI analyses
- Return type:
pyspark.sql.DataFrame
Examples#
Generating unified KPI analyses for a metric. Example output is a subset of the actual output.#from etl_toolkit import A input_df = spark.table("yd_production.afrm_live_reported.afrm_gmv_sliced") calendar_df = spark.table("yd_fp_investor_audit.afrm_xnas_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Affirm", top_level_entity_ticker="AFRM:XNAS" ) standard_metric_metadata = A.standard_metric_metadata( metric_name="GMV", company_comparable_kpi=True, display_period_granularity="DAY", report_period_granularity="QUARTER", currency="USD", value_divisor=1000000, visible_alpha_id=14081329, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="gmv", source_input_date_column="date", aggregate_function="SUM", growth_rate_type="CAGR", max_relevant_years=4, calendar_type="EXACT_N_YEARS", slice_columns=["merchant"], trailing_period_length=7, trailing_period_aggregate_function="AVG", ) df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) display(df)
metric_name | … | slice_name_1 | … | value | … | aggregation_type | calculation_type | trailing_period | periodicity | … | period_start
GMV | … | null | … | 80536050.596995 | … | SUM | SIMPLE_AGGREGATE | null | DAY | … | 2024-10-01
GMV | … | merchant | … | 35380357.162450 | … | SUM | SIMPLE_AGGREGATE | null | DAY | … | 2024-10-01
GMV | … | null | … | 77256191.770087 | … | AVG | SIMPLE_AGGREGATE | 7 | DAY | … | 2024-10-01
GMV | … | merchant | … | 41700742.409985 | … | AVG | SIMPLE_AGGREGATE | 7 | DAY | … | 2024-10-01
- etl_toolkit.A.standard_metric_unified_kpi_derived(unified_kpi_df_1, unified_kpi_df_2, standard_metric_metadata, standard_metric_configuration, operator='DIVISION')[source]#
Generates a dataframe containing a unified KPI analyses for a derived standard metric based on unified KPI dataframes of two input metrics.
Note
The mathematical operation preserves the order of the input metrics’ unified KPI tables.
If the operator is DIVISION, for example, the metric from unified_kpi_df_1 will be used as the numerator and the metric from unified_kpi_df_2 will be used as the denominator.
- Parameters:
unified_kpi_df_1 (pyspark.sql.DataFrame) – A dataframe containing unified KPI analyses of one metric
unified_kpi_df_2 (pyspark.sql.DataFrame) – A dataframe containing unified KPI analyses of another metric
standard_metric_metadata (standard_metric_unified_kpi_derived.standard_metric_metadata) – A.standard_metric_metadata configurations
standard_metric_configuration (standard_metric_unified_kpi_derived.standard_metric_configuration) – A.standard_metric_configuration configurations
operator (Literal['DIVISION', 'ADDITION', 'MULTIPLICATION', 'SUBTRACTION']) – mathematical operation between the two standard metrics to get the derived metric
- Returns:
A dataframe containing unified KPI analyses of the derived metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating unified KPI analyses for a derived metric. Example output is a subset of the actual output.#from etl_toolkit import A entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) calendar_df = spark.table( "yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000" ) standard_metric_metadata_1 = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration_1 = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) input_df_1 = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") unified_kpi_df_1 = A.standard_metric_unified_kpi( input_df_1, entity_configuration, standard_metric_metadata_1, standard_metric_configuration_1, calendar_df ) standard_metric_metadata_2 = A.standard_metric_metadata( metric_name="Orders - Order Date", company_comparable_kpi=False, ) standard_metric_configuration_2 = A.standard_metric_configuration( source_input_column="order_date_orders_index", source_input_date_column="date", calendar_type="52_WEEK", trailing_period_length=None, trailing_period_aggregate_function=None, ) input_df_2 = spark.table("yd_production.chwy_reported.edison_daily_sales") unified_kpi_df_2 = A.standard_metric_unified_kpi( input_df_2, entity_configuration, standard_metric_metadata_2, standard_metric_configuration_2, calendar_df ) derived_standard_metric_metadata = A.standard_metric_metadata( metric_name="AOV - Order Date", company_comparable_kpi=False, ) derived_standard_metric_configuration = A.standard_metric_configuration( max_relevant_years=2, growth_rate_type="CAGR", calendar_type="52_WEEK", trailing_period_length=7, trailing_period_aggregate_function="AVG", ) df = A.standard_metric_unified_kpi_derived( unified_kpi_df_1, unified_kpi_df_2, derived_standard_metric_metadata, derived_standard_metric_configuration, operator="DIVISION" ) display(df)
metric_name | … | slice_name_1 | … | value | … | aggregation_type | calculation_type | trailing_period | periodicity | … | period_start
AOV - Order Date | … | null | … | 34136.196960 | … | SUM | SIMPLE_AGGREGATE | null | QUARTER | … | 2021-02-01
AOV - Order Date | … | null | … | 40176.041644 | … | SUM | SIMPLE_AGGREGATE | null | QUARTER | … | 2023-01-30
AOV - Order Date | … | null | … | 29545.027237 | … | SUM | SIMPLE_AGGREGATE | null | QUARTER | … | 2019-11-04
AOV - Order Date | … | null | … | 37124.360009 | … | SUM | SIMPLE_AGGREGATE | null | QUARTER | … | 2022-05-02
- etl_toolkit.A.standard_metric_data_download(df, entity_configuration)[source]#
Transforms the unified KPI table to generate a dataframe containing analyses for the data download.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
entity_configuration (standard_metric_data_download.entity_configuration) – A.entity_configuration configurations
- Returns:
DataFrame of data download analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating data download analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_data_download(unified_kpi_df, entity_configuration) display(df)
date | top_level_entity_ticker | top_level_entity_name | internal_dashboard_analysis_name | analysis_label_name | metric_name | slice_name_1 | slice_value_1 | … | value | …
2025-02-02 | CHWY:XNYS | Chewy | day_1y_growth_rate_trailing_day | T7D SUM 1Y Growth | Net Sales - Order Date | null | null | … | 0.019454 | …
2025-02-02 | CHWY:XNYS | Chewy | day_2y_growth_rate_trailing_day | T7D SUM 2Y Growth | Net Sales - Order Date | null | null | … | 0.019231 | …
2025-02-02 | CHWY:XNYS | Chewy | day_3y_growth_rate_trailing_day | T7D SUM 3Y Growth | Net Sales - Order Date | null | null | … | 0.110622 | …
2025-02-02 | CHWY:XNYS | Chewy | day_4y_growth_rate_trailing_day | T7D SUM 4Y Growth | Net Sales - Order Date | null | null | … | 0.104899 | …
2025-02-02 | CHWY:XNYS | Chewy | day_simple_aggregate | Nominal Value. | Net Sales - Order Date | null | null | … | 32434644.148619 | …
2025-02-02 | CHWY:XNYS | Chewy | day_simple_aggregate_trailing_day | T7D SUM | Net Sales - Order Date | null | null | … | 237115829.752783 | …
- etl_toolkit.A.standard_metric_live_feed(df, entity_configuration, spark=None)[source]#
Transforms the unified KPI table to generate a dataframe containing YDL feed analyses. This function will be deprecated alongside YDL Feeds in July 2025.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
entity_configuration (standard_metric_live_feed.entity_configuration) – A.entity_configuration configurations
spark (pyspark.sql.SparkSession)
- Returns:
DataFrame of live feed analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating live feed analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_live_feed(unified_kpi_df, entity_configuration) display(df)
- etl_toolkit.A.standard_metric_feed(df, entity_configuration, spark=None)[source]#
Transforms the unified KPI table to generate a dataframe containing core metric feed analyses.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
entity_configuration (standard_metric_feed.entity_configuration) – A.entity_configuration configurations
spark (pyspark.sql.SparkSession)
- Returns:
DataFrame of core metric feed analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating core metric feed analyses for a metric.#from etl_toolkit import A, F input_df = spark.table("yd_production.aap_reported.aap_gmv") calendar_df = spark.table("yd_fp_investor_audit.aap_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Advance Auto Parts", top_level_entity_ticker="AAP:XNYS", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales", currency="USD", value_divisor=1000000, visible_alpha_id=5598475, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="value", source_input_date_column="date", max_relevant_years=5, calendar_type="52_WEEK", source_table_filter_conditions=[F.col("metric")=="Net Sales"], trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df, ) df = A.standard_metric_feed(unified_kpi_df, entity_configuration) display(df)
yd_product_type | yd_product | metric_name | analysis_name | … | periodicity | slice_names | slice_values | value | period_start | period_end | currency | figi | …
Core Metrics | AAP | Net Sales | Net Sales - Total | … | daily | null | null | 18512030.489125 | 2025-01-01 | 2025-01-01 | USD | null | …
Core Metrics | AAP | Net Sales | Net Sales - Total | … | monthly | null | null | 747219529.371808 | 2025-01-01 | 2025-01-31 | USD | null | …
Core Metrics | AAP | Net Sales | Net Sales - Total | … | quarterly | null | null | 2026418754.661688 | 2024-10-06 | 2024-12-28 | USD | null | …
Core Metrics | AAP | Net Sales | Net Sales - Total | … | MTD Y-0 | null | null | 29447541.625361 | 2025-02-01 | 2025-02-01 | USD | null | …
Core Metrics | AAP | Net Sales | Net Sales - Total | … | QTD Y-0 | null | null | 868300454.561080 | 2024-12-09 | 2025-02-01 | USD | null | …
- etl_toolkit.A.standard_metric_daily_growth(df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing daily growth analyses.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
- Returns:
DataFrame of daily growth analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating daily growth analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_daily_growth(unified_kpi_df) display(df)
top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | dashboard_analysis_name | day_1y_growth_rate_trailing_day | day_2y_growth_rate_trailing_day | …
CHWY:XNYS | Chewy | 2025-02-01 | 2025-02-01 | Net Sales - Order Date | Daily Growth | 0.012224 | 0.017250 | …
CHWY:XNYS | Chewy | 2025-02-02 | 2025-02-02 | Net Sales - Order Date | Daily Growth | 0.014802 | 0.016585 | …
CHWY:XNYS | Chewy | 2025-02-03 | 2025-02-03 | Net Sales - Order Date | Daily Growth | 0.040221 | 0.024634 | …
- etl_toolkit.A.standard_metric_quarter_month_pivot(df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing MTD and QTD analyses.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
- Returns:
DataFrame of MTD and QTD analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating MTD and QTD analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_quarter_month_pivot(unified_kpi_df) display(df)
top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | … | periodicity | analysis_label | … | value | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | QUARTER | Nominal | … | 3252.308829 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | QUARTER | Y/Y Growth | … | 0.152283 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | QUARTER | 2Y CAGR | … | 0.094586 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | QUARTER | 3Y CAGR | … | 0.106234 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | QUARTER | 4Y CAGR | … | 0.123719 | …
- etl_toolkit.A.standard_metric_trailing_day_pivot(df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing trailing day analyses.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
- Returns:
DataFrame of trailing day analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating trailing day analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_trailing_day_pivot(unified_kpi_df) display(df)
top_level_entity_ticker | top_level_entity_name | period_start | period_end | metric_name | … | analysis_label | … | value | previous_week_txd_value | …
CHWY:XNYS | Chewy | 2025-01-28 | 2025-02-03 | Net Sales - Order Date | … | Nominal | … | 233.494012 | 230.214723 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | Y/Y Growth | … | 0.040221 | 0.075195 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | 2Y CAGR | … | 0.024634 | 0.041169 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | 3Y CAGR | … | 0.116819 | 0.043802 | …
CHWY:XNYS | Chewy | 2024-10-28 | 2025-02-02 | Net Sales - Order Date | … | 4Y CAGR | … | 0.105718 | 0.085727 | …
- etl_toolkit.A.standard_metric_net_adds(df, calendar_df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing net add analyses.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
calendar_df (pyspark.sql.DataFrame)
- Returns:
DataFrame of net add analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating net add analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Toast", top_level_entity_ticker="TOST:XNYS", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Location Net Adds", company_comparable_kpi=True, uses_va_for_actuals=False, display_period_granularity="DAY", report_period_granularity="QUARTER", currency=None, value_divisor=1, visible_alpha_id=14282041, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="locations", source_input_date_column="date", source_table_granularity="DAY", aggregate_function=None, max_relevant_years=2, growth_rate_type=None, calendar_type=None, source_table_filter_conditions=None, slice_columns=None, trailing_period_length=None, trailing_period_aggregate_function=None, ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_net_adds(unified_kpi_df, calendar_df) display(df)
period_start | period_end | quarter_start | quarter_end | … | value | daily_net_add_qtd_value | value_previous_1_week | daily_net_add_qtd_value_previous_1_week | …
2024-10-01 | 2024-10-01 | 2024-10-01 | 2024-12-31 | … | 127066.757493 | 66.757493 | 126534.726587 | 6534.726587 | …
2024-10-02 | 2024-10-02 | 2024-10-01 | 2024-12-31 | … | 127121.858916 | 121.858916 | 126597.423004 | 6597.423004 | …
2024-10-03 | 2024-10-03 | 2024-10-01 | 2024-12-31 | … | 127182.258553 | 182.258553 | 126653.519799 | 6653.519799 | …
- etl_toolkit.A.standard_metric_weekly_qtd_progress(df, calendar_df, spark=None)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing weekly analyses through the quarter and the comparable weekly analyses in prior years.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
calendar_df (pyspark.sql.DataFrame) – Calendar dataframe
spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)
- Returns:
DataFrame of weekly QTD analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating weekly quarter-to-date progress analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.cmg_reported.dash_daily_rev") calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chipotle", top_level_entity_ticker="CMG:XNYS", exchange=None, entity_name=None, figi=None, ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Total Revenue", company_comparable_kpi=True, uses_va_for_actuals=False, display_period_granularity="WEEK", report_period_granularity="QUARTER", currency="USD", value_divisor=1000, visible_alpha_id=6639756, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="revenue", source_input_date_column="date", source_table_granularity="DAY", aggregate_function="SUM", max_relevant_years=2, growth_rate_type="CAGR", calendar_type="EXACT_N_YEARS", source_table_filter_conditions=None, slice_columns=None, trailing_period_length=7, trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_weekly_qtd_progress(unified_kpi_df, calendar_df) display(df)
- etl_toolkit.A.standard_metric_monthly_qtd_progress(df, calendar_df, spark=None)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing monthly analyses through the quarter and the comparable monthly analyses in prior years.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
calendar_df (pyspark.sql.DataFrame) – Calendar dataframe
spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)
- Returns:
DataFrame of monthly QTD analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating monthly quarter-to-date progress analyses for a metric.#from etl_toolkit import A, F input_df = spark.table("yd_production.cn_shortvid_reported.active_users") calendar_df = spark.table("yd_fp_investor_audit.calendar_gold.standard_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="ByteDance", top_level_entity_ticker=None, exchange=None, entity_name="Douyin", figi=None, ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Monthly Active Users", company_comparable_kpi=False, uses_va_for_actuals=False, display_period_granularity="MONTH", report_period_granularity="QUARTER", value_divisor=1000000, visible_alpha_id=None, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="mau", source_input_date_column="month", source_table_granularity="MONTH", max_relevant_years=2, aggregate_function="AVG", growth_rate_type="CAGR", calendar_type="EXACT_N_YEARS", source_table_filter_conditions=[F.col("appname") == "Douyin Core"], slice_columns=["appname"], trailing_period_length=None, trailing_period_aggregate_function=None, ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_monthly_qtd_progress(unified_kpi_df, calendar_df) display(df)
- etl_toolkit.A.standard_metric_daily_qtd_progress(df, calendar_df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing daily analyses through the quarter and the comparable daily analyses in prior years.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
calendar_df (pyspark.sql.DataFrame) – Calendar dataframe
- Returns:
DataFrame of daily QTD analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating daily quarter-to-date progress analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.chwy_live_reported.chwy_net_sales_order_date") calendar_df = spark.table("yd_fp_investor_audit.chwy_xnys_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Chewy", top_level_entity_ticker="CHWY:XNYS", figi="BBG00P19DLQ4", ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Net Sales - Order Date", company_comparable_kpi=False, currency="USD", value_divisor=1000000, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="net_sales_order_date", source_input_date_column="date", max_relevant_years=4, calendar_type="52_WEEK", trailing_period_aggregate_function="SUM", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_daily_qtd_progress(unified_kpi_df, calendar_df) display(df)
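Because the daily, weekly, and monthly progress functions all consume the same unified KPI table, one call to A.standard_metric_unified_kpi can feed several of these analyses. A minimal sketch, reusing the unified_kpi_df and calendar_df names from the example above (whether each view is meaningful depends on the metric's configured granularity):

```python
# Sketch only: unified_kpi_df and calendar_df come from the preceding example.
daily_qtd_df = A.standard_metric_daily_qtd_progress(unified_kpi_df, calendar_df)
weekly_qtd_df = A.standard_metric_weekly_qtd_progress(unified_kpi_df, calendar_df)
monthly_qtd_df = A.standard_metric_monthly_qtd_progress(unified_kpi_df, calendar_df)

display(weekly_qtd_df)
```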
- etl_toolkit.A.standard_metric_half_year_progress(df, calendar_df)[source]#
Transforms a metric’s unified KPI table to generate a dataframe containing daily analyses through the half year and the comparable daily analyses in prior years.
- Parameters:
df (pyspark.sql.DataFrame) – A dataframe of a metric’s unified KPI table
calendar_df (pyspark.sql.DataFrame) – Calendar dataframe
- Returns:
DataFrame of daily half year progress analyses for a metric
- Return type:
pyspark.sql.DataFrame
Examples#
Generating daily half year progress analyses for a metric.#from etl_toolkit import A input_df = spark.table("yd_production.itx_reported.itx_gmv_es") calendar_df = spark.table("yd_fp_investor_audit.itx_xmad_deliverable_gold.custom_calendar__dmv__000") entity_configuration = A.entity_configuration( top_level_entity_name="Inditex", top_level_entity_ticker="ITX:XMAD", exchange=None, entity_name=None, figi=None, ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Spain Net Sales", company_comparable_kpi=True, uses_va_for_actuals=False, display_period_granularity="DAY", report_period_granularity="HALF_YEAR", currency="EUR", value_divisor=1000000, visible_alpha_id=None, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="value", source_input_date_column="date", source_table_granularity="DAY", aggregate_function="SUM", max_relevant_years=2, growth_rate_type="CAGR", calendar_type="EXACT_N_YEARS", source_table_filter_conditions=None, slice_columns=None, trailing_period_length=7, trailing_period_aggregate_function="AVG", ) unified_kpi_df = A.standard_metric_unified_kpi( input_df, entity_configuration, standard_metric_metadata, standard_metric_configuration, calendar_df ) df = A.standard_metric_half_year_progress(unified_kpi_df, calendar_df) display(df)
- etl_toolkit.A.standard_metric_ui_metadata(entity_configuration, standard_metric_metadata, standard_metric_configuration, standard_metric_dashboard_configuration, spark=None)[source]#
Generates a dataframe containing the UI metadata necessary for a standard metric’s dashboard configuration.
- Parameters:
entity_configuration (standard_metric_ui_metadata.entity_configuration) – A.entity_configuration for configuring entity details
standard_metric_metadata (standard_metric_ui_metadata.standard_metric_metadata) – A.standard_metric_metadata for configuring metric metadata
standard_metric_configuration (standard_metric_ui_metadata.standard_metric_configuration) – A.standard_metric_configuration for configuring metric calculations
standard_metric_dashboard_configuration (standard_metric_ui_metadata.standard_metric_dashboard_configuration) – A.standard_metric_dashboard_configuration for configuring the dashboard
spark (pyspark.sql.SparkSession) – Optional SparkSession (will create one if not provided)
- Returns:
DataFrame containing UI metadata for the standard metric dashboard
- Return type:
pyspark.sql.DataFrame
Examples#
Generating UI metadata dataframe for a metric#from etl_toolkit import A entity_configuration = A.entity_configuration( top_level_entity_name="Affirm", top_level_entity_ticker="AFRM:XNAS" ) standard_metric_metadata = A.standard_metric_metadata( metric_name="Gross Merchandise Volume", company_comparable_kpi=True, display_period_granularity="DAY", report_period_granularity="QUARTER", currency="USD", value_divisor=1000000, visible_alpha_id=14081329, ) standard_metric_configuration = A.standard_metric_configuration( source_input_column="gmv", source_input_date_column="date", aggregate_function="SUM", growth_rate_type="CAGR", max_relevant_years=4, calendar_type="EXACT_N_YEARS", slice_columns=["merchant"], trailing_period_length=7, trailing_period_aggregate_function="AVG", ) standard_metric_dashboard_configuration = A.standard_metric_dashboard_configuration( page_title="Gross Merchandise Volume", page_assignment="page_1_1", metric_short_name="GMV", data_lag="T-3", publishing_schedule="Weekly (Thu)", ) df = A.standard_metric_ui_metadata( entity_configuration, standard_metric_metadata, standard_metric_configuration, standard_metric_dashboard_configuration ) display(df)
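Note that A.standard_metric_ui_metadata accepts the same entity, metadata, and configuration objects as A.standard_metric_unified_kpi, so an existing KPI pipeline can typically reuse its configuration blocks here and only add the dashboard configuration. A minimal sketch, reusing the objects defined in the example above (input_df and calendar_df are hypothetical source and calendar tables):

```python
# Sketch only: the configuration objects come from the example above;
# input_df and calendar_df are hypothetical source and calendar dataframes.
unified_kpi_df = A.standard_metric_unified_kpi(
    input_df,
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df,
)
ui_metadata_df = A.standard_metric_ui_metadata(
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
    standard_metric_dashboard_configuration,
)
display(ui_metadata_df)
```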
Calendar Analyses#
Suite of functions and classes to generate and manage calendar data with support for custom fiscal periods, holidays, and business day calculations. These calendar utilities standardize period definitions and holiday handling across an organization.
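For example, a generated calendar can be joined to a daily fact table so that every record carries standardized period labels. A minimal sketch (the daily dataframe is hypothetical; the day and quarter_label columns are described under A.calendar below, and F is assumed to be the toolkit's pyspark functions alias used elsewhere in these docs):

```python
from etl_toolkit import A, F
from datetime import date

calendar_df = A.calendar()  # standard calendar over the default date range

# Hypothetical daily fact table
daily_df = spark.createDataFrame([
    {"value": 100, "date": date(2024, 10, 1)},
    {"value": 150, "date": date(2024, 10, 2)},
])

# Tag each daily record with its standardized quarter metadata
labeled_df = daily_df.join(
    calendar_df.select(
        F.col("day").alias("date"),
        "quarter_label",
        "quarter_period_start",
        "quarter_period_end",
    ),
    on="date",
    how="left",
)
display(labeled_df)
```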
- etl_toolkit.A.calendar(calendar_id='standard', start_date=None, end_date=None, custom_holidays=None, country_holiday_codes=None, quarters=None, halves=None, fiscal_year_start_month=1, fiscal_year_start_day=1, qa=False, spark=None)[source]#
Generate a dataframe with periodicities (day, week, month, quarter, year) and calendar metadata like holidays and business days. The calendar can be customized with different fiscal year configurations, custom holidays, and period definitions.
The function returns calendar data formatted similarly to the Freeport calendar features, with standardized fields for dates, periods, and calendar metadata.
- Parameters:
calendar_id (str) – Identifier for the calendar configuration to use. Defaults to “standard”, which uses a standard 365-day calendar.
start_date (Optional[datetime.date]) – Optional minimum date to include in the calendar. Defaults to 5 years before current date.
end_date (Optional[datetime.date]) – Optional maximum date to include in the calendar. Defaults to 5 years after current date.
custom_holidays (Optional[List[Holiday]]) – Optional list of Holiday objects defining custom holidays to include.
country_holiday_codes (Optional[List[str]]) – Optional list of two-letter country codes used to include each country’s default holidays.
quarters (Optional[List[Period]]) – Optional list of Period objects defining custom quarter date ranges.
halves (Optional[List[Period]]) – Optional list of Period objects defining custom half-year date ranges.
fiscal_year_start_month (int) – Month when fiscal year starts (1-12). Defaults to 1 (January).
fiscal_year_start_day (int) – Day of month when fiscal year starts (1-31). Defaults to 1.
qa (bool) – Enable QA mode to add additional columns for validation.
spark (pyspark.sql.SparkSession) – Optional SparkSession. If not provided, the session is retrieved from yipit_databricks_utils. This is generally not needed, as the session is created automatically in Databricks; the argument is primarily intended for library developers.
- Returns:
DataFrame: Calendar data with the following columns:
- day (date): The calendar date
- calendar_type (string): Type of calendar (fiscal/standard)
- year_label (string): Year label (e.g. FY2024)
- year_period_start (date): Start date of year period
- year_period_end (date): End date of year period
- quarter_period_start (date): Start date of quarter
- quarter_period_end (date): End date of quarter
- quarter_label (string): Quarter label (e.g. 1Q24)
- month_period_start (date): Start date of month
- month_period_end (date): End date of month
- week_period_start (date): Start of week (Sunday)
- week_period_end (date): End of week (Saturday)
- half_year_period_start (date): Start of half-year period
- half_year_period_end (date): End of half-year period
- half_year_label (string): Half-year label (e.g. 1HY24)
- is_business_day (boolean): Is this a business day
- is_holiday (boolean): Is this a holiday
- holiday_name (string): Name of holiday if applicable
- days_in_week (int): Days elapsed in current week
- days_in_month (int): Days elapsed in current month
- days_in_quarter (int): Days elapsed in current quarter
- days_in_year (int): Days elapsed in current year
- days_in_half_year (int): Days elapsed in current half year
Additional columns for fiscal calendars:
- custom_year_of_quarter (int): Custom year mapping for quarter
- leap_year (boolean): Is this a leap year
- leap_day (boolean): Is this a leap day (Feb 29)
- Return type:
pyspark.sql.DataFrame
Examples#
Generate standard calendar for default date range#from etl_toolkit import A df = A.calendar()
Generate calendar with specific date range#from etl_toolkit import A from datetime import date df = A.calendar( start_date=date(2024, 1, 1), end_date=date(2024, 12, 31) )
Generate calendar with custom holidays#from etl_toolkit import A from datetime import date df = A.calendar( custom_holidays=[ A.holiday( name="Company Holiday", start_date=date(2024, 12, 24), end_date=date(2024, 12, 26) ) ] )
Generate calendar with custom fiscal quarters#from etl_toolkit import A from datetime import date df = A.calendar( quarters=[ A.period( start_date=date(2024, 10, 1), end_date=date(2024, 12, 31), type="quarter" ), A.period( start_date=date(2025, 1, 1), end_date=date(2025, 3, 31), type="quarter" ), A.period( start_date=date(2025, 4, 1), end_date=date(2025, 6, 30), type="quarter" ), A.period( start_date=date(2025, 7, 1), end_date=date(2025, 9, 30), type="quarter" ) ] )
Generate calendar with holidays from multiple countries#from etl_toolkit import A from datetime import date df = A.calendar( start_date=date(2024, 1, 1), end_date=date(2024, 12, 31), country_holiday_codes=["US", "GB", "JP"] # Include US, UK, and Japan holidays )
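The fiscal year boundary can also be shifted away from January 1. The following is a minimal sketch (not taken from the library's own examples) that assumes a February fiscal year start and enables QA mode to get the additional validation columns:

```python
from etl_toolkit import A
from datetime import date

# Sketch: calendar whose fiscal year starts on February 1, with QA columns enabled
df = A.calendar(
    start_date=date(2024, 1, 1),
    end_date=date(2026, 12, 31),
    fiscal_year_start_month=2,  # fiscal year begins in February
    fiscal_year_start_day=1,
    qa=True,                    # add extra columns for validating the generated periods
)
display(df)
```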