E module (“expressions”)#
The expressions module in the etl_toolkit is imported as E.
This module contains many functions to generate common but complex pyspark columns for use in data transformations.
The functions should be seen as core building blocks that can greatly simplify code that handles case statements, condition logic, regex parsing, datetime parsing, and more.
Note
ETL toolkit expressions are functions that return a single pyspark Column.
Use these when selecting or filtering columns on a pyspark DataFrame.
from etl_toolkit import E, F

df = (
    spark.table("yd_production.etl_toolkit_static.states")
    .where(E.any(
        F.col("state_name").like("New%"),
        F.col("region") == "Northeast",
    ))
)
Aggregate Expressions#
These expressions can be used to simplify writing complex aggregation expressions.
Caution
Aggregate expressions must be passed into a .agg method call when using DataFrame.groupBy.
Use these functions when performing a group-by transformation.
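A minimal sketch of this pattern is shown below; E.count_distinct is an assumed helper name used only for illustration and may not match the actual API. The key point is that the aggregate expression is passed inside .agg after a groupBy.

from etl_toolkit import E, F

# Hypothetical sketch: E.count_distinct is an assumed name, not a confirmed
# function, used only to show where an aggregate expression belongs.
df = (
    spark.table("yd_production.etl_toolkit_static.states")
    .groupBy("region")
    .agg(
        F.count("*").alias("state_count"),                     # built-in pyspark aggregate
        E.count_distinct("state_name").alias("unique_states"), # assumed etl_toolkit aggregate expression
    )
)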
Boolean Expressions#
These expressions can be used to simplify writing complex and/or conditions.
Tip
It is highly recommended to use these boolean expressions where applicable to make code for complex filters, joins, and enrichment columns as easy to read as possible. These functions also make it easier to add new logic, since you don't have to worry about the parentheses and order of operations required by the built-in pyspark boolean operators.
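For example, building on the E.any usage from the introductory snippet, a compound filter can be written without manually balancing parentheses. In this sketch, E.all is assumed to be the AND counterpart of E.any, and the population column is illustrative; neither is confirmed by this page.

from etl_toolkit import E, F

# E.any ORs its conditions together, as in the introductory example.
# E.all is an assumed AND counterpart, shown only for illustration.
new_or_northeast = E.any(
    F.col("state_name").like("New%"),
    F.col("region") == "Northeast",
)

df = (
    spark.table("yd_production.etl_toolkit_static.states")
    .where(E.all(
        new_or_northeast,
        F.col("population") > 1_000_000,  # assumed column, for illustration only
    ))
)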
Calculation Expressions#
These expressions can be used to simplify writing complex numerical calculations like growth rates.
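As a rough sketch of the kind of calculation these expressions wrap, here is a period-over-period growth rate written by hand in pyspark; the revenue columns are illustrative and not part of the toolkit.

from pyspark.sql import functions as F

# A growth rate written manually in pyspark: (current - prior) / prior.
# Calculation expressions in E wrap arithmetic patterns like this.
df = spark.createDataFrame([
    {"revenue": 120.0, "prior_revenue": 100.0},
])

growth_rate = (F.col("revenue") - F.col("prior_revenue")) / F.col("prior_revenue")

df = df.withColumn("revenue_growth", growth_rate)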
ID Expressions#
These expressions are used to generate primary keys or ID fields when developing datasets.
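As a sketch of the kind of logic an ID expression typically wraps (column names are illustrative), a deterministic surrogate key can be built by hashing the columns that identify a row.

from pyspark.sql import functions as F

# A surrogate key written manually in pyspark: concatenate the identifying
# columns with a separator and hash the result.
df = spark.createDataFrame([
    {"state_name": "New York", "region": "Northeast"},
])

primary_key = F.sha2(F.concat_ws("||", F.col("state_name"), F.col("region")), 256)

df = df.withColumn("state_id", primary_key)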
Mapping Expressions#
These expressions can be used to simplify writing complex case statement expressions.
Tip
It is highly recommended to use these mapping expressions where applicable to make enrichment columns as easy to read as possible. These functions also make it easier to dynamically build mapping columns where a long list of conditions needs to be considered or complex boolean conditions need to be met. These conditions work well with the boolean expressions found in the ETL toolkit.
- etl_toolkit.E.chain_assigns(conditions, otherwise=None)#
Utility function to write case statements in Pyspark more efficiently. It accepts a list of conditions that are E.cases or E.assigns.
Tip
E.chain_assigns and E.chain_cases are aliases of each other. They both produce the same results; however, stylistically it is better to use E.case with E.chain_cases and E.assign with E.chain_assigns.
The E.case function makes it straightforward to write when/then cases. For example, E.case(F.col('x') == 1, 'test') -> case when x=1 then 'test'. The case function accepts a boolean expression Column as the first argument and a Column, string, or dict for the second argument that is returned if the boolean expression is True. If a string is specified for the second argument, it is treated as a literal.
The E.assign function is similar to E.case, however the arguments are reversed. For example, E.assign('test', F.col('x') == 1) -> case when x=1 then 'test'. The assign function can produce more readable code when defining a long series of mapping conditions, since it can be easier to see the match value first before the condition.
A default value can be set if no cases are met via the otherwise argument.
- Parameters:
conditions (list[CaseStatement]) – List of E.cases or E.assigns that are evaluated in order for matches. If a case matches, the associated value is used for the row. Must have at least one condition in the list.
otherwise (Column | str, default: None) – A default value to use if no cases are matched from conditions. If a string is passed, it will be treated as a literal.
- Return type:
Column
Examples#
Using E.chain_cases to define a mapping column#

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
])

display(
    df
    .withColumn("output", sector_mapping)
)
input   output
aapl    Tech
nke     Retail
dpz     Food
Using E.chain_assigns to define a mapping column. Notice that it can be easier to read the match value first and then the associated condition.#

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_assigns([
    E.assign("Tech", F.col("input") == "aapl"),
    E.assign("Retail", F.col("input") == "nke"),
    E.assign("Food", F.col("input") == "dpz"),
])

display(
    df
    .withColumn("output", sector_mapping)
)
input   output
aapl    Tech
nke     Retail
dpz     Food
Using the otherwise parameter to specify default values for no matches.#

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "tsla"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
], otherwise="N/A")

display(
    df
    .withColumn("output", sector_mapping)
)
input   output
aapl    Tech
nke     Retail
tsla    N/A
Using dict values in E.case returns a Map type column for matches. The nested fields can also be accessed in pyspark directly if a map isn't desired. This approach works well when multiple columns need to be assigned for a given condition, rather than creating multiple case statement expressions with repeated logic.#

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(
        F.col("input") == "aapl",
        {
            "sector": "Technology",
            "subsector": "Consumer Devices",
        },
    ),
    E.case(
        F.col("input") == "nke",
        {
            "sector": "Retail",
            "subsector": "Apparel",
        },
    ),
])

display(
    df
    .withColumn("output", sector_mapping)
    .withColumn("sector", sector_mapping.sector)
    .withColumn("subsector", sector_mapping.subsector)
)
input   output                                                       sector       subsector
aapl    {"sector": "Technology", "subsector": "Consumer Devices"}   Technology   Consumer Devices
nke     {"sector": "Retail", "subsector": "Apparel"}                 Retail       Apparel
dpz     null                                                         null         null
Normalization Expressions#
These expressions are used to standardize the format of various column types. This is a common cleaning technique to make columns easier to analyze by removing edge cases related to formatting inconsistencies.
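As a sketch of the kind of cleaning these expressions standardize (the column name is illustrative), a manual string normalization in pyspark might trim whitespace and collapse casing.

from pyspark.sql import functions as F

# Manual string normalization in pyspark: trim surrounding whitespace and
# lowercase the value so formatting differences don't split groups apart.
df = spark.createDataFrame([
    {"company_name": "  Acme Corp "},
])

normalized_name = F.lower(F.trim(F.col("company_name")))

df = df.withColumn("company_name_clean", normalized_name)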
Regex Expressions#
These expressions can be used to simplify writing complex regex logic.
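As a sketch of the kind of regex parsing these expressions simplify (the column name and pattern are illustrative), here is a hand-written extraction using pyspark built-ins.

from pyspark.sql import functions as F

# Manual regex parsing in pyspark: extract the domain from an email address.
df = spark.createDataFrame([
    {"email": "jane@example.com"},
])

email_domain = F.regexp_extract(F.col("email"), r"@([A-Za-z0-9.-]+)$", 1)

df = df.withColumn("email_domain", email_domain)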
Time Expressions#
These expressions can be used to simplify writing complex datetime logic.
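As a sketch of the kind of datetime handling these expressions simplify (the column name and format are illustrative), here is a hand-written parse-and-truncate using pyspark built-ins.

from pyspark.sql import functions as F

# Manual datetime handling in pyspark: parse a string timestamp and truncate
# it to the start of the month for monthly rollups.
df = spark.createDataFrame([
    {"event_time": "2024-01-15 09:30:00"},
])

event_ts = F.to_timestamp(F.col("event_time"), "yyyy-MM-dd HH:mm:ss")
event_month = F.date_trunc("month", event_ts)

df = df.withColumn("event_month", event_month)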