E module (“expressions”)#

The expressions module in the etl_toolkit is imported as E. This module contains many functions that generate common but complex pyspark columns for use in data transformations. The functions should be seen as core building blocks that can greatly simplify code that handles case statements, conditional logic, regex parsing, datetime parsing, and more.

Note

ETL toolkit expressions are functions that return a single pyspark Column. Use these when selecting or filtering columns on a pyspark DataFrame.

Using expressions from the etl_toolkit#
from etl_toolkit import E, F

df = (
    spark.table("yd_production.etl_toolkit_static.states")
    .where(E.any(
        F.col("state_name").like("New%"),
        F.col("region") == "Northeast",
    ))
)

Aggregate Expressions#

These expressions can be used to simplify writing complex aggregation expressions.

Caution

Aggregate expressions must be passed into a .agg method call when using DataFrame.groupBy. Use these functions when performing a groupBy transformation.
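
For example, an aggregate expression slots into .agg exactly where a built-in pyspark aggregate would go. The sketch below reuses the states table from the opening example and uses F.countDistinct as a stand-in, since the individual E aggregate functions are documented separately.

from etl_toolkit import F

df = spark.table("yd_production.etl_toolkit_static.states")

display(
    df
    .groupBy("region")
    .agg(
        # Built-in pyspark aggregate shown as a stand-in; an E aggregate
        # expression would be passed into .agg in exactly the same position.
        F.countDistinct("state_name").alias("state_count"),
    )
)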

Boolean Expressions#

These expressions can be used to simplify writing complex and/or conditions.

Tip

It is highly recommended to use these boolean expressions where applicable to make code for complex filters, joins, and enrichment columns as easy to read as possible. These functions also make it easier to add new logic, since you don't have to worry about the parentheses and order of operations that come with the built-in pyspark boolean operators.
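
As a small sketch using E.any from the opening example (assuming E.any accepts any number of conditions), adding new logic is just another argument, with no parentheses to track around each clause.

from etl_toolkit import E, F

df = spark.table("yd_production.etl_toolkit_static.states")

# Equivalent built-in form, which needs parentheses around every clause:
# df.where((F.col("state_name").like("New%")) | (F.col("region") == "Northeast"))

display(
    df.where(E.any(
        F.col("state_name").like("New%"),
        F.col("region") == "Northeast",
        F.col("state_name") == "California",  # new logic is one extra argument
    ))
)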

Calculation Expressions#

These expressions can be used to simplify writing complex numerical calculations like growth rates.
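
The individual calculation expressions are documented separately; as a hedged illustration, the hand-written year-over-year growth rate below (built only from standard pyspark functions and an illustrative DataFrame) is the kind of formula these expressions are intended to replace.

from etl_toolkit import F
from pyspark.sql import Window

df = spark.createDataFrame([
    {"state_name": "New York", "year": 2020, "population": 20_100_000},
    {"state_name": "New York", "year": 2021, "population": 19_800_000},
])

# Hand-written year-over-year growth rate: (current - prior) / prior, per state.
w = Window.partitionBy("state_name").orderBy("year")
growth_rate = (
    (F.col("population") - F.lag("population").over(w))
    / F.lag("population").over(w)
)

display(df.withColumn("population_growth", growth_rate))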

ID Expressions#

These expressions are used to generate primary keys or ID fields when developing datasets.
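
As an illustrative sketch (not the toolkit's actual ID functions), a common primary-key pattern is hashing the concatenated natural-key columns into a deterministic surrogate ID, written here with standard pyspark functions.

from etl_toolkit import F

df = spark.table("yd_production.etl_toolkit_static.states")

# Deterministic surrogate key: hash the concatenated natural-key columns.
# Built only from standard pyspark functions as an illustration.
state_id = F.sha2(F.concat_ws("||", F.col("region"), F.col("state_name")), 256)

display(df.withColumn("state_id", state_id))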

Mapping Expressions#

These expressions can be used to simplify writing complex case statement expressions.

Tip

It is highly recommended to use these mapping expressions where applicable to make enrichment columns as easy to read as possible. These functions also make it easier to dynamically build mapping columns where a long list of conditions needs to be considered or complex boolean conditions need to be met. These conditions work well with the boolean expressions found in the ETL toolkit.
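
For example, because the conditions are plain Python values, a mapping column can be built programmatically from a dict (the ticker-to-sector dict below is illustrative) instead of writing every case by hand.

from etl_toolkit import E, F

ticker_sectors = {
    "aapl": "Tech",
    "nke": "Retail",
    "dpz": "Food",
}

# Build the list of E.assign conditions dynamically from the dict.
sector_mapping = E.chain_assigns([
    E.assign(sector, F.col("input") == ticker)
    for ticker, sector in ticker_sectors.items()
], otherwise="N/A")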

etl_toolkit.E.chain_assigns(conditions, otherwise=None)#

Utility function to write case statements in pyspark more efficiently. It accepts a list of conditions created with E.case or E.assign.

Tip

E.chain_assigns and E.chain_cases are aliases of each other. They both produce the same results; however, stylistically it is better to use E.case with E.chain_cases and E.assign with E.chain_assigns.

The E.case function makes it straightforward to write when/then cases. For example, E.case(F.col('x') == 1, 'test') -> case when x=1 then 'test'. The case function accepts a boolean expression Column as the first argument and a Column, string, or dict as the second argument, which is returned if the boolean expression is True. If a string is specified for the second argument, it is treated as a literal.

The E.assign function is similar to E.case, but with the arguments reversed. For example, E.assign('test', F.col('x') == 1) -> case when x=1 then 'test'. The assign function can produce more readable code when defining a long series of mapping conditions, since it can be easier to see the match value before the condition.

A default value can be set via the otherwise argument for rows where no case matches.

Parameters:
  • conditions (list[CaseStatement]) – List of E.case or E.assign conditions that are evaluated in order for matches. If a case matches, the associated value is used for the row. Must have at least one condition in the list.

  • otherwise (Column | str, default: None) – A default value to use if no cases are matched from conditions. If a string is passed it will be treated as a literal.

Return type:

Column

Examples#

Using E.chain_cases to define a mapping column#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

| input | output |
|-------|--------|
| aapl  | Tech   |
| nke   | Retail |
| dpz   | Food   |

Using E.chain_assigns to define a mapping column. Notice that it can be easier to read the match value first and then the associated condition.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_assigns([
    E.assign("Tech", F.col("input") == "aapl"),
    E.assign("Retail", F.col("input") == "nke"),
    E.assign("Food", F.col("input") == "dpz"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

| input | output |
|-------|--------|
| aapl  | Tech   |
| nke   | Retail |
| dpz   | Food   |

Using the otherwise parameter to specify default values for no matches.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "tsla"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
], otherwise="N/A")

display(
    df
    .withColumn("output", sector_mapping)
)

| input | output |
|-------|--------|
| aapl  | Tech   |
| nke   | Retail |
| tsla  | N/A    |

Using dict values in E.case will return a map-type column for matches. The nested fields can also be accessed in pyspark directly if a map isn't desired. This approach works well when multiple columns need to be assigned for a given condition, rather than creating multiple case statement expressions with repeated logic.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(
        F.col("input") == "aapl",
        {
            "sector": "Technology",
            "subsector": "Consumer Devices",
        },
    ),
    E.case(
        F.col("input") == "nke",
        {
            "sector": "Retail",
            "subsector": "Apparel",
        },
    ),
])

display(
    df
    .withColumn("output", sector_mapping)
    .withColumn("sector", sector_mapping.sector)
    .withColumn("subsector", sector_mapping.subsector)
)

| input | output | sector | subsector |
|-------|--------|--------|-----------|
| aapl  | {"sector": "Technology", "subsector": "Consumer Devices"} | Technology | Consumer Devices |
| nke   | {"sector": "Retail", "subsector": "Apparel"} | Retail | Apparel |
| dpz   | null | null | null |

Normalization Expressions#

These expressions are used to standardize the format of various column types. This is a common cleaning technique to make columns easier to analyze by removing edge cases related to formatting inconsistencies.
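
As a hedged illustration of what normalization covers (written with standard pyspark functions rather than the toolkit's own normalization expressions): trimming, lower-casing, and collapsing whitespace so that differently formatted values compare equal.

from etl_toolkit import F

df = spark.createDataFrame([
    {"state_name": "  new  york "},
    {"state_name": "New York"},
])

# Hand-written normalization: trim, lowercase, and collapse repeated whitespace,
# so both rows above normalize to "new york".
normalized_name = F.regexp_replace(F.lower(F.trim(F.col("state_name"))), r"\s+", " ")

display(df.withColumn("state_name_normalized", normalized_name))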

Regex Expressions#

These expressions can be used to simplify writing complex regex logic.
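
As a hedged sketch of the kind of regex logic involved (using the built-in F.regexp_extract rather than a specific toolkit function), extracting a date embedded in a filename:

from etl_toolkit import F

df = spark.createDataFrame([
    {"filename": "states_2021-01-31.csv"},
    {"filename": "states_2021-02-28.csv"},
])

# Hand-written regex parsing: capture the yyyy-MM-dd date embedded in the filename.
file_date = F.regexp_extract(F.col("filename"), r"(\d{4}-\d{2}-\d{2})", 1)

display(df.withColumn("file_date", file_date))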

Time Expressions#

These expressions can be used to simplify writing complex datetime logic.
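
As a hedged sketch of the kind of datetime parsing involved (built-in pyspark functions only; assumes ANSI mode is off so unparseable values become null rather than raising): trying multiple timestamp formats and keeping the first that matches.

from etl_toolkit import F

df = spark.createDataFrame([
    {"event_time": "2021-01-31 09:30:00"},
    {"event_time": "01/31/2021 09:30"},
])

# Try each format in turn; with ANSI mode off, a failed parse yields null,
# so coalesce keeps the first format that succeeds.
parsed_time = F.coalesce(
    F.to_timestamp("event_time", "yyyy-MM-dd HH:mm:ss"),
    F.to_timestamp("event_time", "MM/dd/yyyy HH:mm"),
)

display(df.withColumn("event_timestamp", parsed_time))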