E module (“expressions”)#

The expressions module in the etl_toolkit is imported as E. It contains functions that generate common but complex pyspark Columns for use in data transformations. These functions are core building blocks that can greatly simplify code handling case statements, conditional logic, regex parsing, datetime parsing, and more.

Note

ETL toolkit expressions are functions that return a single pyspark Column. Use these when selecting or filtering columns on a pyspark DataFrame.

Using expressions from the etl_toolkit#
1from etl_toolkit import E, F
2
3df = (
4    spark.table("yd_production.etl_toolkit_static.states")
5    .where(E.any(
6        F.col("state_name").like("New%"),
7        F.col("region") == "Northeast",
8    ))
9)

Aggregate Expressions#

These expressions can be used to simplify writing complex aggregation expressions.

Caution

Aggregate expressions must be passed into a .agg method call when using DataFrame.groupBy. Use these functions when performing a group-by aggregation.

etl_toolkit.E.sum_if(condition, value, otherwise=None)[source]#

Function to sum a column based on a specified condition. For example, E.sum_if(F.col('x') > 10, F.col('x')) will sum values of column x where x is greater than 10. The optional otherwise keyword argument determines the default value used when the condition is False for a row. Defaults to NULL.

Parameters:
  • condition (pyspark.sql.Column) – Boolean expression that when True for a row, includes the value expression in the sum calculation

  • value (pyspark.sql.Column | str) – Value column that is used to calculate the sum. If passed as a string, it will be treated as a Column.

  • otherwise (pyspark.sql.Column | int | float) – Default value to use when the condition is False. If not specified, None (NULL) will be used. If not passing a Column, it will be treated as a literal.

Return type:

pyspark.sql.Column

Examples#

Aggregating values within a limit#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        E.sum_if(F.col("value") < 1000, F.col("value")).alias("output")
    )
)

color | output
------|-------
red   | 100

Can sum on multiple conditions using E.all or E.any. Any column expression that returns a boolean is allowed for condition.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        E.sum_if(
            E.all(
                F.col("value") > 100,
                F.col("value") < 10000,
            ),
            F.col("value")
        ).alias("output")
    )
)

color | output
------|-------
red   | 1000
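
The otherwise argument supplies a per-row default when the condition is False. A minimal sketch, assuming the expression behaves like F.sum(F.when(condition, value).otherwise(default)) as the parameter description suggests; with otherwise=0, a group where no row matches should return 0 instead of NULL:

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        # No row satisfies value > 5000, so the default (NULL) behavior would yield NULL;
        # otherwise=0 makes every non-matching row contribute 0 instead.
        E.sum_if(F.col("value") > 5000, F.col("value"), otherwise=0).alias("output")
    )
)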

etl_toolkit.E.avg_if(condition, value, otherwise=None)[source]#

Function to average a column based on a specified condition. For example, E.avg_if(F.col('x') > 10, F.col('x')) will average values of column x where x is greater than 10. The optional otherwise keyword argument determines the default value used when the condition is False for a row. Defaults to NULL.

Parameters:
  • condition (pyspark.sql.Column) – Boolean expression that when True for a row, includes the value expression in the average calculation

  • value (pyspark.sql.Column | str) – Value column that is used to calculate the average. If passed as a string, it will be treated as a Column.

  • otherwise (pyspark.sql.Column | int | float) – Default value to use when the condition is False. If not specified, None (NULL) will be used. If not passing a Column, it will be treated as a literal.

Return type:

pyspark.sql.Column

Examples#

Aggregating values within a limit#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        E.avg_if(F.col("value") < 1000, F.col("value")).alias("output")
    )
)

color | output
------|-------
red   | 100

Can average on multiple conditions using E.all or E.any. Any column expression that returns a boolean is allowed for condition.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        E.avg_if(
            E.all(
                F.col("value") > 100,
                F.col("value") < 10000,
            ),
            F.col("value")
        ).alias("output")
    )
)

color | output
------|-------
red   | 1000
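
The otherwise argument also affects the denominator of the average, since non-matching rows contribute the default value instead of being ignored as NULLs. A minimal sketch, assuming the expression behaves like F.avg(F.when(condition, value).otherwise(default)):

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": 100, "color": "red"},
    {"value": 1000, "color": "red"},
])

display(
    df.groupBy("color")
    .agg(
        # Default: only the matching row (100) is averaged, since NULLs are ignored.
        E.avg_if(F.col("value") < 1000, F.col("value")).alias("avg_default"),
        # With otherwise=0 the non-matching row contributes 0 to the average,
        # so the denominator includes both rows.
        E.avg_if(F.col("value") < 1000, F.col("value"), otherwise=0).alias("avg_with_zero"),
    )
)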

Boolean Expressions#

These expressions can be used to simplify writing complex and/or conditions.

Tip

It is highly recommended to use these boolean expressions where applicable to make code for complex filters, joins, and enrichment columns as easy to read as possible. These functions also make it easier to add new logic, since you don't have to worry about the parentheses and order of operations that come with the built-in pyspark boolean operators.

etl_toolkit.E.any(*conditions)[source]#

Expression that takes a series of boolean expressions and wraps them in OR statements. This returns a boolean Column that is True if any one condition is True. It is recommended to use E.any over chained | expressions in pyspark, as it is easier to read and makes combining multiple conditions simpler to write.

e.g. [x>2, y>3] -> (x>2) | (y>3)

Returns a python None object if no conditions are passed.

Parameters:

conditions (list[pyspark.sql.Column | str] | pyspark.sql.Column | str) – A series of Column expressions that can be passed in as a list or directly, separated by commas. If strings are passed, they will be treated as Columns. Expressions must be of boolean type.

Return type:

pyspark.sql.Column

Examples#

Simple OR expression using E.any#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 100},
])

display(
    df
    .select(
        "input",
        E.any(
            F.col("input") < 0,
            F.col("input") < 1000
        ).alias("output")
    )
)

input | output
------|-------
100   | True

Pass in any number of conditions to use in E.any. It can be helpful to pass a list if building conditions dynamically.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 100},
])

columns = ["input"]
conditions = [F.col(column_name) > 10 for column_name in columns]

display(
    df
    .select(
        "input",
        E.any(conditions).alias("output"),
    )
)

input | output
------|-------
100   | True
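
Because strings are treated as Columns, an existing boolean column can be mixed with inline conditions. A minimal sketch, assuming a boolean column named is_flagged (hypothetical) is present on the DataFrame:

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 100, "is_flagged": False},
    {"input": 5, "is_flagged": True},
])

display(
    df.select(
        "input",
        "is_flagged",
        # "is_flagged" is passed as a string and resolved to a boolean Column.
        E.any("is_flagged", F.col("input") > 10).alias("output"),
    )
)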

etl_toolkit.E.all(*conditions)[source]#

Expression that takes a series of boolean expressions and wraps them in AND statements. This returns a boolean Column that is True only if all conditions are True. It is recommended to use E.all over chained & expressions in pyspark, as it is easier to read and makes combining multiple conditions simpler to write.

e.g. [x>2, y>3] -> (x>2) & (y>3)

Returns a python None object if no conditions are passed.

Parameters:

conditions (list[pyspark.sql.Column | str] | pyspark.sql.Column | str) – A series of Column expressions that can be passed in as a list or directly, separated by commas. If strings are passed, they will be treated as Columns. Expressions must be of boolean type.

Return type:

pyspark.sql.Column

Examples#

Simple AND expression using E.all#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 100},
])

display(
    df
    .select(
        "input",
        E.all(
            F.col("input") < 0,
            F.col("input") < 1000
        ).alias("output")
    )
)

input | output
------|-------
100   | False

Pass in any number of conditions to use in E.all. It can be helpful to pass a list if building conditions dynamically.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 100},
])

columns = ["input"]
conditions = [F.col(column_name) > 10 for column_name in columns]

display(
    df
    .select(
        "input",
        E.all(conditions).alias("output"),
    )
)

input | output
------|-------
100   | True

etl_toolkit.E.between(column, lower_bound_column, upper_bound_column, include_lower_bound=True, include_upper_bound=True)[source]#

Expression to return a boolean that indicates if a column is within two other columns or values (bounds). By default, values equal to the bounds are considered valid; this can be adjusted using include_lower_bound and include_upper_bound.

e.g. [x, a, b] -> a <= x <= b

Parameters:
  • column (pyspark.sql.Column | str) – A Column to compare against the lower and upper bounds. The column must be a compatible type with the bounds. Passing a string will be evaluated as a Column.

  • lower_bound_column (pyspark.sql.Column | str | int | float | datetime.date | datetime.datetime) – A Column that represents the minimum bound of the expression. The column must be a compatible type with the primary column and other bound. Passing a non-column type will be evaluated as a literal.

  • upper_bound_column (pyspark.sql.Column | str | int | float | datetime.date | datetime.datetime) – A Column that represents the maximum bound of the expression. The column must be a compatible type with the primary column and other bound. Passing a non-column type will be evaluated as a literal.

  • include_lower_bound (bool) – A boolean flag that if True will consider values at the minimum bound valid (i.e. <= expression); otherwise values at the bound are not valid (i.e. < expression)

  • include_upper_bound (bool) – A boolean flag that if True will consider values at the maximum bound valid (i.e. >= expression); otherwise values at the bound are not valid (i.e. > expression)

Return type:

pyspark.sql.Column

Examples#

Using E.between to handle filtering outliers. Notice how the bounds can be expressed as python literal values or Columns.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 0},
    {"input": 20},
    {"input": 100},
    {"input": 120},
    {"input": 300},
])

display(
    df
    .where(E.between("input", 10, 200))
)

input
-----
20
100
120

You can make the bounds exclusive with the optional flags.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": 0},
    {"input": 20},
    {"input": 100},
    {"input": 120},
    {"input": 300},
])

display(
    df
    .where(E.between("input", 20, 120, include_upper_bound=False, include_lower_bound=False))
)

input
-----
100
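
The bounds also accept date or datetime literals, which makes E.between convenient for date-range filters. A sketch assuming the column and bounds are compatible date types, as described in the parameter list:

from datetime import date
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"event_date": date(2024, 1, 15)},
    {"event_date": date(2024, 6, 30)},
    {"event_date": date(2025, 2, 1)},
])

display(
    # Keep rows falling within calendar year 2024 (bounds are inclusive by default).
    df.where(E.between("event_date", date(2024, 1, 1), date(2024, 12, 31)))
)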

Calculation Expressions#

These expressions can be used to simplify writing complex numerical calculations like growth rates.

etl_toolkit.E.growth_rate_by_lag(value_column, window, num_periods=1, default=None, base_value_column=None)[source]#

Generate a simple growth rate calculation using a provided window expression and value column. The window is used to lag the value column to generate the base period for the growth rate.

By default, the lag is 1 row but it can be increased using num_periods. The growth rate is calculated as (x1 - x0) / x0.

Caution

The lag is performed via a row-based window expression and can lead to incorrect results if there are gaps in the date ranges. It’s important to adjust for missing dates in the data to ensure accurate calculations. The A.fill_periods function can be useful in these scenarios.

Parameters:
  • value_column (str | pyspark.sql.Column) – The column to use as the numerator for the growth rate. A numeric type column should be used. If a string is passed, it is treated as a Column.

  • window (pyspark.sql.Window) – A pyspark Window object used for the lag operation. The window must have an .orderBy clause defined.

  • num_periods (int) – The number of periods to lag the current value against for the growth rate.

  • default (int | float | pyspark.sql.Column) – An optional value to use if the growth rate column is NULL. This is useful to fill in the initial value of the growth rate column. A number passed in will be treated as a literal.

  • base_value_column (str | pyspark.sql.Column) – An optional column to use as the denominator for the growth rate, using num_periods as the lag interval. A numeric type column should be used. If the value_column is desired for the denominator, leave this as None. Default is None (i.e. the value_column is used for the denominator).

Return type:

pyspark.sql.Column

Examples#

Using E.growth_rate_by_lag to calculate a y/y growth rate.#
from datetime import date
from etl_toolkit import E, F, W

df = spark.createDataFrame([
    {"date": date(2020, 1, 1), "input": 0},
    {"date": date(2021, 1, 2), "input": 10},
    {"date": date(2022, 1, 3), "input": 20},
    {"date": date(2023, 1, 4), "input": 25},
    {"date": date(2024, 1, 5), "input": 50},
])

display(
    df
    .withColumn(
        "output",
        E.growth_rate_by_lag(
            "input",
            W.partitionBy(F.lit(1)).orderBy("date"),
        )
    )
)

date       | input | output
-----------|-------|-------
2020-01-01 | 0     | NULL
2021-01-02 | 10    | NULL
2022-01-03 | 20    | 1.0
2023-01-04 | 25    | 0.25
2024-01-05 | 50    | 1.0

Use the default value to coalesce null values in the calculation#
from datetime import date
from etl_toolkit import E, F, W

df = spark.createDataFrame([
    {"date": date(2020, 1, 1), "input": 0},
    {"date": date(2021, 1, 2), "input": 10},
    {"date": date(2022, 1, 3), "input": 20},
    {"date": date(2023, 1, 4), "input": 25},
    {"date": date(2024, 1, 5), "input": 50},
])

display(
    df
    .withColumn(
        "output",
        E.growth_rate_by_lag(
            "input",
            W.partitionBy(F.lit(1)).orderBy("date"),
            default=0,
        )
    )
)

date       | input | output
-----------|-------|-------
2020-01-01 | 0     | 0
2021-01-02 | 10    | 0
2022-01-03 | 20    | 1.0
2023-01-04 | 25    | 0.25
2024-01-05 | 50    | 1.0

Use num_periods to generate a 2-yr simple growth rate.#
from datetime import date
from etl_toolkit import E, F, W

df = spark.createDataFrame([
    {"date": date(2020, 1, 1), "input": 0},
    {"date": date(2021, 1, 2), "input": 10},
    {"date": date(2022, 1, 3), "input": 20},
    {"date": date(2023, 1, 4), "input": 25},
    {"date": date(2024, 1, 5), "input": 50},
])

display(
    df
    .withColumn(
        "output",
        E.growth_rate_by_lag(
            "input",
            W.partitionBy(F.lit(1)).orderBy("date"),
            num_periods=2,
        )
    )
)

date       | input | output
-----------|-------|-------
2020-01-01 | 0     | NULL
2021-01-02 | 10    | NULL
2022-01-03 | 20    | NULL
2023-01-04 | 25    | 1.5
2024-01-05 | 50    | 1.5

Use a different base_value_column to calculate a y/y growth rate. Notice how the lag to determine the denominator is now on the base column instead of the input column.#
from datetime import date
from etl_toolkit import E, F, W

df = spark.createDataFrame([
    {"date": date(2020, 1, 1), "input": 0, "base": 5},
    {"date": date(2021, 1, 2), "input": 10, "base": 16},
    {"date": date(2022, 1, 3), "input": 20, "base": 20},
    {"date": date(2023, 1, 4), "input": 25, "base": 25},
    {"date": date(2024, 1, 5), "input": 50, "base": 30},
])

display(
    df
    .withColumn(
        "output",
        E.growth_rate_by_lag(
            "input",
            W.partitionBy(F.lit(1)).orderBy("date"),
            base_value_column="base",
        )
    )
)

date       | input | base | output
-----------|-------|------|-------
2020-01-01 | 0     | 5    | 0
2021-01-02 | 10    | 16   | 1.0
2022-01-03 | 20    | 20   | 0.25
2023-01-04 | 25    | 25   | 0.25
2024-01-05 | 50    | 30   | 1.0

ID Expressions#

These expressions are used to generate primary keys or ID fields when developing datasets.

etl_toolkit.E.uuid5(*columns, schema=None, namespace=uuid.NAMESPACE_OID, separator='-', null_placeholder='\x00')[source]#

Generates a UUIDv5 from the provided columns and namespace, using a custom separator (default "-").

This function creates an RFC 4122/9562-compliant UUIDv5 string using PySpark. It concatenates the input columns with the specified separator, then uses the concatenated string along with the provided namespace to generate the UUID.

The function can accept individual column names, Column objects, or a list of these. If a list is provided as the first argument, it will be used as the source of columns.

Tip

For complex types (arrays, structs, maps, variants), you must pass in your dataframe schema with the schema parameter to ensure proper JSON conversion.

Parameters:
  • columns (Union[str, pyspark.sql.functions.Column, List[Union[str, pyspark.sql.functions.Column]]]) – The input columns to use for generating the UUID. Can be string column names, Column objects, or a list of these. If strings are provided, they are converted to Column objects.

  • schema (Optional[pyspark.sql.types.StructType]) – Optional StructType schema that defines the data types of the input columns. If provided, complex types will automatically be converted to JSON strings.

  • namespace – The namespace to use for the UUID generation. Defaults to uuid.NAMESPACE_OID.

  • separator (str) – The separator to use when concatenating columns. Defaults to "-".

  • null_placeholder (str) – The placeholder to use for null values. Defaults to "\x00" (the null byte).

Returns:

A Column containing the generated UUIDv5 string.

Return type:

pyspark.sql.functions.Column

Examples#

Generate a UUIDv5 from multiple columns#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"col1": "value1", "col2": "value2", "col3": "value3"},
])

result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3"))
display(result)

col1   | col2   | col3   | uuid
-------|--------|--------|-------------------------------------
value1 | value2 | value3 | 70234258-cd49-5512-b42b-2a2336284bde

Generate a UUIDv5 from multiple columns with a custom separator character. This affects the UUID generated.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"col1": "value1", "col2": "value2", "col3": "value3"},
])

result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", separator="|"))
display(result)

col1   | col2   | col3   | uuid
-------|--------|--------|-------------------------------------
value1 | value2 | value3 | 2c4cbae4-03c8-57e8-abd3-83dc81568625

Generate a UUIDv5 from multiple columns while using a custom namespace. This affects the UUID generated.#
from etl_toolkit import E, F
import uuid

df = spark.createDataFrame([
    {"col1": "value1", "col2": "value2", "col3": "value3"},
])

custom_namespace = uuid.uuid4()  # returned uuid.UUID("e7950cee-b08d-497e-8eec-564cbabbd81e")
result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", namespace=custom_namespace))
display(result)

col1   | col2   | col3   | uuid
-------|--------|--------|-------------------------------------
value1 | value2 | value3 | 2f245ebb-3af3-5268-b734-c27a453fcd87



Using schema with complex types#
from etl_toolkit import E, F
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, MapType

# Define schema with complex types
schema = StructType([
    StructField("string_col", StringType(), True),
    StructField("array_col", ArrayType(StringType()), True),
    StructField("map_col", MapType(StringType(), StringType()), True)
])

df = spark.createDataFrame([
    {
        "string_col": "value1",
        "array_col": ["a", "b"],
        "map_col": {"key": "value"}
    }
], schema=schema)

# Pass schema to handle complex types automatically
result = df.withColumn(
    "uuid",
    E.uuid5("string_col", "array_col", "map_col", schema=df.schema)
)
display(result)
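
The null_placeholder argument controls how NULL values are represented in the concatenated key, so rows with missing columns still hash to a deterministic UUID. A minimal sketch, assuming the placeholder is substituted before concatenation as the parameter description implies; the placeholder string "missing" is purely illustrative:

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"col1": "value1", "col2": "value2"},
    {"col1": "value1", "col2": None},
])

# The NULL in col2 is replaced with the placeholder before hashing, so the row
# still yields a stable UUID that differs from the fully populated row.
result = df.withColumn("uuid", E.uuid5("col1", "col2", null_placeholder="missing"))
display(result)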

Mapping Expressions#

These expressions can be used to simplify writing complex case statement expressions.

Tip

It is highly recommended to use these mapping expressions where applicable to make enrichment columns as easy to read as possible. These functions also make it easier to dynamically build mapping columns where a long list of conditions needs to be considered or complex boolean conditions need to be met. These conditions work well with the boolean expressions found in the ETL toolkit.

etl_toolkit.E.chain_assigns(conditions, otherwise=None)[source]#

Utility function to write case statements in Pyspark more efficiently. It accepts a list of conditions created with E.case or E.assign.

Tip

E.chain_assigns and E.chain_cases are aliases of each other. They both produce the same results; however, stylistically it is better to use E.case with E.chain_cases and E.assign with E.chain_assigns.

The E.case function makes it straightforward to write when/then cases. For example, E.case(F.col('x') == 1, 'test') -> case when x=1 then 'test'. The case function accepts a boolean expression Column as the first argument and a Column, string, or dict for the second argument that is returned if the boolean expression is True. If a string is specified for the second argument, it is treated as a literal.

The E.assign function is similar to E.case, but the arguments are reversed. For example, E.assign('test', F.col('x') == 1) -> case when x=1 then 'test'. The assign function can make for more readable code when defining a long series of mapping conditions, since it can be easier to see the match value before the condition.

A default value can be set via the otherwise argument for rows where no case matches.

Parameters:
  • conditions (list[CaseStatement]) – List of E.case or E.assign expressions that are evaluated in order. The first matching case determines the value used for the row. Must have at least one condition in the list.

  • otherwise (Column | str, default: None) – A default value to use if no cases are matched from conditions. If a string is passed it will be treated as a literal.

Return type:

Column

Examples#

Using E.chain_cases to define a mapping column#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
dpz   | Food

Using E.chain_assigns to define a mapping column. Notice that it can be easier to read the match value first and then the associated condition.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_assigns([
    E.assign("Tech", F.col("input") == "aapl"),
    E.assign("Retail", F.col("input") == "nke"),
    E.assign("Food", F.col("input") == "dpz"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
dpz   | Food

Using the otherwise parameter to specify default values for no matches.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "tsla"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
], otherwise="N/A")

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
tsla  | N/A
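
The value side of E.case can also be a Column expression rather than a string literal, so the mapped value can be derived from the row itself. A minimal sketch, assuming Column values are passed through unchanged as the docstring describes:

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
])

label = E.chain_cases([
    # The matched value is computed from the row rather than hard-coded.
    E.case(F.col("input") == "aapl", F.upper(F.col("input"))),
    E.case(F.col("input") == "nke", F.concat(F.lit("ticker:"), F.col("input"))),
])

display(df.withColumn("output", label))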

Using dict values in E.case will return a Map type column for matches. The nested fields can also be accessed directly in pyspark if a map isn't desired. This approach works well when multiple columns need to be assigned for a given condition, rather than creating multiple case statement expressions with repeated logic.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(
        F.col("input") == "aapl",
        {
            "sector": "Technology",
            "subsector": "Consumer Devices",
        },
    ),
    E.case(
        F.col("input") == "nke",
        {
            "sector": "Retail",
            "subsector": "Apparel",
        },
    ),
])

display(
    df
    .withColumn("output", sector_mapping)
    .withColumn("sector", sector_mapping.sector)
    .withColumn("subsector", sector_mapping.subsector)
)

input | output                                                     | sector     | subsector
------|------------------------------------------------------------|------------|-----------------
aapl  | {"sector": "Technology", "subsector": "Consumer Devices"}  | Technology | Consumer Devices
nke   | {"sector": "Retail", "subsector": "Apparel"}                | Retail     | Apparel
dpz   | null                                                        | null       | null

etl_toolkit.E.chain_cases(conditions, otherwise=None)[source]#

Utility function to write case statements in Pyspark more efficiently. It accepts a list of conditions created with E.case or E.assign.

Tip

E.chain_assigns and E.chain_cases are aliases of each other. They both produce the same results; however, stylistically it is better to use E.case with E.chain_cases and E.assign with E.chain_assigns.

The E.case function makes it straightforward to write when/then cases. For example, E.case(F.col('x') == 1, 'test') -> case when x=1 then 'test'. The case function accepts a boolean expression Column as the first argument and a Column, string, or dict for the second argument that is returned if the boolean expression is True. If a string is specified for the second argument, it is treated as a literal.

The E.assign function is similar to E.case, but the arguments are reversed. For example, E.assign('test', F.col('x') == 1) -> case when x=1 then 'test'. The assign function can make for more readable code when defining a long series of mapping conditions, since it can be easier to see the match value before the condition.

A default value can be set via the otherwise argument for rows where no case matches.

Parameters:
  • conditions (list[case]) – List of E.case or E.assign expressions that are evaluated in order. The first matching case determines the value used for the row. Must have at least one condition in the list.

  • otherwise (pyspark.sql.Column | str) – A default value to use if no cases are matched from conditions. If a string is passed it will be treated as a literal.

Return type:

pyspark.sql.Column

Examples#

Using E.chain_cases to define a mapping column#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
dpz   | Food

Using E.chain_assigns to define a mapping column. Notice that it can be easier to read the match value first and then the associated condition.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_assigns([
    E.assign("Tech", F.col("input") == "aapl"),
    E.assign("Retail", F.col("input") == "nke"),
    E.assign("Food", F.col("input") == "dpz"),
])

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
dpz   | Food

Using the otherwise parameter to specify default values for no matches.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "tsla"},
])

sector_mapping = E.chain_cases([
    E.case(F.col("input") == "aapl", "Tech"),
    E.case(F.col("input") == "nke", "Retail"),
    E.case(F.col("input") == "dpz", "Food"),
], otherwise="N/A")

display(
    df
    .withColumn("output", sector_mapping)
)

input | output
------|-------
aapl  | Tech
nke   | Retail
tsla  | N/A

Using dict values in E.case will return a Map type column for matches. The nested fields can also be accessed directly in pyspark if a map isn't desired. This approach works well when multiple columns need to be assigned for a given condition, rather than creating multiple case statement expressions with repeated logic.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "aapl"},
    {"input": "nke"},
    {"input": "dpz"},
])

sector_mapping = E.chain_cases([
    E.case(
        F.col("input") == "aapl",
        {
            "sector": "Technology",
            "subsector": "Consumer Devices",
        },
    ),
    E.case(
        F.col("input") == "nke",
        {
            "sector": "Retail",
            "subsector": "Apparel",
        },
    ),
])

display(
    df
    .withColumn("output", sector_mapping)
    .withColumn("sector", sector_mapping.sector)
    .withColumn("subsector", sector_mapping.subsector)
)

input | output                                                     | sector     | subsector
------|------------------------------------------------------------|------------|-----------------
aapl  | {"sector": "Technology", "subsector": "Consumer Devices"}  | Technology | Consumer Devices
nke   | {"sector": "Retail", "subsector": "Apparel"}                | Retail     | Apparel
dpz   | null                                                        | null       | null

Normalization Expressions#

These expressions are used to standardize the format of various column types. This is a common cleaning technique to make columns easier to analyze by removing edge cases related to formatting inconsistencies.

etl_toolkit.E.normalize_text(col, case='lower')[source]#

Clean string type columns by removing extra white spaces and optionally converting to lower or upper case.

Parameters:
  • col (pyspark.sql.Column | str) – Column to be normalized. The column should be of string type. If a string is passed, it will be referenced as a Column.

  • case (Literal['lower', 'upper']) – The desired case of the output column, should be either “lower” or “upper”.

Return type:

pyspark.sql.Column

Examples#

Using E.normalize_text to clean up a column. Notice how leading and extra spaces are cleaned along with capitalization.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": " This is a Test  String"},
])

display(
    df.withColumn("output", E.normalize_text("input"))
)

input                   | output
------------------------|----------------------
This is a Test  String  | this is a test string
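
Set case="upper" when uppercase output is preferred; the whitespace normalization is presumably unchanged:

from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": " This is a Test  String"},
])

display(
    # Same whitespace cleanup, but the result is upper-cased.
    df.withColumn("output", E.normalize_text("input", case="upper"))
)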

etl_toolkit.E.try_cast(column, to_type)[source]#

Safely attempts to cast a column to the specified type, returning null if the cast fails. This is useful for handling potentially dirty data where some values may not conform to the desired type.

Parameters:
  • column (pyspark.sql.Column | str) – The column to cast. Can be a Column object or string column name.

  • to_type (str) – Target type as string. For decimal type, specify as 'decimal(precision,scale)'. Other supported types: 'string', 'int', 'long', 'float', 'double', 'boolean', 'timestamp', 'date'

Returns:

A Column containing the cast values, with nulls where casting failed

Return type:

pyspark.sql.Column

Examples#

Example of safely casting string values to integers#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": "123"},
    {"value": "abc"},
    {"value": "456"}
])

display(
    df.withColumn("cast_value", E.try_cast("value", "int"))
)

value | cast_value
------|-----------
123   | 123
abc   | null
456   | 456

Example of safely casting strings to dates#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date_str": "2024-01-01"},
    {"date_str": "invalid"},
    {"date_str": "2024-02-01"}
])

display(
    df.withColumn("parsed_date", E.try_cast("date_str", "date"))
)

date_str   | parsed_date
-----------|------------
2024-01-01 | 2024-01-01
invalid    | null
2024-02-01 | 2024-02-01

Example of casting to decimal with custom precision#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"value": "123.456"},
    {"value": "invalid"},
    {"value": "789.012"}
])

# Cast to decimal with precision 38 and scale 18
display(
    df.withColumn("cast_value", E.try_cast("value", "decimal(38,18)"))
)

Regex Expressions#

These expressions can be used to simplify writing complex regex logic.

etl_toolkit.E.rlike_any(col, patterns)[source]#

Function that validates if a column matches any of the provided regex patterns. This is equivalent to the following logic in spark: ['hello', 'test'] -> RLIKE 'hello|test'. It is recommended to use this expression over building a long regex string yourself, as that can be hard to maintain and read.

The return value is a boolean column that is True if any of the provided patterns are satisfied, and False otherwise.

Parameters:
  • col (pyspark.sql.Column | str) – The input column to compare the patterns against. It should be a column of string type. If a string is provided, it is referenced as a Column.

  • patterns (list[str]) – A list of regex patterns to match the input col against. This matching is done using the .rlike method.

Return type:

pyspark.sql.Column

Examples#

Using E.rlike_any to check a series of regex patterns.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "Outback"},
    {"input": "outbacksthouse"},
])

display(
    df.withColumn("output", E.rlike_any("input", ["outbackst", "outback s"]))
)

input          | output
---------------|-------
Outback        | False
outbacksthouse | True

etl_toolkit.E.rlike_all(col, patterns)[source]#

Function that validates if a column matches all of the provided regex patterns. This is equivalent to the following logic in spark: ['hello', 'test'] -> RLIKE 'hello' AND RLIKE 'test'. It is recommended to use this expression over building a long regex string yourself, as that can be hard to maintain and read.

The return value is a boolean column that is True if all the provided patterns are satisfied, and False otherwise.

Parameters:
  • col (pyspark.sql.Column | str) – The input column to compare the patterns against. It should be a column of string type. If a string is provided, it is referenced as a Column.

  • patterns (list[str]) – A list of regex patterns to match the input col against. This matching is done using the .rlike method.

Return type:

pyspark.sql.Column

Examples#

Using E.rlike_all to check a series of regex patterns.#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"input": "Outback"},
    {"input": "outbacksthouse"},
])

display(
    df.withColumn("output", E.rlike_all("input", ["outback", "house"]))
)

input          | output
---------------|-------
Outback        | False
outbacksthouse | True

Time Expressions#

These expressions can be used to simplify writing complex datetime logic.

etl_toolkit.E.normalize_date(date_column)[source]#

Parse a date from a string column, attempting to handle various formats. Returns null for unparseable dates. Always returns a DATE type.

Parameters:

date_column (str | pyspark.sql.Column) – The text column containing the date or timestamp string

Returns:

A date column with the parsed result

Return type:

pyspark.sql.Column

Examples#

Parse various date formats into a standardized date column#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date_string": "2024-03-25"},
    {"date_string": "03/25/2024"},
    {"date_string": "March 25, 2024"},
    {"date_string": "1616630400"},
])

display(
    df.withColumn("parsed_date", E.normalize_date("date_string"))
)

date_string    | parsed_date
---------------|------------
2024-03-25     | 2024-03-25
03/25/2024     | 2024-03-25
March 25, 2024 | 2024-03-25
1616630400     | 2021-03-25

etl_toolkit.E.normalize_timestamp(date_column)[source]#

Parse a timestamp from a string column, attempting to handle various formats. Returns null for unparseable dates. Always returns a TIMESTAMP type.

Parameters:

date_column (str | pyspark.sql.Column) – The text column containing the date or timestamp string

Returns:

A timestamp column with the parsed result

Return type:

pyspark.sql.Column

Examples#

Parse various formats into a standardized timestamp column#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date_string": "2024-03-25T14:30:00"},
    {"date_string": "March 25, 2024 2:30 PM"},
    {"date_string": "1616630400"},
])

display(
    df.withColumn("parsed_ts", E.normalize_timestamp("date_string"))
)

date_string            | parsed_ts
-----------------------|----------------------
2024-03-25T14:30:00    | 2024-03-25 14:30:00.0
March 25, 2024 2:30 PM | 2024-03-25 14:30:00.0
1616630400             | 2021-03-25 00:00:00.0

etl_toolkit.E.parse_all_dates(text_column, patterns=None, infer_dates=False, debug=False)[source]#

Extracts and parses date-like strings from a text column using multiple regex patterns.

This function attempts to find and parse multiple date formats from a given text column, returning an array of parsed DATE objects. The parsing leverages the ETL Toolkit’s normalize_date function to convert extracted strings into valid date objects.

Parameters:
  • text_column (str | pyspark.sql.Column) – A column containing text to extract dates from. Can be a string column name or a PySpark Column.

  • patterns (Optional[List[str]]) – Optional list of custom regex patterns to use for date extraction. If not provided, a predefined set of comprehensive date patterns is used.

  • infer_dates (bool) – If True, enables month-year inference to expand strings like “June 2024” into start/end dates. If False, keeps basic date extraction behavior. Defaults to False.

  • debug (bool) – If True, returns the raw string array of the dates extracted, not the converted dates. Defaults to False. This is primarily for testing improvements to this function.

Returns:

A PySpark Column containing an array of parsed dates in ascending order. Returns an empty array if no valid dates are found.

Return type:

pyspark.sql.Column

Examples#

Basic usage of parse_all_dates#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"text": "Meeting scheduled for 2024-03-25 and 05/15/2024"},
    {"text": "Conference on February 10, 2023"},
    {"text": "Due in June 2024"},
])

result_df = df.withColumn(
    "parsed_dates",
    E.parse_all_dates("text", infer_dates=True)
)

display(result_df)
Using custom regex patterns#
custom_patterns = [
    r'\d{1,2}/\d{1,2}/\d{4}',  # Only match full M/D/YYYY format
    r'\d{4}-\d{2}-\d{2}'       # Only match YYYY-MM-DD format
]

result_df = df.withColumn(
    "parsed_dates",
    E.parse_all_dates("text", patterns=custom_patterns)
)

Caution#

  • The function attempts to parse dates using various patterns, but not all possible date formats may be captured.

  • Ambiguous date formats (e.g., MM/DD/YYYY vs DD/MM/YYYY) are parsed using a best-effort approach and may require additional context or validation.

etl_toolkit.E.parse_date_range(text_column)[source]#

Parse a date range from text, returning a struct with start and end dates.

This function attempts to extract date ranges from free-form text by finding and parsing date references. It returns a struct with start_date and end_date fields. If exactly one date is found, that date is used for both start and end. If multiple dates are found, the first is used as the start and the second as the end.

Parameters:

text_column (str | pyspark.sql.Column | List[str | pyspark.sql.Column]) – Text column(s) to parse dates from. Can be a single column, list of columns, or Column expressions.

Returns:

Struct column with ‘start_date’ and ‘end_date’ fields. Both fields will be NULL if no valid dates are found in the text.

Return type:

pyspark.sql.Column

Examples#

Parse date ranges from text#
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"text": "Operating Supplies 3/13/23 - 3/20/23"},
    {"text": "Due in June 2024"},
    {"text": "Meeting on 2024-03-25"},
    {"text": "No dates here"},
])

# Get date range as a struct
result_df = df.withColumn(
    "date_range",
    E.parse_date_range("text")
)

# Or unpack into separate columns
result_df = df.withColumns({
    "start_date": E.parse_date_range("text").start_date,
    "end_date": E.parse_date_range("text").end_date,
})

display(result_df)

Note#

  • The function automatically enables date inference to handle cases like “June 2024”

  • When multiple columns are provided, they are concatenated with spaces before parsing

  • The function handles a wide variety of date formats including partial dates and month names

etl_toolkit.E.date_trunc(periodicity, date_column, start_day_of_week=None, end_day_of_week=None, calendar=None)[source]#

Truncates a date column to the nearest date that matches the given periodicity. For example, this expression can truncate a date column to its nearest week, month, quarter, year, etc.

Tip

E.date_trunc expands on the built-in F.date_trunc to address some common gaps. For example:

  • It allows weeks to start on a different day of the week if specified.

  • It returns a date type instead of a timestamp type.

Parameters:
  • periodicity (Literal['YEAR', 'HALF', 'QUARTER', 'MONTH', 'WEEK', 'DAY']) – The date periodicity to trunc the date_column by.

  • date_column (str | pyspark.sql.Column) – The date column to transform given the periodicity.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Optionally set what the start day of week should be. Only used with a WEEK periodicity value. For example, if start_day_of_week="MONDAY", then the date_column is truncated to the nearest prior date that falls on a Monday.

  • end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Similar to start_day_of_week can specify the ending day of the week to achieve the same effect. For example, if end_day_of_week="SUNDAY", then the date_column is truncated to the nearest prior date that falls on a Monday so that each week ends on a Sunday. Cannot specify both start_day_of_week and end_day_of_week.

Returns:

A date column with the date values rounded down to the start of the period they belong to (i.e. the first day of the week, month, quarter, etc.). If start_day_of_week and end_day_of_week are not specified (the default), the behavior matches the built-in pyspark F.date_trunc.

Examples#

Truncate dates to the nearest week, month, quarter, and year, with weeks starting on Sundays using the start_day_of_week argument.#
from datetime import date
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date": date(2024, 10, 1)},
    {"date": date(2024, 10, 2)},
    {"date": date(2024, 10, 8)},
    {"date": date(2024, 10, 14)},
])

display(
    df.withColumns({
        "week_start": E.date_trunc("WEEK", "date", start_day_of_week="SUNDAY"),
        "month_start": E.date_trunc("MONTH", "date"),
        "quarter_start": E.date_trunc("QUARTER", "date"),
        "year_start": E.date_trunc("YEAR", "date"),
    })
)

date       | week_start | month_start | quarter_start | year_start
-----------|------------|-------------|---------------|-----------
2024-10-01 | 2024-09-29 | 2024-10-01  | 2024-10-01    | 2024-01-01
2024-10-02 | 2024-09-29 | 2024-10-01  | 2024-10-01    | 2024-01-01
2024-10-08 | 2024-10-06 | 2024-10-01  | 2024-10-01    | 2024-01-01
2024-10-14 | 2024-10-13 | 2024-10-01  | 2024-10-01    | 2024-01-01
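
end_day_of_week can be used instead of start_day_of_week when it is more natural to think in terms of when a week ends. A minimal sketch, relying on the equivalence described in the parameter list (weeks ending on Sunday start on Monday):

from datetime import date
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date": date(2024, 10, 1)},
    {"date": date(2024, 10, 8)},
])

display(
    df.withColumn(
        # Weeks end on Sunday, so each date is truncated to the prior Monday.
        "week_start",
        E.date_trunc("WEEK", "date", end_day_of_week="SUNDAY"),
    )
)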

etl_toolkit.E.date_end(periodicity, date_column, start_day_of_week=None, end_day_of_week=None, calendar=None)[source]#

Converts a date column to the latest date that is within the given periodicity. For example, this expression can transform a date column to the last day of its week, month, quarter, year, etc.

Tip

E.date_end expands on the E.date_trunc function. It supports similar options and is effectively the inverse of E.date_trunc.

Parameters:
  • periodicity (Literal['YEAR', 'QUARTER', 'MONTH', 'WEEK', 'DAY']) – The date periodicity to modify the date_column by.

  • date_column (str | pyspark.sql.Column) – The date column to transform given the periodicity.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Optionally set what the start day of week should be. Only used with a WEEK periodicity value. For example, if start_day_of_week="MONDAY", then the date_column is converted to the last day of the week it falls on, assuming weeks start on Monday and end on Sunday.

  • end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Optionally set what the end day of week should be. Only used with a WEEK periodicity value. For example, if end_day_of_week="SUNDAY", then the date_column is converted to the last day of the week it falls on, assuming weeks start on Monday and end on Sunday. Cannot specify both start_day_of_week and end_day_of_week.

Returns:

A date column with the date values representing the last date of the period (i.e. the week, month, quarter, etc.) that the input date value belongs to.

Examples#

Convert dates to the end of their week, month, quarter, and year, with weeks starting on Sundays using the start_day_of_week argument.#
from datetime import date
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date": date(2024, 10, 1)},
    {"date": date(2024, 10, 2)},
    {"date": date(2024, 10, 8)},
    {"date": date(2024, 10, 14)},
])

display(
    df.withColumns({
        "week_end": E.date_end("WEEK", "date", start_day_of_week="SUNDAY"),
        "month_end": E.date_end("MONTH", "date"),
        "quarter_end": E.date_end("QUARTER", "date"),
        "year_end": E.date_end("YEAR", "date"),
    })
)

date       | week_end   | month_end  | quarter_end | year_end
-----------|------------|------------|-------------|-----------
2024-10-01 | 2024-10-05 | 2024-10-31 | 2024-12-31  | 2024-12-31
2024-10-02 | 2024-10-05 | 2024-10-31 | 2024-12-31  | 2024-12-31
2024-10-08 | 2024-10-12 | 2024-10-31 | 2024-12-31  | 2024-12-31
2024-10-14 | 2024-10-19 | 2024-10-31 | 2024-12-31  | 2024-12-31

etl_toolkit.E.next_complete_period(periodicity, date_column, start_day_of_week=None, end_day_of_week=None, calendar=None)[source]#

Converts the input date_column to the earliest subsequent date that starts a new period, given the periodicity specified. For example, this can return the start of the next complete week, month, quarter, year, etc.

If the value in the date_column already starts a given period, the value will be returned without any transformation.

Parameters:
  • periodicity (Literal['YEAR', 'QUARTER', 'MONTH', 'WEEK', 'DAY']) – The date periodicity to modify the date_column by.

  • date_column (str | pyspark.sql.Column) – The date column to transform given the periodicity.

  • start_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Optionally set what the start day of week should be. Only used with a WEEK periodicity value. For example, if start_day_of_week="MONDAY", then the date_column is converted to the next date that begins a week, assuming weeks start on Monday and end on Sunday.

  • end_day_of_week (Literal['MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY']) – Optionally set what the end day of week should be. Only used with a WEEK periodicity value. For example, if end_day_of_week="SUNDAY", then the date_column is converted to the next date that begins a week, assuming weeks start on Monday and end on Sunday. Cannot specify both start_day_of_week and end_day_of_week.

Return type:

pyspark.sql.Column

Examples#

Convert dates to the next complete period, given the periodicity. Notice that for periodicity 'WEEK' the input date falls in the middle of a week, so the start of the next week is returned (similarly for 'YEAR'). However, for periodicity 'MONTH' the input date already falls at the start of the month, so the current value is returned unchanged (similarly for 'QUARTER').#
from datetime import date
from etl_toolkit import E, F

df = spark.createDataFrame([
    {"date": date(2024, 10, 1)},
])

display(
    df.withColumns({
        "next_complete_week": E.next_complete_period("WEEK", "date", start_day_of_week="SUNDAY"),
        "next_complete_month": E.next_complete_period("MONTH", "date"),
        "next_complete_quarter": E.next_complete_period("QUARTER", "date"),
        "next_complete_year": E.next_complete_period("YEAR", "date"),
    })
)

date       | next_complete_week | next_complete_month | next_complete_quarter | next_complete_year
-----------|--------------------|---------------------|-----------------------|-------------------
2024-10-01 | 2024-10-06         | 2024-10-01          | 2024-10-01            | 2025-01-01

etl_toolkit.E.quarter_label(date_column)[source]#

Function that generates a human-readable quarter label from a date column, e.g. "2024-01-15" -> "1Q24". These labels are commonly used for investor reporting assets.

Parameters:

date_column (str | pyspark.sql.Column) – A date-type column that is used as the input to generate the corresponding quarter labels

Return type:

pyspark.sql.Column

Examples#

Generate a Quarter label from a given column in a dataframe#
from etl_toolkit import E, F
from datetime import date

df = spark.createDataFrame([{"order_date": date(2024, 1, 15)}])

display(
    df.select(E.quarter_label("order_date").alias("label"))
)

label
-----
1Q24