from datetime import datetime, date
from typing import Literal, List, Optional
from pyspark.sql import functions as F, DataFrame, Column
from pyspark.sql import SparkSession
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit import expressions as E
from etl_toolkit.exceptions import InvalidInputException
from etl_toolkit.expressions.time import (
_validate_day_of_week_bounds,
_normalize_day_of_week_bounds,
)
# Maps an upper-case weekday name to its integer position in the week.
# The numbering follows the Databricks SQL ``weekday`` function
# (https://docs.databricks.com/en/sql/language-manual/functions/weekday.html):
# 0 = Monday through 6 = Sunday.
DAY_OF_WEEK_ADJUSTMENT = {
    day_name: offset
    for offset, day_name in enumerate(
        (
            "MONDAY",
            "TUESDAY",
            "WEDNESDAY",
            "THURSDAY",
            "FRIDAY",
            "SATURDAY",
            "SUNDAY",
        )
    )
}
@track_usage
def periods(
    start: datetime | date,
    end: datetime | date,
    steps: int = 1,
    step_unit: Literal[
        "DAY", "WEEK", "MONTH", "QUARTER", "YEAR", "HOUR", "MINUTE", "SECOND"
    ] = "DAY",
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = "MONDAY",
    calendar=None,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Generate a dataframe dynamically based on the start and end values provided,
    with 1 row per interval that has elapsed (``steps`` * ``step_unit``) time since the previous row.
    This function can be useful to generate a series of expected dates or timestamps to join to another dataframe.

    The output dataframe has two columns, ``period_start`` and ``period_end``. Periods are aligned to the
    ``step_unit`` boundary that contains ``start``; the first ``period_start`` is never earlier than ``start``
    and the last ``period_end`` is never later than ``end``.

    :param start: The start date or timestamp (datetime) of the dataframe to create. If a ``datetime`` object is supplied the resulting columns will be of timestamp type.
    :param end: The end date or timestamp (datetime) of the dataframe to create. If a ``datetime`` object is supplied the resulting columns will be of timestamp type.
    :param steps: The number of step_units to use between each row's ``period_start`` and ``period_end``. Must be at least 1.
    :param step_unit: The interval of each step to use between each row's ``period_start`` and ``period_end``. A standard calendar is used for quarters. The resulting columns are of timestamp type when the unit is ``"HOUR"``, ``"MINUTE"`` or ``"SECOND"``.
    :param start_day_of_week: Only used when ``step_unit="WEEK"``. Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The start value should have the same day of week, otherwise an ``InvalidInputException`` will be raised. By default, weeks start on Monday.
    :param calendar: Currently not used by this function.
    :param spark: Spark session to use. Generally, this is **not needed** as the session is automatically generated in databricks. It is used by library developers.
    :returns: A dataframe with one row per period and the columns ``period_start`` and ``period_end``.
    :raises InvalidInputException: If ``steps`` is less than 1, or if ``step_unit="WEEK"`` and ``start`` does not fall on ``start_day_of_week``.

    Examples
    ^^^^^^^^^^^

    .. code-block:: python
        :caption: Generate a simple date range with 1 day per row between the specified ``start`` and ``end``.

        from etl_toolkit import E, F, A
        from datetime import date

        display(
            A.periods(
                start=date(2024, 1, 1),
                end=date(2024, 1, 5),
            )
        )

        +--------------+--------------+
        |period_start  |period_end    |
        +--------------+--------------+
        |    2024-01-01|    2024-01-01|
        +--------------+--------------+
        |    2024-01-02|    2024-01-02|
        +--------------+--------------+
        |    2024-01-03|    2024-01-03|
        +--------------+--------------+
        |    2024-01-04|    2024-01-04|
        +--------------+--------------+
        |    2024-01-05|    2024-01-05|
        +--------------+--------------+

    .. code-block:: python
        :caption: Generate a date range based on a weekly periodicity. Notice how the start and end dates reflect a 7-day duration.

        from etl_toolkit import E, F, A
        from datetime import date

        display(
            A.periods(
                start=date(2024, 1, 1),
                end=date(2024, 1, 14),
                step_unit="WEEK",
            )
        )

        +--------------+--------------+
        |period_start  |period_end    |
        +--------------+--------------+
        |    2024-01-01|    2024-01-07|
        +--------------+--------------+
        |    2024-01-08|    2024-01-14|
        +--------------+--------------+

    .. code-block:: python
        :caption: Generate a date range based on a weekly periodicity with weeks starting on saturdays. Notice how the start and end dates reflect a 7-day duration.

        from etl_toolkit import E, A, F
        from datetime import date, datetime

        display(
            A.periods(
                start=date(2024, 1, 6),
                end=date(2024, 1, 19),
                step_unit="WEEK",
                start_day_of_week="SATURDAY",
            )
        )

        +--------------+--------------+
        |period_start  |period_end    |
        +--------------+--------------+
        |    2024-01-06|    2024-01-12|
        +--------------+--------------+
        |    2024-01-13|    2024-01-19|
        +--------------+--------------+

    .. code-block:: python
        :caption: Generate a date range based on a two-month periodicity. This is accomplished by specifying ``steps`` as 2 and a "MONTH" ``step_unit``.

        from etl_toolkit import E, F, A
        from datetime import date

        display(
            A.periods(
                start=date(2024, 1, 1),
                end=date(2024, 6, 30),
                step_unit="MONTH",
                steps=2,
            )
        )

        +--------------+--------------+
        |period_start  |period_end    |
        +--------------+--------------+
        |    2024-01-01|    2024-02-29|
        +--------------+--------------+
        |    2024-03-01|    2024-04-30|
        +--------------+--------------+
        |    2024-05-01|    2024-06-30|
        +--------------+--------------+
    """
    # A zero or negative step cannot produce forward-moving periods; fail early
    # with a clear message instead of an obscure Spark ``sequence`` error.
    if steps < 1:
        raise InvalidInputException(
            f"The 'steps' value ({steps}) must be a positive integer to generate periods."
        )

    # For WEEK step_units, validate that the 'start' value falls on the requested
    # 'start_day_of_week' before generating periods, otherwise joins will be
    # incorrect and duplicate periods will be included.
    weekday_offset = 0
    if step_unit == "WEEK":
        weekday_offset = start.weekday()
        weekday_names = {
            offset: name for name, offset in DAY_OF_WEEK_ADJUSTMENT.items()
        }
        actual_day_of_week = weekday_names[weekday_offset]
        if start_day_of_week.upper() != actual_day_of_week:
            raise InvalidInputException(
                f"The 'start' value ({start}) falls on {actual_day_of_week}, which does not match the 'start_day_of_week' value ({start_day_of_week}). "
                "Select a 'start' and 'start_day_of_week' that falls on the same day of the week to generate periods correctly."
            )

    if spark is None:
        from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session

        spark = get_spark_session()

    # Spark intervals do not natively support quarter, but can treat as 3 months instead
    if step_unit == "QUARTER":
        interval = F.expr(f"INTERVAL {steps * 3} MONTH")
    else:
        interval = F.expr(f"INTERVAL {steps} {step_unit}")

    # ``period_end`` is inclusive, so it is the next period's start minus the
    # smallest unit represented: one second for sub-daily steps, one day otherwise.
    is_sub_daily = step_unit in ("HOUR", "MINUTE", "SECOND")
    if is_sub_daily:
        cutoff_interval = F.expr("INTERVAL 1 SECOND")
    else:
        cutoff_interval = F.expr("INTERVAL 1 DAY")

    # Timestamps are produced when either bound is a datetime or the step is sub-daily.
    if isinstance(start, datetime) or isinstance(end, datetime) or is_sub_daily:
        col_type = "timestamp"
    else:
        col_type = "date"

    # Need to handle case where start/end values are in the middle of a period, which would affect the intervals
    # ex: 01-02-2024, 02-01-2024, 03-01-2024
    start_column = F.date_trunc(step_unit, F.lit(start)).cast(col_type)
    end_column = F.lit(end).cast(col_type)

    # ``weekday_offset`` is only non-zero for step_unit="WEEK" with a 'start' that is
    # not a Monday: shift the truncated week start forward to the 'start' weekday.
    if weekday_offset != 0:
        adjustment = F.expr(f"INTERVAL {weekday_offset} DAY")
        start_column = (start_column + adjustment).cast(col_type)

    df = (
        spark.range(1)
        .select(
            F.explode(
                F.sequence(
                    start_column,
                    end_column,
                    step=interval,
                )
            ).alias("period_start_temp"),
        )
        .withColumns(
            {
                # Clip the first period so that it never begins before ``start``
                "period_start": (
                    F.when(F.col("period_start_temp") < F.lit(start), F.lit(start))
                    .otherwise(F.col("period_start_temp"))
                    .cast(col_type)
                ),
                "period_end_temp": F.col("period_start_temp")
                + interval
                - cutoff_interval,
            }
        )
        .select(
            "period_start",
            (
                # Clip the last period so that it never extends beyond ``end``
                F.when(F.col("period_end_temp") > F.lit(end), F.lit(end))
                .otherwise(F.col("period_end_temp"))
                .cast(col_type)
                .alias("period_end")
            ),
        )
    )
    return df
@track_usage
def fill_periods(
    df: DataFrame,
    date_column: str | Column,
    slice_columns: list[str | Column] = None,
    steps: int = 1,
    step_unit: Literal[
        "DAY", "WEEK", "MONTH", "QUARTER", "YEAR", "HOUR", "MINUTE", "SECOND"
    ] = "DAY",
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = "MONDAY",
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    start: datetime | date = None,
    end: datetime | date = None,
    calendar=None,
    spark: SparkSession = None,
) -> DataFrame:
    """
    Function to transform the provided df so that it includes any missing periods as blank rows
    given the specified date_column and interval defined as (``steps`` * ``step_unit``).

    This is a useful function to normalize data when there are gaps due to missing time periods
    which can be important to address when calculating growth rates or other lagged metrics.

    The periods are filled between the min/max range of the dataframe's ``date_column``.
    Additionally, this function can fill records based on ``slice_columns``. When specified, the rows will be
    added per permutation of slice columns in addition to the defined interval.

    In the returned dataframe, the filled date and slices (if any specified) will be provided. Other columns in the dataframe will be NULL.

    .. tip:: This is a recommended replacement for the ``fill_periods`` function in ``yipit_databricks_utils``, which is deprecated.
        It handles other use cases including variable intervals and slices, while being a more performant spark operation.

    :param df: The dataframe to transform to fill in missing periods.
    :param date_column: A date or timestamp Column or string that is used to evaluate the min/max range of the fill operation. In addition the filled records will include the date value under this column. If a string is specified, it is normalized as a Column.
    :param slice_columns: A list of Columns or strings, that can be used to evaluate the records to fill within each slice. If not specified, only the date column is inspected for gaps in values. If string(s) are specified, it is normalized as a Column.
    :param steps: The number of step_units to use between each filled period.
    :param step_unit: The interval of each step to use between each filled period. A standard calendar is used for quarters.
    :param start_day_of_week: Only used when ``step_unit="WEEK"``. Indicates which day of week to start each period by, ex start every week on Saturday or Mondays. The input data should also have rows starting on the same day of the week, or an ``InvalidInputException`` will be raised. By default, weeks start on Monday.
    :param end_day_of_week: When ``step_unit='WEEK'``, this parameter will ensure periods are aggregated on 7-day intervals that end on this specified day of the week. This parameter should not be specified if ``start_day_of_week`` is specified as it is duplicative.
    :param start: The start date or timestamp (datetime) of the earliest period to fill. If a ``datetime`` object is supplied the resulting columns will be of timestamp type. It only takes effect when it is earlier than the min value of the ``date_column`` of the input df (or when the input df has no date values); otherwise the min value is used.
    :param end: The end date or timestamp (datetime) of the latest period to fill. If a ``datetime`` object is supplied the resulting columns will be of timestamp type. It only takes effect when it is later than the max value of the ``date_column`` of the input df (or when the input df has no date values); otherwise the max value is used.
    :param calendar: Currently not used by this function.
    :param spark: Spark session to use. Generally, this is **not needed** as the session is automatically generated in databricks. It is used by library developers.
    :returns: The filled periods (and slices, if specified) left-joined to ``df``, so rows for missing periods have NULL in the remaining columns.

    Examples
    ^^^^^^^^^^^

    .. code-block:: python
        :caption: Fill missing dates with 1 day intervals between rows of the dataframe. Notice that the dates are filled based on the overall min/max date of the dataframe.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2023, 1, 5)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])

        display(
            A.fill_periods(
                df,
                date_column="date",
            )
        )

        +--------------+--------------+--------------+
        |date          |color         |value         |
        +--------------+--------------+--------------+
        |    2023-01-01|           red|          1000|
        +--------------+--------------+--------------+
        |    2023-01-01|          blue|            50|
        +--------------+--------------+--------------+
        |    2023-01-02|          null|          null|
        +--------------+--------------+--------------+
        |    2023-01-03|          null|          null|
        +--------------+--------------+--------------+
        |    2023-01-04|          null|          null|
        +--------------+--------------+--------------+
        |    2023-01-05|           red|           100|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: Fill missing dates with 1 day intervals while using the slice_columns feature. Notice that the dates are filled based on the overall min/max date of the dataframe for each unique slice in the dataframe.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2023, 1, 3)},
            {"value": 1000, "color": "red", "date": date(2023, 1, 1)},
            {"value": 50, "color": "blue", "date": date(2023, 1, 1)},
        ])

        display(
            A.fill_periods(
                df,
                date_column="date",
                slice_columns=["color"],
            )
        )

        +--------------+--------------+--------------+
        |date          |color         |value         |
        +--------------+--------------+--------------+
        |    2023-01-01|          blue|            50|
        +--------------+--------------+--------------+
        |    2023-01-02|          blue|          null|
        +--------------+--------------+--------------+
        |    2023-01-03|          blue|          null|
        +--------------+--------------+--------------+
        |    2023-01-01|           red|          1000|
        +--------------+--------------+--------------+
        |    2023-01-02|           red|          null|
        +--------------+--------------+--------------+
        |    2023-01-03|           red|           100|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: Fill missing dates with 1 week intervals between rows of the dataframe, with weeks starting on saturdays.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 6)},
            {"value": 50, "color": "blue", "date": date(2024, 1, 20)},
        ])

        display(
            A.fill_periods(
                df,
                date_column="date",
                step_unit="WEEK",
                start_day_of_week="SATURDAY",
            )
        )

        +--------------+--------------+--------------+
        |date          |color         |value         |
        +--------------+--------------+--------------+
        |    2024-01-06|           red|           100|
        +--------------+--------------+--------------+
        |    2024-01-13|          null|          null|
        +--------------+--------------+--------------+
        |    2024-01-20|          blue|            50|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: Fill missing dates with 1 week intervals between rows of the dataframe, but extend the filled range back to Jan. 1, 2024 using the ``start`` parameter.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"value": 100, "color": "red", "date": date(2024, 1, 8)},
            {"value": 50, "color": "blue", "date": date(2024, 1, 22)},
        ])

        display(
            A.fill_periods(
                df,
                date_column="date",
                step_unit="WEEK",
                start=date(2024, 1, 1),
            )
        )

        +--------------+--------------+--------------+
        |date          |color         |value         |
        +--------------+--------------+--------------+
        |    2024-01-01|          null|          null|
        +--------------+--------------+--------------+
        |    2024-01-08|           red|           100|
        +--------------+--------------+--------------+
        |    2024-01-15|          null|          null|
        +--------------+--------------+--------------+
        |    2024-01-22|          blue|            50|
        +--------------+--------------+--------------+
    """
    _validate_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    start_day_of_week, end_day_of_week = _normalize_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )

    date_column = E.normalize_column(date_column)
    # Select the expression alongside the existing columns and read back the last
    # output name, so the filled column is named the way Spark names ``date_column``.
    date_column_string = df.select("*", date_column).columns[-1]

    date_range = df.select(
        F.min(date_column).alias("min"), F.max(date_column).alias("max")
    ).first()

    # The observed min/max are None when ``df`` has no (non-null) date values.
    # Guard for that so an explicit ``start``/``end`` is used instead of raising a
    # TypeError when comparing against None.
    # NOTE(review): Python cannot order a ``date`` against a ``datetime``; callers
    # presumably pass ``start``/``end`` of the same type as the column - confirm.
    min_period_start = date_range["min"]
    if start is not None and (min_period_start is None or start < min_period_start):
        min_period_start = start

    max_period_end = date_range["max"]
    if end is not None and (max_period_end is None or end > max_period_end):
        max_period_end = end

    # Only the start of each generated period is needed; it becomes the date value
    # of every filled row.
    fill_dates = periods(
        min_period_start,
        max_period_end,
        steps=steps,
        step_unit=step_unit,
        start_day_of_week=start_day_of_week,
        spark=spark,
    ).select(F.col("period_start").alias(date_column_string))

    if slice_columns:
        # Repeat every period for each distinct combination of slice values
        slice_columns = [E.normalize_column(col) for col in slice_columns]
        unique_slices = df.select(slice_columns).distinct()
        fill_dates = fill_dates.crossJoin(unique_slices)

    # Left join keeps every expected period/slice row; periods that are missing
    # from ``df`` come back with NULL in the non-key columns.
    return fill_dates.join(
        df,
        fill_dates.columns,
        how="left",
    )