Source code for etl_toolkit.analyses.investor.income_bucket

from typing import Optional, Literal
from datetime import date, datetime

from pyspark.sql import DataFrame, Column, functions as F, SparkSession
from pyspark.sql.window import Window as W
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.expressions.time import (
    _validate_day_of_week_bounds,
    _get_day_of_week_bounds,
)
from etl_toolkit import E
from etl_toolkit.analyses.time import fill_periods
from etl_toolkit.analyses.scalar import get_aggregates
from etl_toolkit.analyses.calculation import add_percent_of_total_columns
from etl_toolkit.exceptions import InvalidInputException


@track_usage
[docs] def revenue_mix_by_income_bucket( df: DataFrame, revenue_column: str | Column, date_column: str | Column, income_bucket_column: str | Column = "yd_income_bucket1", income_weight_column: Optional[str | Column] = None, apply_income_weighting: bool = False, periodicity: Literal["WEEK", "MONTH", "QUARTER"] = "MONTH", trailing_period: Optional[int] = None, interval_period: Optional[int] = None, include_growth_rates: bool = False, start_date: Optional[date | datetime] = None, start_day_of_week: Literal[ "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY" ] = "SUNDAY", end_day_of_week: Literal[ "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY" ] = None, calendar=None, spark: SparkSession = None, ) -> DataFrame: """ Returns a dataframe of revenue total and percent of total calculated by ``income_bucket_column`` and the specified ``periodicity``. Can optionally include trailing period calcuations for revenue by income bucket and/or 1-year growth rates for revenue metrics. The input data can also be weighted for income if that was not already applied. :param df: Input dataframe that should be used to generate revenue calculations. The dataframe must include the remaining column arguments for this function. :param revenue_column: Numeric type column on the ``df`` to use to calculate revenue totals by income bucket. :param date_column: Date type column on the ``df`` to use to aggregate revenue by the relevant ``periodicity``. :param income_bucket_column: Column on the ``df`` that indicates the income bucket for the given row. Revenue mix will be based on grouping by this column. :param income_weight_column: Optional column on the ``df`` that indicates the income weighting for a panelized dataset. If specified, the income weight will be used to adjust the ``revenue_column`` before aggregating by period and income bucket. :param apply_income_weighting: When ``True``, the ``income_weight_column`` is used to adjust the ``revenue_column`` before aggregating by period and income bucket. When ``False``, the ``revenue_column`` is directly aggregated by period and income bucket. Default is ``False``. :param periodicity: The time resolution to group revenue data on. When specified, the ``date_column`` will be rounded to the nearest matching period to aggregate revenue on that period + income bucket. :param trailing_period: Optionally specified as an integer. When specified, additional revenue metrics will be added to the output dataframe that equals trailing n-period average revenue by period and income bucket. The addtional columns will be prefixed by the trailing period used, ex: t3m_revenue for trailing 3-month aggregations. :param interval_period: Optionally specified as an integer. When specified the first time period and every n-time periods after will be included in the output dataframe. This can be used to display data more effectively for trailing periods, where every nth month is desired in the output. :param start_date: Optional start date to filter the ``date_column`` on before aggregating revenue. The ``start_date`` should be consistent with the ``start_day_of_week``. :param start_day_of_week: When ``periodicity='WEEK'``, this parameter will ensure periods are aggregated on 7-day intervals that start on this specified day of the week. This parameter should not be specified if ``end_day_of_week`` is specified as it is duplicative. :param end_day_of_week: When ``periodicity='WEEK'``, this parameter will ensure periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified if ``start_day_of_week`` is specified as it is duplicative. :param spark: Spark session to use. Generally, this is **not needed** as the session is automatically generated in databricks. It is used by library developers. Examples -------- .. code-block:: python :caption: Generate revenue mix by week and income bucket. Notice how the ``time_period`` column indicates the end of the week. from etl_toolkit import E, A, F # cmg_df would be the yoda transactions for CMG df = A.revenue_mix_by_income_bucket( cmg_df, revenue_column="rev", date_column="date", start_date=date(2021, 1, 3), periodicity="WEEK", start_day_of_week="SUNDAY", income_bucket_column="yd_income_bucket2", ) display(df) +--------------+-----------------------------+-----------+------------------------------------+ |time_period |yd_income_bucket |revenue |income_bucket_revenue_percent | +--------------+-----------------------------+-----------+------------------------------------+ |2021-01-09 |High ($150k+) |22127.71 |0.2071751921632703 | +--------------+-----------------------------+-----------+------------------------------------+ |2021-01-09 |Low ($0-60k) |38568.29 |0.3611035438424529 | +--------------+-----------------------------+-----------+------------------------------------+ |2021-01-09 |Lower-Middle ($60-100k) |25451.31 |0.2382930843839744 | +--------------+-----------------------------+-----------+------------------------------------+ |2021-01-09 |Upper-Middle ($100-150k) |20659.43 |0.1934281796103024 | +--------------+-----------------------------+-----------+------------------------------------+ |2022-01-16 |High ($150k+) |22187.53 |0.2092824865506127 | +--------------+-----------------------------+-----------+------------------------------------+ |2022-01-16 |Low ($0-60k) |38352.40 |0.3617565842338732 | +--------------+-----------------------------+-----------+------------------------------------+ |2022-01-16 |Lower-Middle ($60-100k) |24908.72 |0.2349499211492873 | +--------------+-----------------------------+-----------+------------------------------------+ |2022-01-16 |Upper-Middle ($100-150k) |20568.49 |0.1940110080662268 | +--------------+-----------------------------+-----------+------------------------------------+ .. code-block:: python :caption: Aggregate by monthly periods and income bucket, while also including trailing 3-month metrics. Trailing month metrics will be null for earlier periods where not enough periods have elapsed to make the calculation. This also includes weighting the revenue data by income weight by specifying the ``income_weight_column`` argument. from etl_toolkit import E, A, F # cmg_df would be the yoda transactions for CMG df = A.revenue_mix_by_income_bucket( cmg_df, revenue_column="rev", date_column="date", start_date=date(2021, 1, 1), periodicity="MONTH", income_bucket_column="yd_income_bucket2", income_weight_column="income_weight", apply_income_weighting=True, trailing_period=3, ) display(df) +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |time_period |yd_income_bucket |revenue |income_bucket_revenue_percent |trailing_revenue |trailing_income_bucket_revenue_percent| +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-01-01 |High ($150k+) |95083.00 |0.2085046527102458 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-01-01 |Low ($0-60k) |163845.34 |0.3592915276434441 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-01-01 |Lower-Middle ($60-100k) |108469.64 |0.2378598260263688 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-01-01 |Upper-Middle ($100-150k) |88625.41 |0.1943439936199410 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-02-01 |High ($150k+) |83032.50 |0.2146970928471732 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-02-01 |Low ($0-60k) |136581.27 |0.3531580923512764 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-02-01 |Lower-Middle ($60-100k) |92097.17 |0.2381355876897015 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-02-01 |Upper-Middle ($100-150k) |75031.62 |0.1940092271118489 |null |null | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-03-01 |High ($150k+) |104205.85 |0.2085999518166622 |94107.12 |0.2103242607554216 | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-03-01 |Low ($0-60k) |180829.90 |0.3619864599887388 |160418.84 |0.3585273177006854 | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-03-01 |Lower-Middle ($60-100k) |118999.91 |0.2382147830825141 |106522.24 |0.23807137636607104 | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ |2021-03-01 |Upper-Middle ($100-150k) |95513.13 |0.1911988051120848 |86390.05 |0.19307704517782195 | +--------------+-------------------------+-----------+-------------------------------+-----------------+--------------------------------------+ """ # Normalize and validate input values if spark is None: from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session spark = get_spark_session() if income_weight_column is None and apply_income_weighting: raise InvalidInputException( "Must specify `income_weight_column` if `apply_income_weighting==True`" ) income_bucket_column = E.normalize_column(income_bucket_column) revenue_column = E.normalize_column(revenue_column) if income_weight_column is not None and apply_income_weighting: adjusted_revenue_column = revenue_column * E.normalize_column( income_weight_column ) else: adjusted_revenue_column = revenue_column _validate_day_of_week_bounds( start_day_of_week=start_day_of_week, end_day_of_week=end_day_of_week, ) start_day_of_week, end_day_of_week = _get_day_of_week_bounds( start_day_of_week=start_day_of_week, end_day_of_week=end_day_of_week, ) # Take only complete periods for the dataset given its min date / start date and max date # Then aggregate by the relevant period and income bucket to get revenue calculations # Finally add percent of total calculations to determine the income bucket contribution # as a percent of the total revenue per period date_range_filter = _filter_for_complete_periods( df, periodicity, date_column, end_day_of_week=end_day_of_week, start_date=start_date, calendar=calendar, ) base = df.where(date_range_filter) adjusted_start_date = get_aggregates( base, value_column=date_column, aggregate_functions=["min"], )["min"] if adjusted_start_date != start_date: print( f"Correcting start date to {adjusted_start_date} to be consistent with week start/ends" ) start_date = adjusted_start_date period = _periodicity_column(periodicity, date_column, start_day_of_week) # When generating income bucket mix, ignore null rows # and then reweight all revenue by those percentages group_df = ( base.where(~income_bucket_column.isNull()) .groupBy( period.alias("time_period"), income_bucket_column.alias("yd_income_bucket") ) .agg(F.sum(adjusted_revenue_column).alias("income_bucket_revenue")) ) income_weight_df = add_percent_of_total_columns( group_df, value_columns=["income_bucket_revenue"], total_grouping_columns=["time_period"], ) # Re-weight all revenue by each available income bucket # This accounts for the case where some transaction data exists # without an associated income bucket so that revenue totals tie out adjusted_revenue_df = ( base.groupBy( period.alias("time_period"), ) .agg(F.sum(revenue_column).alias("revenue")) .join(income_weight_df, ["time_period"], how="Left") .select( F.col("time_period"), F.col("yd_income_bucket"), (F.col("revenue") * F.col("income_bucket_revenue_percent")).alias( "revenue" ), F.col("income_bucket_revenue_percent"), ) ) # Fill in missing periods with 0 revenue to account for gaps in the data final_df = fill_periods( adjusted_revenue_df, date_column="time_period", slice_columns=["yd_income_bucket"], steps=1, step_unit=periodicity.upper(), start=start_date, calendar=calendar, start_day_of_week=start_day_of_week, end_day_of_week=end_day_of_week, spark=spark, ).withColumns( { "revenue": F.coalesce(F.col("revenue"), F.lit(0)), "yd_income_bucket_start": F.when( F.regexp_extract("yd_income_bucket", r"\(\$(\d+)", 1) != "", F.regexp_extract("yd_income_bucket", r"\(\$(\d+)", 1).cast("int"), ).otherwise(F.lit(None).cast("int")), "yd_income_bucket_end": F.when( F.regexp_extract("yd_income_bucket", r"\-(\d+)", 1) != "", F.regexp_extract("yd_income_bucket", r"\-(\d+)", 1).cast("int"), ).otherwise(F.lit(None).cast("int")), } ) # Adjust to show week ending which is convention if periodicity.upper() == "WEEK": final_df = final_df.select( [ E.date_end( periodicity, "time_period", start_day_of_week=start_day_of_week ).alias("time_period") if column_name == "time_period" else column_name for column_name in final_df.columns ] ) # If electing for trailing period calculation(s), add it to the dataframe if trailing_period is not None: trailing_start_offset = (trailing_period - 1) * -1 global_window = W.partitionBy().orderBy(F.col("time_period").asc()) number_of_periods = F.dense_rank().over(global_window) trailing_window = ( W.partitionBy(F.col("yd_income_bucket")) .orderBy("time_period") .rowsBetween(trailing_start_offset, 0) ) time_period_window = W.partitionBy("time_period") # standardize additional trailing columns with a common prefix pattern prefix = f"trailing" trailing_period_revenue = f"{prefix}_revenue" trailing_period_revenue_percent = f"{prefix}_income_bucket_revenue_percent" final_df = ( final_df.withColumn( trailing_period_revenue, F.avg(F.col("revenue")).over(trailing_window) ) .withColumn( trailing_period_revenue_percent, F.col(trailing_period_revenue) / F.sum(F.col(trailing_period_revenue)).over(time_period_window), ) .withColumn("time_period_count", number_of_periods) .withColumns( { trailing_period_revenue: F.when( F.col("time_period_count") >= trailing_period, F.col(trailing_period_revenue), ), trailing_period_revenue_percent: F.when( F.col("time_period_count") >= trailing_period, F.col(trailing_period_revenue_percent), ), } ) .drop("time_period_count") ) if include_growth_rates: final_df = _add_revenue_growth_rates_by_income_bucket( final_df, periodicity, trailing_period=trailing_period, ) # Optionally filter so that time periods that don't fall on every nth period are omitted # this it to enable teams to visualize trailing metrics more clearly # ex: Show every 3rd month for a trailing 3-month revenue metric if interval_period is not None: final_df = ( final_df.withColumn("step", F.rank().over(W.orderBy("time_period")) - 1) .where(F.col("step") % interval_period == 0) .drop("step") ) return final_df.orderBy("time_period", "yd_income_bucket")
def _filter_for_complete_periods( df: DataFrame, periodicity: Literal["WEEK", "MONTH", "QUARTER"], date_column: str | Column, start_day_of_week: Literal[ "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY" ] = None, end_day_of_week: Literal[ "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY" ] = None, start_date: Optional[date | datetime] = None, calendar=None, ) -> Column: _validate_day_of_week_bounds( start_day_of_week=start_day_of_week, end_day_of_week=end_day_of_week, ) start_day_of_week, end_day_of_week = _get_day_of_week_bounds( start_day_of_week=start_day_of_week, end_day_of_week=end_day_of_week, ) aggregates = get_aggregates( df, value_column=date_column, aggregate_functions=["max", "min"] ) max_date = aggregates["max"] if start_date is None: start_date = aggregates["min"] day_after_max_date = F.date_add(F.lit(max_date), 1) match periodicity.upper(): case "WEEK": first_period = E.next_complete_period( periodicity, F.lit(start_date), end_day_of_week=end_day_of_week, ) last_period = E.date_trunc( periodicity, day_after_max_date, end_day_of_week=end_day_of_week, ) case "MONTH": first_period = E.next_complete_period( periodicity, F.lit(start_date), end_day_of_week=end_day_of_week, ) last_period = E.date_trunc(periodicity, day_after_max_date) case "QUARTER": first_period = E.next_complete_period( periodicity, F.lit(start_date), end_day_of_week=end_day_of_week, calendar=calendar, ) last_period = E.date_trunc(periodicity, day_after_max_date) case _: raise InvalidInputException( f"Invalid periodicity provided {periodicity}, must be one of 'WEEK', 'MONTH', 'QUARTER'" ) return E.between(date_column, first_period, last_period, include_upper_bound=False) def _periodicity_column( periodicity: Literal["WEEK", "MONTH", "QUARTER"], date_column: str | Column, start_day_of_week: str, ) -> Column: match periodicity.upper(): case "WEEK": period = E.date_trunc( periodicity, date_column, start_day_of_week=start_day_of_week ) case "MONTH" | "QUARTER": period = E.date_trunc(periodicity, date_column) case _: raise InvalidInputException( f"Invalid periodicity provided {periodicity}, must be one of 'WEEK', 'MONTH', 'QUARTER'" ) return period def _add_revenue_growth_rates_by_income_bucket( df: DataFrame, periodicity: Literal["WEEK", "MONTH", "QUARTER"] = "MONTH", trailing_period: Optional[int] = None, ) -> DataFrame: """ Returns a dataframe of revenue growth rates calculated by ``income_bucket_column`` and the specified ``periodicity``. Can optionally include a ``trailing_period`` to use in aggregating revenue that goes into the growth rate. Growth rates are based on a 1-year lag. :param df: Input dataframe that should be used to generate revenue calculations. The dataframe must include the remaining column arguments for this function. :param periodicity: The time resolution to group revenue data on. When specified, the ``date_column`` will be rounded to the nearest matching period to aggregate revenue on that period + income bucket. :param trailing_period: Optionally specified as an integer. When specified, revenue will be aggregated as a trailing n-period average revenue by period and income bucket. The growth rates will then be determined from this metric. """ # - Use the trailing period revenue metric if specified # and ignore periods where trailing metric is unavailable revenue_metric = "revenue" trailing_revenue_metric = "trailing_revenue" revenue_mix_df = df # - Setup a self-join to calculate growth rates # joining values against the comparable period n-year(s) ago year_ago_df = (revenue_mix_df).alias("year_ago") if trailing_period is not None: year_ago_df = ( revenue_mix_df.where(F.col(trailing_revenue_metric).isNotNull()) ).alias("year_ago") current_df = (revenue_mix_df).alias("current") if trailing_period is not None: current_df = ( revenue_mix_df.where(F.col(trailing_revenue_metric).isNotNull()) ).alias("current") matching_income_bucket = F.col("current.yd_income_bucket") == F.col( "year_ago.yd_income_bucket" ) match periodicity.upper(): case "WEEK": # Use 52-week interval to calculate year-over-year growth, i.e. 364 days join_expression = E.all( matching_income_bucket, F.col(f"current.time_period") == F.date_add(F.col(f"year_ago.time_period"), 364), ) case _: # Use 12-month interval to calculate year-over-year growth join_expression = E.all( matching_income_bucket, F.col(f"current.time_period") == F.add_months(F.col(f"year_ago.time_period"), 12), ) # Calculate growth rates across all income buckets as well as growth rates per bucket ( income_bucket_growth_rate, full_period_growth, contribution_calc, ) = _generate_growth_metrics(revenue_metric) ( trailing_income_bucket_growth_rate, trailing_full_period_growth, trailing_contribution_calc, ) = _generate_growth_metrics(trailing_revenue_metric) standard_columns = [ "current.time_period", "current.yd_income_bucket", "current.revenue", "current.income_bucket_revenue_percent", ] trailing_columns = [ "current.trailing_revenue", ] growth_columns = [ income_bucket_growth_rate.alias("yy_growth"), full_period_growth.alias("total_yy_growth"), contribution_calc.alias("yy_growth_contribution"), ] trailing_growth_columns = [ trailing_income_bucket_growth_rate.alias("trailing_yy_growth"), trailing_full_period_growth.alias("trailing_total_yy_growth"), trailing_contribution_calc.alias("trailing_yy_growth_contribution"), ] yy_growth_df = current_df.join(year_ago_df, join_expression, how="inner").select( "current.time_period", "current.yd_income_bucket", *growth_columns, *trailing_growth_columns if trailing_period is not None else [], ) return df.join(yy_growth_df, ["time_period", "yd_income_bucket"], how="left") def _generate_growth_metrics(revenue_metric: str) -> (Column, Column, Column): all_income_buckets_window = W.partitionBy("current.time_period") all_income_buckets_current_total = F.sum(f"current.{revenue_metric}").over( all_income_buckets_window ) all_income_buckets_year_ago_total = F.sum(f"year_ago.{revenue_metric}").over( all_income_buckets_window ) full_period_growth = ( all_income_buckets_current_total / all_income_buckets_year_ago_total ) - 1 contribution_calc = ( F.col(f"current.{revenue_metric}") - F.col(f"year_ago.{revenue_metric}") ) / all_income_buckets_year_ago_total income_bucket_growth_rate = ( F.col(f"current.{revenue_metric}") / F.col(f"year_ago.{revenue_metric}") ) - 1 return ( income_bucket_growth_rate, full_period_growth, contribution_calc, )