from pyspark.sql import functions as F, Column
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.expressions.core import normalize_column, normalize_literal
@track_usage
def sum_if(
condition: Column, value: Column | str, otherwise: Column | int | float | None = None
) -> Column:
"""
.. role:: python(code)
:language: python
Function to sum a column based on a specified condition.
For example, :python:`E.sum_if(F.col('x') > 10, F.col('x'))` will sum values
of column x where x is greater than 10.
The optional ``otherwise`` keyword argument determines the value used when the condition is False for a row. Defaults to NULL.
:param condition: Boolean expression that, when True for a row, includes the value expression in the sum calculation
:param value: Value column used to calculate the sum. If passed as a string, it will be treated as a Column.
:param otherwise: Value to use when the condition is False. If not specified, `None` (NULL) will be used. If a non-Column value is passed, it will be treated as a literal.
Examples
-----------
.. code-block:: python
:caption: Aggregating values within a limit
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.sum_if(F.col("value") < 1000, F.col("value")).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |           100|
+--------------+--------------+
.. code-block:: python
:caption: Can sum on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean can be used as the condition.
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.sum_if(
E.all(
F.col("value") > 100,
F.col("value") < 10000,
),
F.col("value")
).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |          1000|
+--------------+--------------+
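For illustration, the ``otherwise`` keyword substitutes a value for rows that fail the condition. With the same DataFrame, the 1000 row is expected to contribute -1 instead of NULL, so the expected sum is 99:
.. code-block:: python
:caption: Substituting a value for excluded rows with ``otherwise``
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.sum_if(F.col("value") < 1000, F.col("value"), otherwise=-1).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |            99|
+--------------+--------------+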
"""
value = normalize_column(value)
otherwise = normalize_literal(otherwise)
return F.sum(F.when(condition, value).otherwise(otherwise))
@track_usage
def count_if(condition: Column) -> Column:
"""
.. role:: python(code)
:language: python
Function to count rows matching a specified condition.
For example, :python:`E.count_if(F.col('x') > 10)` will count rows where x is greater than 10.
:param condition: Boolean expression that, when True for a row, includes the row in the count calculation
Examples
-----------
.. code-block:: python
:caption: Aggregating values within a limit
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.count_if(F.col("value") < 1000).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |             1|
+--------------+--------------+
.. code-block:: python
:caption: Can count on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean can be used as the condition.
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.count_if(
E.all(
F.col("value") > 100,
F.col("value") < 10000,
)
).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |             1|
+--------------+--------------+
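``E.any`` works the same way. As a sketch (assuming ``E.any`` accepts a variadic list of conditions like ``E.all``), only the 1000 row satisfies a condition below, so the expected count is 1:
.. code-block:: python
:caption: Counting rows that match any of several conditions with ``E.any``
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.count_if(
E.any(
F.col("value") >= 1000,
F.col("color") == "blue",
)
).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |             1|
+--------------+--------------+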
"""
# Rows where the condition is False or NULL produce NULL, which F.count ignores
return F.count(F.when(condition, F.lit(1)))
@track_usage
def avg_if(
condition: Column, value: Column | str, otherwise: Column | int | float | None = None
) -> Column:
"""
.. role:: python(code)
:language: python
Function to average a column based on a specified condition.
For example, :python:`E.avg_if(F.col('x') > 10, F.col('x'))` will average values
of column x where x is greater than 10.
The optional ``otherwise`` keyword argument determines the value used when the condition is False for a row. Defaults to NULL.
:param condition: Boolean expression that, when True for a row, includes the value expression in the average calculation
:param value: Value column used to calculate the average. If passed as a string, it will be treated as a Column.
:param otherwise: Value to use when the condition is False. If not specified, `None` (NULL) will be used. If a non-Column value is passed, it will be treated as a literal.
Examples
-----------
.. code-block:: python
:caption: Aggregating values within a limit
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.avg_if(F.col("value") < 1000, F.col("value")).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |           100|
+--------------+--------------+
.. code-block:: python
:caption: Can average on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean can be used as the condition.
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.avg_if(
E.all(
F.col("value") > 100,
F.col("value") < 10000,
),
F.col("value")
).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |          1000|
+--------------+--------------+
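For illustration, ``otherwise`` affects averages more than sums: a substituted value is included in the row count of the average, whereas the default NULL is ignored by ``F.avg``. With the same DataFrame, the expected output below is the mean of 100 and 0:
.. code-block:: python
:caption: Substituting ``otherwise=0`` for excluded rows changes the average
from etl_toolkit import E, F
df = spark.createDataFrame([
{"value": 100, "color": "red"},
{"value": 1000, "color": "red"},
])
display(
df.groupBy("color")
.agg(
E.avg_if(F.col("value") < 1000, F.col("value"), otherwise=0).alias("output")
)
)
+--------------+--------------+
|color         |output        |
+--------------+--------------+
|red           |            50|
+--------------+--------------+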
"""
value = normalize_column(value)
otherwise = normalize_literal(otherwise)
return F.avg(F.when(condition, value).otherwise(otherwise))