Source code for etl_toolkit.expressions.aggregate

from pyspark.sql import functions as F, Column
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.expressions.core import normalize_column, normalize_literal


@track_usage
def sum_if(
    condition: Column, value: Column | str, otherwise: Column | int | float = None
) -> Column:
    """
    .. role:: python(code)
       :language: python

    Function to sum a column based on a specified condition.

    For example, :python:`E.sum_if(F.col('x') > 10, F.col('x'))` will sum values of
    column x where x is greater than 10.

    The optional keyword argument determines the default value when the condition is
    False for a row. Defaults to NULL.

    :param condition: Boolean expression that, when True for a row, includes the value
        expression in the sum calculation
    :param value: Value column that is used to calculate the sum.
        If passed as a string, it will be treated as a Column.
    :param otherwise: Default value to use when the condition is False. If not specified,
        `None` (NULL) will be used. If not passed as a Column, it will be treated as a literal.

    Examples
    -----------
    .. code-block:: python
        :caption: Aggregating values within a limit

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.sum_if(F.col("value") < 1000, F.col("value")).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |           100|
        +--------------+--------------+

    .. code-block:: python
        :caption: Can sum on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean is allowed for the condition.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.sum_if(
                    E.all(
                        F.col("value") > 100,
                        F.col("value") < 10000,
                    ),
                    F.col("value")
                ).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |          1000|
        +--------------+--------------+
    """
    value = normalize_column(value)
    otherwise = normalize_literal(otherwise)
    return F.sum(F.when(condition, value).otherwise(otherwise))
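
# Illustrative usage sketch (not part of this module): demonstrates the string
# `value` form and the `otherwise` keyword described in the docstring above.
# Assumes a live SparkSession named `spark` and the `E`/`F` aliases used in the
# docstring examples; the helper name below is hypothetical.
def _sum_if_usage_sketch(spark):
    from etl_toolkit import E, F

    df = spark.createDataFrame([
        {"value": 100, "color": "red"},
        {"value": 1000, "color": "red"},
    ])
    # "value" passed as a string is normalized to a Column; otherwise=0 makes
    # rows that fail the condition contribute 0 instead of NULL, so the sum for
    # color "red" is still 100.
    return df.groupBy("color").agg(
        E.sum_if(F.col("value") < 1000, "value", otherwise=0).alias("output")
    )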


@track_usage
def count_if(condition: Column) -> Column:
    """
    .. role:: python(code)
       :language: python

    Function to count the occurrences of a specified condition.

    For example, :python:`E.count_if(F.col('x') > 10)` will count the rows
    where x is greater than 10.

    :param condition: Boolean expression that, when True for a row, includes the row
        in the count calculation

    Examples
    -----------
    .. code-block:: python
        :caption: Counting values within a limit

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.count_if(F.col("value") < 1000).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |             1|
        +--------------+--------------+

    .. code-block:: python
        :caption: Can count on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean is allowed for the condition.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.count_if(
                    E.all(
                        F.col("value") > 100,
                        F.col("value") < 10000,
                    )
                ).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |             1|
        +--------------+--------------+
    """
    return F.count(F.when(condition, F.lit(1)))
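
# Illustrative usage sketch (not part of this module): count_if takes only the
# condition; there is no value/otherwise argument, because rows where the
# condition is False or NULL yield NULL from F.when and are ignored by F.count.
# Assumes a live SparkSession named `spark`; the helper name is hypothetical.
def _count_if_usage_sketch(spark):
    from etl_toolkit import E, F

    df = spark.createDataFrame([
        {"value": 100, "color": "red"},
        {"value": 1000, "color": "red"},
    ])
    # Only the 100 row satisfies the condition, so the count for "red" is 1.
    return df.groupBy("color").agg(
        E.count_if(F.col("value") < 1000).alias("output")
    )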


@track_usage
def avg_if(
    condition: Column, value: Column | str, otherwise: Column | int | float = None
) -> Column:
    """
    .. role:: python(code)
       :language: python

    Function to average a column based on a specified condition.

    For example, :python:`E.avg_if(F.col('x') > 10, F.col('x'))` will average values of
    column x where x is greater than 10.

    The optional keyword argument determines the default value when the condition is
    False for a row. Defaults to NULL.

    :param condition: Boolean expression that, when True for a row, includes the value
        expression in the average calculation
    :param value: Value column that is used to calculate the average.
        If passed as a string, it will be treated as a Column.
    :param otherwise: Default value to use when the condition is False. If not specified,
        `None` (NULL) will be used. If not passed as a Column, it will be treated as a literal.

    Examples
    -----------
    .. code-block:: python
        :caption: Aggregating values within a limit

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.avg_if(F.col("value") < 1000, F.col("value")).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |           100|
        +--------------+--------------+

    .. code-block:: python
        :caption: Can average on multiple conditions using ``E.all`` or ``E.any``. Any column expression that returns a boolean is allowed for the condition.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"value": 100, "color": "red"},
            {"value": 1000, "color": "red"},
        ])

        display(
            df.groupBy("color")
            .agg(
                E.avg_if(
                    E.all(
                        F.col("value") > 100,
                        F.col("value") < 10000,
                    ),
                    F.col("value")
                ).alias("output")
            )
        )

        +--------------+--------------+
        |color         |output        |
        +--------------+--------------+
        |red           |          1000|
        +--------------+--------------+
    """
    value = normalize_column(value)
    otherwise = normalize_literal(otherwise)
    return F.avg(F.when(condition, value).otherwise(otherwise))
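
# Illustrative usage sketch (not part of this module): for avg_if the choice of
# `otherwise` changes the denominator, since F.avg ignores NULLs but not zeros.
# Assumes a live SparkSession named `spark`; the helper name is hypothetical.
def _avg_if_usage_sketch(spark):
    from etl_toolkit import E, F

    df = spark.createDataFrame([
        {"value": 100, "color": "red"},
        {"value": 1000, "color": "red"},
    ])
    return df.groupBy("color").agg(
        # Default otherwise=None: the non-matching row is NULL and excluded,
        # so the average for "red" is 100.0.
        E.avg_if(F.col("value") < 1000, F.col("value")).alias("avg_null"),
        # otherwise=0: the non-matching row contributes 0, so the average
        # becomes (100 + 0) / 2 = 50.0.
        E.avg_if(F.col("value") < 1000, F.col("value"), otherwise=0).alias("avg_zero"),
    )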