# Source code for etl_toolkit.expressions.time

from datetime import datetime
from typing import List, Literal, Optional

from pyspark.sql import functions as F, Column, DataFrame
from dateutil import parser
from pyspark.sql.types import DateType, TimestampType
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.expressions.core import normalize_column
from etl_toolkit.exceptions import InvalidInputException


# Mapping of day of week to numerical index used in pyspark
# See https://docs.databricks.com/en/sql/language-manual/functions/weekday.html for details
# Weekday -> numeric offset lookup, in pyspark weekday() order (Monday == 0).
DAY_OF_WEEK_OFFSET_MAPPING = {
    day_name: offset
    for offset, day_name in enumerate(
        ("MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY")
    )
}


@track_usage
def quarter_label(date_column: str | Column) -> Column:
    """
    Build a human-readable quarter label from a date column, ex: "2024-01-15" -> "1Q24".

    These labels are commonly used for investor reporting assets.

    :param date_column: A date-type column used as the input to generate the
        corresponding quarter labels
    :return: A string column of labels in the form ``<quarter>Q<two-digit year>``.

    Examples
    --------
    .. code-block:: python
        :caption: Generate a Quarter label from a given column in a dataframe

        from etl_toolkit import E, F
        from datetime import date

        df = spark.createDataFrame([{"order_date": date(2024, 1, 15)}])
        display(
            df.select(quarter_label("order_date").alias("label"))
        )

        +-------+
        |label  |
        +-------+
        |1Q24   |
        +-------+
    """
    normalized = normalize_column(date_column)
    # Quarter number (1-4) as text, then "Q", then the 2-digit year.
    quarter_part = F.quarter(normalized).cast("string")
    year_part = F.date_format(normalized, "yy")
    return F.concat(quarter_part, F.lit("Q"), year_part)
class DatePatternGroups: """ Organized date format patterns for parsing, grouped by common characteristics inspired by dateutil's systematic approach. """ # ISO format variants ISO_PATTERNS = [ "yyyy-MM-dd", # 2024-03-25 "yyyy-MM-dd HH:mm:ss", # 2024-03-25 14:30:00 "yyyy-MM-dd'T'HH:mm:ss", # 2024-03-25T14:30:00 "yyyy-MM-dd'T'HH:mm:ss.SSS", # 2024-03-25T14:30:00.000 "yyyy-MM-dd'T'HH:mm:ssXXX", # 2024-03-25T14:30:00+01:00 "yyyy-MM-dd'T'HH:mm:ss.SSSZ", # 2024-03-25T14:30:00.000Z "yyyy-DDD", # 2024-084 (ordinal date) ] # Common Western formats with varying delimiters WESTERN_DELIMITED = [ "MM/dd/yyyy", # 03/25/2024 "MM-dd-yyyy", # 03-25-2024 "yyyy/MM/dd", # 2024/03/25 "yyyy.MM.dd", # 2024.03.25 "MM/dd/yy", # 03/25/24 "M/d/yy", # 3/25/24 "M/d/yyyy", # 5/14/2024 "M-d-yyyy", # 5-14-2024 "dd/MM/yyyy", # 25/03/2024 "dd-MM-yyyy", # 25-03-2024 "dd.MM.yyyy", # 25.03.2024 "d-M-yyyy", # 25-3-2024 "d.M.yyyy", # 25.3.2024 "d/M/yyyy", # 25/3/2024 "MM/dd/yy", # 03/25/24 "M/d/yy", # 3/25/24 ] # Month name patterns MONTH_NAME_PATTERNS = [ "MMMM d, yyyy", # March 25, 2024 "MMMM d yyyy", # March 25 2024 "d MMMM yyyy", # 25 March 2024 "MMM d yyyy", # Mar 25 2024 "MMM d, yyyy", # Mar 25, 2024 "d MMM yyyy", # 25 Mar 2024 "dd-MMM-yyyy", # 25-Mar-2024 "yyyy MMM dd", # 2024 Mar 25 "d-MMM-yy", # 25-Mar-24 ] # Compact formats COMPACT_PATTERNS = [ "yyyyMMdd", # 20240325 "dd-MM-yy", # 25-03-24 "yy-MM-dd", # 24-03-25 ] # Localized formats LOCALIZED_PATTERNS = [ "yyyy年MM月dd日", # Japanese "yyyy年M月d日", # Japanese (short) "yyyy년 MM월 dd일", # Korean "yyyy년 M월 d일", # Korean (short) "d 'de' MMMM 'de' yyyy", # Spanish/Portuguese "d MMMM yyyy 'г.'", # Russian "d. 
MMMM yyyy", # German/Scandinavian ] # Patterns with time components TIME_PATTERNS = [ "yyyy-MM-dd HH:mm:ss.SSS", # 2024-03-25 14:30:00.000 "MMMM d, yyyy h:mm a", # March 25, 2024 2:30 PM ] # Add more specific parsing patterns ENHANCED_MONTH_NAME_PATTERNS = [ "d MMMM yyyy", # 4 November 2023 "d MMM yyyy", # 4 Nov 2023 "MMM d, yyyy", # Nov 4, 2023 "MMM-d-yyyy", # May-14-2022 "d-MMM-yyyy", # 14-May-2022 "d-MMMM-yyyy", # 14-November-2022 "MMMM d, yyyy", # November 4, 2023 "dd-MMM-yy", # 14-May-22 "MMM. d yyyy", # Aug. 4 2024 "MMMM d'th' yyyy", # April 4th 2024 "MMMM d'nd' yyyy", # April 2nd 2024 "MMMM d'rd' yyyy", # April 3rd 2024 "MMMM d'st' yyyy", # April 1st 2024 "d'th' MMMM yyyy", # 4th April 2024 "d'nd' MMMM yyyy", # 2nd April 2024 "d'rd' MMMM yyyy", # 3rd April 2024 "d'st' MMMM yyyy", # 1st April 2024 "d MMM. yyyy", # 4 Aug. 2024 ] @classmethod def get_all_patterns(cls) -> List[str]: """ Returns all date patterns in a prioritized order. ISO patterns first, followed by common Western formats, then increasingly specific or localized formats. """ return ( cls.ISO_PATTERNS + cls.WESTERN_DELIMITED + cls.MONTH_NAME_PATTERNS + cls.COMPACT_PATTERNS + cls.LOCALIZED_PATTERNS + cls.TIME_PATTERNS + cls.ENHANCED_MONTH_NAME_PATTERNS ) @classmethod def get_parsing_expressions(cls, date_column: Column) -> List[Column]: """ Returns a list of PySpark SQL expressions for parsing the date column using all available patterns. 
:param date_column: The column containing date strings to parse :return: List of parsing expressions using try_to_timestamp """ # Start with default timestamp parsing expressions = [F.try_to_timestamp(date_column)] # Add pattern-specific parsing expressions expressions.extend( [ F.try_to_timestamp(date_column, F.lit(pattern)) for pattern in cls.get_all_patterns() ] ) return expressions def _parse_to_timestamp(date_column: str | Column) -> Column: """ Helper function that handles the core timestamp parsing logic shared between normalize_date and normalize_timestamp functions. :param date_column: The text column containing the date or timestamp string :return: A timestamp column with the parsed result """ date_column = normalize_column(date_column) date_exprs = DatePatternGroups.get_parsing_expressions(date_column) return F.coalesce(*date_exprs) @track_usage
def normalize_date(date_column: str | Column) -> Column:
    """
    Parse a date from a string column, attempting to handle various formats.

    Returns null for unparseable dates. Always returns a DATE type.

    :param date_column: The text column containing the date or timestamp string
    :return: A date column with the parsed result

    Examples
    --------
    .. code-block:: python
        :caption: Parse various date formats into a standardized date column

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"date_string": "2024-03-25"},
            {"date_string": "03/25/2024"},
            {"date_string": "March 25, 2024"},
            {"date_string": "1616630400"},
        ])
        display(
            df.withColumn("parsed_date", E.normalize_date("date_string"))
        )

        +--------------------+------------+
        |     date_string    |parsed_date |
        +--------------------+------------+
        |     2024-03-25     | 2024-03-25 |
        |     03/25/2024     | 2024-03-25 |
        |   March 25, 2024   | 2024-03-25 |
        |     1616630400     | 2021-03-25 |
        +--------------------+------------+
    """
    # Reuse the shared multi-format parser, then drop the time component.
    parsed_timestamp = _parse_to_timestamp(date_column)
    return F.to_date(parsed_timestamp)
@track_usage
def normalize_timestamp(date_column: str | Column) -> Column:
    """
    Parse a timestamp from a string column, attempting to handle various formats.

    Returns null for unparseable dates. Always returns a TIMESTAMP type.

    :param date_column: The text column containing the date or timestamp string
    :return: A timestamp column with the parsed result

    Examples
    --------
    .. code-block:: python
        :caption: Parse various formats into a standardized timestamp column

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"date_string": "2024-03-25T14:30:00"},
            {"date_string": "March 25, 2024 2:30 PM"},
            {"date_string": "1616630400"},
        ])
        display(
            df.withColumn("parsed_ts", E.normalize_timestamp("date_string"))
        )

        +-------------------------+-------------------------+
        |       date_string       |        parsed_ts        |
        +-------------------------+-------------------------+
        |   2024-03-25T14:30:00   |  2024-03-25 14:30:00.0  |
        |  March 25, 2024 2:30 PM |  2024-03-25 14:30:00.0  |
        |        1616630400       |  2021-03-25 00:00:00.0  |
        +-------------------------+-------------------------+
    """
    # The shared helper already yields a TIMESTAMP column; return it directly.
    return _parse_to_timestamp(date_column)
@track_usage
def date_trunc(
    periodicity: Literal["YEAR", "HALF", "QUARTER", "MONTH", "WEEK", "DAY"],
    date_column: str | Column,
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    calendar=None,
):
    """
    Truncates a date column to the nearest date that matches the given periodicity.
    For example, this expression can truncate a date column to its nearest week,
    month, quarter, year, etc.

    .. tip::
        ``E.date_trunc`` expands on the built-in ``F.date_trunc`` to address some
        common gaps. For example:

        - It allows weeks to start on a different day of the week if specified.
        - It returns a ``date`` type instead of a ``timestamp`` type

    :param periodicity: The date periodicity to trunc the ``date_column`` by.
    :param date_column: The date column to transform given the ``periodicity``.
    :param start_day_of_week: Optionally set what the start day of week should be.
        Only used with a ``WEEK`` periodicity value. For example, if
        ``start_day_of_week="MONDAY"``, then the ``date_column`` is truncated to
        the nearest prior date that falls on a Monday.
    :param end_day_of_week: Similar to ``start_day_of_week`` can specify the ending
        day of the week to achieve the same effect. For example, if
        ``end_day_of_week="SUNDAY"``, then the ``date_column`` is truncated to the
        nearest prior date that falls on a Monday so that each week ends on a
        Sunday. Cannot specify both ``start_day_of_week`` and ``end_day_of_week``.
    :return: A date column with the date values rounded down to the nearest date
        given the ``periodicity`` (i.e. the first week, month, quarter, etc. that
        the date value would belong to). If not specified (default), the behavior
        will match built-in pyspark ``F.date_trunc``.

    Examples
    --------
    .. code-block:: python
        :caption: Truncate dates to the nearest week, but have weeks start on
            Mondays using ``start_day_of_week`` argument.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"date": date(2024, 10, 1)},
            {"date": date(2024, 10, 2)},
            {"date": date(2024, 10, 8)},
            {"date": date(2024, 10, 14)},
        ])
        display(
            df.withColumns({
                "week_start": E.date_trunc("WEEK", "date", start_day_of_week="SUNDAY"),
                "month_start": E.date_trunc("MONTH", "date"),
                "quarter_start": E.date_trunc("QUARTER", "date"),
                "year_start": E.date_trunc("YEAR", "date"),
            })
        )

        +--------------------+------------+------------+--------------+------------+
        |date                |week_start  |month_start |quarter_start |year_start  |
        +--------------------+------------+------------+--------------+------------+
        |2024-10-01          |2024-09-29  |2024-10-01  |2024-10-01    |2024-01-01  |
        |2024-10-02          |2024-09-29  |2024-10-01  |2024-10-01    |2024-01-01  |
        |2024-10-08          |2024-10-06  |2024-10-01  |2024-10-01    |2024-01-01  |
        |2024-10-14          |2024-10-13  |2024-10-01  |2024-10-01    |2024-01-01  |
        +--------------------+------------+------------+--------------+------------+
    """
    # Normalize inputs: resolve string names to Columns and validate/derive
    # the day-of-week bounds (supplying one bound derives the other).
    date_column = normalize_column(date_column)
    _validate_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    start_day_of_week, end_day_of_week = _normalize_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    match periodicity.upper():
        # Use custom logic for WEEK periodicity when start_day_of_week is specified
        case "WEEK":
            if start_day_of_week is not None:
                trunc_column = F.when(
                    # If the requested start day's offset is <= the date's weekday
                    # (Monday == 0), step back within the same week:
                    # add (offset(start) - weekday(date)), which is <= 0.
                    DAY_OF_WEEK_OFFSET_MAPPING[start_day_of_week]
                    <= F.weekday(date_column),
                    F.date_add(
                        date_column,
                        DAY_OF_WEEK_OFFSET_MAPPING[start_day_of_week]
                        - F.weekday(date_column),
                    ),
                ).otherwise(
                    # Otherwise the start day falls later in pyspark's week order,
                    # so the truncated date is in the previous calendar week:
                    # step back by 7 - abs(offset difference).
                    F.date_add(
                        date_column,
                        F.abs(
                            DAY_OF_WEEK_OFFSET_MAPPING[start_day_of_week]
                            - F.weekday(date_column)
                        )
                        - 7,
                    ),
                )
            else:
                # No custom week boundary: defer to built-in truncation.
                trunc_column = F.date_trunc(periodicity, date_column).cast("date")
        # All remaining periodicities should fallback to base pyspark date_trunc logic
        case _:
            trunc_column = F.date_trunc(periodicity, date_column).cast("date")
    return trunc_column
@track_usage
def date_end(
    periodicity: Literal["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"],
    date_column: str | Column,
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    calendar=None,
):
    """
    Converts a date column to the latest date that is within the given periodicity.
    For example, this expression can transform a date column to the last day of
    it's given week, month, quarter, year, etc.

    .. tip::
        ``E.date_end`` expands on the ``E.date_trunc`` function. It supports
        similar options and is effectively the inverse of ``E.date_trunc``.

    :param periodicity: The date periodicity to modify the ``date_column`` by.
    :param date_column: The date column to transform given the ``periodicity``.
    :param start_day_of_week: Optionally set what the start day of week should be.
        Only used with a ``WEEK`` periodicity value. For example, if
        ``start_day_of_week="MONDAY"``, then the ``date_column`` is converted to
        the last day of the week it falls on, assuming weeks start on Monday and
        end on Sunday.
    :param end_day_of_week: Optionally set what the end day of week should be.
        Only used with a ``WEEK`` periodicity value. For example, if
        ``end_day_of_week="SUNDAY"``, then the ``date_column`` is converted to the
        last day of the week it falls on, assuming weeks start on Monday and end
        on Sunday. Cannot specify both ``start_day_of_week`` and
        ``end_day_of_week``.
    :return: A date column with the date values representing the latest date given
        the ``periodicity`` (i.e. the week, month, quarter, etc. that the input
        date value would belong to).

    Examples
    --------
    .. code-block:: python
        :caption: Convert dates to the latest week, month, quarter, and year.
            Weeks start on Mondays using ``start_day_of_week`` argument.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"date": date(2024, 10, 1)},
            {"date": date(2024, 10, 2)},
            {"date": date(2024, 10, 8)},
            {"date": date(2024, 10, 14)},
        ])
        display(
            df.withColumns({
                "week_end": E.date_end("WEEK", "date", start_day_of_week="SUNDAY"),
                "month_end": E.date_end("MONTH", "date"),
                "quarter_end": E.date_end("QUARTER", "date"),
                "year_end": E.date_end("YEAR", "date"),
            })
        )

        +--------------------+------------+------------+------------+------------+
        |date                |week_end    |month_end   |quarter_end |year_end    |
        +--------------------+------------+------------+------------+------------+
        |2024-10-01          |2024-10-05  |2024-10-31  |2024-12-31  |2024-12-31  |
        |2024-10-02          |2024-10-05  |2024-10-31  |2024-12-31  |2024-12-31  |
        |2024-10-08          |2024-10-12  |2024-10-31  |2024-12-31  |2024-12-31  |
        |2024-10-14          |2024-10-19  |2024-10-31  |2024-12-31  |2024-12-31  |
        +--------------------+------------+------------+------------+------------+
    """
    date_column = normalize_column(date_column)
    _validate_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    start_day_of_week, end_day_of_week = _normalize_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    # Start from the beginning of the period, then step to its final day.
    trunc_column = date_trunc(
        periodicity=periodicity,
        date_column=date_column,
        start_day_of_week=start_day_of_week,
        calendar=None,
    )
    normalized_periodicity = periodicity.upper()
    if normalized_periodicity == "DAY":
        return trunc_column
    if normalized_periodicity == "WEEK":
        return F.date_add(trunc_column, 6)
    # Month-based periods: advance to the start of the next period, minus one day.
    months_in_period = {"MONTH": 1, "QUARTER": 3, "YEAR": 12}
    if normalized_periodicity in months_in_period:
        next_period_start = F.add_months(
            trunc_column, months_in_period[normalized_periodicity]
        )
        return F.date_add(next_period_start, -1)
    raise InvalidInputException(
        f"Invalid periodicity supplied: {periodicity}, must be one of: "
        '["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"]'
    )
@track_usage
def next_complete_period(
    periodicity: Literal["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"],
    date_column: str | Column,
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    calendar=None,
) -> Column:
    """
    Converts the input ``date_column`` to the next earliest date that starts a new
    period, given the ``periodicity`` specified. For example, this can return the
    start of the next complete week, month, quarter, year, etc.

    If the value in the ``date_column`` already starts a given period, the value
    will be returned without any transformation.

    :param periodicity: The date periodicity to modify the ``date_column`` by.
    :param date_column: The date column to transform given the ``periodicity``.
    :param start_day_of_week: Optionally set what the start day of week should be.
        Only used with a ``WEEK`` periodicity value. For example, if
        ``start_day_of_week="MONDAY"``, then the ``date_column`` is converted to
        the next date that begins a week, assuming weeks start on Monday and end
        on Sunday.
    :param end_day_of_week: Optionally set what the end day of week should be.
        Only used with a ``WEEK`` periodicity value. For example, if
        ``end_day_of_week="SUNDAY"``, then the ``date_column`` is converted to the
        next date that begins a week, assuming weeks start on Monday and end on
        Sunday. Cannot specify both ``start_day_of_week`` and ``end_day_of_week``.

    Examples
    --------
    .. code-block:: python
        :caption: Convert dates to the next complete period, given the
            periodicity. For ``WEEK`` the input date falls mid-week, so the start
            of next week is returned (similar for ``YEAR``); for ``MONTH`` and
            ``QUARTER`` the input already starts the period, so it is unchanged.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"date": date(2024, 10, 1)},
        ])
        display(
            df.withColumns({
                "next_complete_week": E.next_complete_period("WEEK", "date", start_day_of_week="SUNDAY"),
                "next_complete_month": E.next_complete_period("MONTH", "date"),
                "next_complete_quarter": E.next_complete_period("QUARTER", "date"),
                "next_complete_year": E.next_complete_period("YEAR", "date"),
            })
        )

        +------------+------------------+-------------------+---------------------+------------------+
        |date        |next_complete_week|next_complete_month|next_complete_quarter|next_complete_year|
        +------------+------------------+-------------------+---------------------+------------------+
        |2024-10-01  |2024-10-06        |2024-10-01         |2024-10-01           |2025-01-01        |
        +------------+------------------+-------------------+---------------------+------------------+
    """
    date_column = normalize_column(date_column)
    _validate_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    start_day_of_week, end_day_of_week = _normalize_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
    # For multi-day periods, step back one day first so that dates already on a
    # period boundary map to themselves rather than the following period.
    shifted_date = F.date_add(date_column, -1)
    normalized_periodicity = periodicity.upper()
    if normalized_periodicity == "DAY":
        return F.date_add(date_column, 1)
    if normalized_periodicity == "WEEK":
        # End of the (shifted) week, plus one day, is the next week's start.
        week_end = date_end("WEEK", shifted_date, end_day_of_week=end_day_of_week)
        return F.date_add(week_end, 1)
    months_in_period = {"MONTH": 1, "QUARTER": 3, "YEAR": 12}
    if normalized_periodicity in months_in_period:
        return F.add_months(
            date_trunc(normalized_periodicity, shifted_date),
            months_in_period[normalized_periodicity],
        )
    raise InvalidInputException(
        f"Invalid periodicity supplied: {periodicity}, must be one of: "
        '["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"]'
    )
# Month name patterns for case insensitive matching.
# Keys are regex alternations (raw strings so `\.` stays a literal dot in the
# regex rather than triggering an invalid-escape warning); values are the
# zero-padded month numbers.
MONTH_PATTERNS = {
    r"january|jan\.?": "01",
    r"february|feb\.?": "02",
    r"march|mar\.?": "03",
    r"april|apr\.?": "04",
    r"may": "05",
    r"june|jun\.?": "06",
    r"july|jul\.?": "07",
    r"august|aug\.?": "08",
    r"september|sep\.?|sept\.?": "09",
    r"october|oct\.?": "10",
    r"november|nov\.?": "11",
    r"december|dec\.?": "12",
}


def _validate_day_of_week_bounds(
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
) -> None:
    """
    Validate optional day-of-week bounds, raising InvalidInputException when a
    bound is not a valid day name or when both bounds are given but do not
    span a full 7-day week.

    :param start_day_of_week: Optional start-of-week day name (case-insensitive)
    :param end_day_of_week: Optional end-of-week day name (case-insensitive)
    :raises InvalidInputException: On an unknown day name or inconsistent bounds
    """
    if start_day_of_week is not None:
        start_day_of_week = start_day_of_week.upper()
        if start_day_of_week not in DAY_OF_WEEK_OFFSET_MAPPING:
            raise InvalidInputException(
                f"Invalid start_day_of_week: {start_day_of_week}, must be one of: "
                '["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"]'
            )
    if end_day_of_week is not None:
        end_day_of_week = end_day_of_week.upper()
        if end_day_of_week not in DAY_OF_WEEK_OFFSET_MAPPING:
            raise InvalidInputException(
                f"Invalid end_day_of_week: {end_day_of_week}, must be one of: "
                '["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"]'
            )
    # When both bounds are supplied, the end day must be exactly 6 offsets after
    # the start day (mod 7) so the pair covers one full week.
    if (
        (start_day_of_week is not None)
        and (end_day_of_week is not None)
        and (
            (
                DAY_OF_WEEK_OFFSET_MAPPING[end_day_of_week]
                - DAY_OF_WEEK_OFFSET_MAPPING[start_day_of_week]
            )
            % 7
        )
        != 6
    ):
        raise InvalidInputException(
            f"When specifying both start_day_of_week or end_day_of_week, they must fully cover the week. "
            "Choose a start and end day of week that is valid."
        )


def _normalize_day_of_week_bounds(
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
) -> tuple[Optional[str], Optional[str]]:
    """
    Uppercase the supplied day-of-week bound(s) and derive the complementary
    bound so both the start and end of the week are always returned together.

    :param start_day_of_week: Optional start-of-week day name
    :param end_day_of_week: Optional end-of-week day name
    :return: Tuple of (start_day_of_week, end_day_of_week); both None when
        neither bound was supplied
    """

    def _day_at_offset(target_offset: int) -> Optional[str]:
        # Reverse lookup into DAY_OF_WEEK_OFFSET_MAPPING.
        for day_name, day_offset in DAY_OF_WEEK_OFFSET_MAPPING.items():
            if day_offset == target_offset:
                return day_name
        return None

    if start_day_of_week is not None:
        start_day_of_week = start_day_of_week.upper()
        end_day_of_week = _day_at_offset(
            (DAY_OF_WEEK_OFFSET_MAPPING[start_day_of_week] + 6) % 7
        )
    if end_day_of_week is not None:
        end_day_of_week = end_day_of_week.upper()
        start_day_of_week = _day_at_offset(
            (DAY_OF_WEEK_OFFSET_MAPPING[end_day_of_week] - 6) % 7
        )
    return start_day_of_week, end_day_of_week


def _get_day_of_week_bounds(
    start_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
    end_day_of_week: Literal[
        "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"
    ] = None,
):
    """
    Return the normalized (start, end) day-of-week bounds.

    NOTE(review): this function was a byte-for-byte duplicate of
    ``_normalize_day_of_week_bounds``; it now delegates to it and is kept only
    for backward compatibility with any existing callers.
    """
    return _normalize_day_of_week_bounds(
        start_day_of_week=start_day_of_week,
        end_day_of_week=end_day_of_week,
    )
DATE_PATTERNS = [ # Most precise ISO formats r"^\d{4}-\d{2}-\d{2}(?=\D|$)", # Exact ISO date at start of string r"\d{4}-\d{2}-\d{2}(?=\D|$)", # ISO date anywhere, ensuring non-digit follows r"(?<!\d)\d{4}-\d{2}-\d{2}(?!\d)", # Purely ISO date without time r"(?<!\d)\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d{3})?(?:Z|[+-]\d{2}:?\d{2})?(?!\d)", # ISO with time # Full context month, day, year formats r"(?<!\d)\d{1,2}/\d{1,2}/\d{2,4}(?!\d)", # Slash pattern for M/D/(YY|YYYY) r"(?<!\d)\d{1,2}-\d{1,2}-\d{2,4}(?!\d)", # Dash pattern for M-D-(YY|YYYY) r"(?<!\d)\d{1,2}\.\d{1,2}\.\d{2,4}(?!\d)", # Dot pattern for M.D.(YY|YYYY) # International and reversed formats r"(?<!\d)\d{1,2}/\d{1,2}/\d{4}(?!\d)", # International DD/MM/YYYY r"(?<!\d)\d{1,2}-\d{1,2}-\d{4}(?!\d)", # DD-MM-YYYY format r"(?<!\d)\d{1,2}\.\d{1,2}\.\d{4}(?!\d)", # DD.MM.YYYY format # Month name + day + year formats r"(?i)(?<!\w)(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)(?:\s+\d{1,2}(?:,\s*|\s+)\d{2,4})(?!\w)", r"(?i)\d{1,2}\s+(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)(?:\s*,?)?\s+\d{4}(?!\d)", # Month name variations r"(?i)(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)-\d{1,2}-\d{4}(?!\d)", r"(?i)(?<!\d)\d{1,2}-(?:JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)-\d{4}(?!\d)", # Month name + year formats r"(?i)(?<!\w)(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)(?:\s+\d{2,4})(?!\w)", # Partial and flexible formats r"\d{4}-\d{2}(?!\d)", # ISO 8601 partial date formats (YYYY-MM) r"(?i)(?<!\w)(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[-\s]\d{2,4}(?!\w)", # Short and contextual patterns r"(?<!\d)\d{1,2}[-/]\d{1,2}(?!\d)", # Short date patterns without year r"(?<!\d)\d{1,2}/\d{1,2}(?!\d)", # Dates without year, assuming current 
year # Additional flexible patterns r"(?i)(?<!\w)(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)[-\s]?\d{2}(?!\w)", r"(?<![a-zA-Z0-9])\d{1,2}[-/\.]\d{1,2}[-/\.]\d{2,4}(?![a-zA-Z0-9])", # Month abbreviation with period r"(?i)(?<!\w)(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\.?\s+\d{1,2}(?:,\s*|\s+)\d{4}(?!\w)", # Ordinal dates (4th, 5th, etc.) r"(?i)(?<!\w)(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+\d{1,2}(?:st|nd|rd|th)(?:,\s*|\s+)\d{4}(?!\w)", # Day before month r"(?i)(?<!\d)\d{1,2}\s+(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\.?\s+\d{4}(?!\d)", # Short month names at start r"(?i)(?<!\w)(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)\.?\s+\d{1,2}(?:,\s*|\s+)\d{4}(?!\w)" # Add this pattern at the end for Month YYYY format r"(?i)(?<!\w|\d)(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}(?!\d|\s*\d)", # June 2024 r"(?i)(?<!\w|\d)(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2}(?:st|nd|rd|th)(?:\s*,\s*|\s+)\d{4}(?!\d|\w)", # April 4th 2024 r"(?i)(?<!\w|\d)\d{1,2}(?:st|nd|rd|th)\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)(?:\s*,\s*|\s+)\d{4}(?!\d|\w)", # 4th April 2024 r"(?i)(?<!\w|\d)(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec)\.?\s+\d{1,2}(?:st|nd|rd|th)(?:\s*,\s*|\s+)\d{4}(?!\d|\w)", # Apr 4th 2024 ] def _infer_month_year_dates(text_column: Column) -> List[Column]: """ Helper function that generates expressions for inferring dates from month-year text. Returns a list of date expressions that should be combined with the main date array. 
""" month_year_expr = [] for month_pattern, month_num in MONTH_PATTERNS.items(): # Pattern for just month and year (no day) month_year_pattern = ( f"(?i)(?<!\\w|\\d)({month_pattern})\\s+(\\d{{4}})(?!\\d|\\s*\\d)" ) # Extract year year = F.regexp_extract(text_column, month_year_pattern, 2) matched = F.regexp_extract(text_column, month_year_pattern, 0) != "" # For pure month-year matches, generate start and end of month start_date = F.when( matched, F.concat(year, F.lit("-"), F.lit(month_num), F.lit("-01")) ) end_date = F.when( matched, F.date_format(date_end("MONTH", normalize_date(start_date)), "yyyy-MM-dd"), ) month_year_expr.extend( [ F.when(matched, start_date).otherwise(None), F.when(matched, end_date).otherwise(None), ] ) return month_year_expr def _infer_partial_dates(text_column: Column) -> List[Column]: """ Helper function that finds partial dates (like 9/24) and infers their year from context. Returns list of expressions for inferred complete dates. """ # First find any 4-digit years in the text year_pattern = r"\b\d{4}\b" year = F.regexp_extract(text_column, year_pattern, 0) has_year = year != "" # Pattern for partial dates: M/D or MM/DD partial_pattern = r"(?<!\d)(\d{1,2})/(\d{1,2})(?!\d{2,})" # Get the partial dates matches = F.regexp_extract_all(text_column, partial_pattern, 0) # For each potential match position (try first 3 matches) partial_date_expr = [] for i in range(3): month = F.regexp_extract(text_column, partial_pattern, 1) day = F.regexp_extract(text_column, partial_pattern, 2) matched = F.regexp_extract(text_column, partial_pattern, 0) != "" # Construct full date when we have a year in context inferred_date = F.when( matched & has_year, F.concat( year, F.lit("-"), F.lpad(month, 2, "0"), F.lit("-"), F.lpad(day, 2, "0") ), ) partial_date_expr.append(F.when(matched, inferred_date).otherwise(None)) # Remove this match from text for next iteration text_column = F.regexp_replace(text_column, partial_pattern, "") return partial_date_expr def 
_preprocess_date_text(text_column: Column) -> Column: # Remove commas and normalize spaces around dashes text = F.regexp_replace(text_column, ",", " ") text = F.regexp_replace(text, "\\s+", " ") # Add space between numbers and letters # This handles patterns like "09jan2024" -> "09 jan 2024" text = F.regexp_replace(text, "(?i)(\\d+)([a-z]+)", "$1 $2") text = F.regexp_replace(text, "(?i)([a-z]+)(\\d+)", "$1 $2") return text @track_usage
def parse_all_dates(
    text_column: str | Column,
    patterns: Optional[List[str]] = None,
    infer_dates: bool = False,
    debug: bool = False,
) -> Column:
    """
    Extracts and parses date-like strings from a text column using multiple
    regex patterns.

    This function attempts to find and parse multiple date formats from a given
    text column, returning an array of parsed DATE objects. The parsing
    leverages the ETL Toolkit's ``normalize_date`` function to convert extracted
    strings into valid date objects.

    :param text_column: A column containing text to extract dates from. Can be a
        string column name or a PySpark Column.
    :param patterns: Optional list of custom regex patterns to use for date
        extraction. If not provided, a predefined set of comprehensive date
        patterns is used.
    :param infer_dates: If True, enables month-year inference to expand strings
        like "June 2024" into start/end dates. Defaults to False.
    :param debug: If True, returns the raw string array of the dates extracted,
        not the converted dates. Defaults to False. Primarily for testing
        improvements to this function.
    :return: A PySpark Column containing an array of parsed dates in ascending
        order. Returns an empty array if no valid dates are found.

    Examples
    ^^^^^^^^
    .. code-block:: python
        :caption: Basic usage of parse_all_dates

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"text": "Meeting scheduled for 2024-03-25 and 05/15/2024"},
            {"text": "Conference on February 10, 2023"},
            {"text": "Due in June 2024"},
        ])
        result_df = df.withColumn(
            "parsed_dates",
            E.parse_all_dates("text", infer_dates=True)
        )

    .. code-block:: python
        :caption: Using custom regex patterns

        custom_patterns = [
            r'\\b\\d{1,2}/\\d{1,2}/\\d{4}\\b',  # Only match full M/D/YYYY format
            r'\\b\\d{4}-\\d{2}-\\d{2}\\b'       # Only match YYYY-MM-DD format
        ]
        result_df = df.withColumn(
            "parsed_dates",
            E.parse_all_dates("text", patterns=custom_patterns)
        )

    Caution
    -------
    - Not all possible date formats may be captured by the built-in patterns.
    - Ambiguous formats (e.g., MM/DD/YYYY vs DD/MM/YYYY) are parsed using a
      best-effort approach and may require additional context or validation.
    """
    # Resolve and clean the input text before extraction.
    text_column = normalize_column(text_column)
    text_column = _preprocess_date_text(text_column)

    active_patterns = patterns or DATE_PATTERNS

    # One array of raw matches per pattern, flattened and deduplicated.
    extracted_per_pattern = [
        F.regexp_extract_all(text_column, F.lit(pattern), 0)
        for pattern in active_patterns
    ]
    candidate_strings = F.array_distinct(F.flatten(F.array(*extracted_per_pattern)))

    if infer_dates:
        # Fold in month-year expansions and context-inferred partial dates.
        inferred_strings = F.array_union(
            F.array(*_infer_partial_dates(text_column)),
            F.array(*_infer_month_year_dates(text_column)),
        )
        candidate_strings = F.array_union(candidate_strings, inferred_strings)

    if debug:
        return candidate_strings

    # Convert candidate strings to dates, drop failures, and sort ascending.
    parsed_dates = F.transform(
        candidate_strings, lambda date_str: normalize_date(date_str)
    )
    valid_dates = F.filter(
        F.array_distinct(parsed_dates), lambda parsed: parsed.isNotNull()
    )
    return F.array_sort(valid_dates)
@track_usage
def parse_date_range(text_column: str | Column | List[str | Column]) -> Column:
    """
    Parse a date range from text, returning a struct with start and end dates.

    This function attempts to extract date ranges from free-form text by finding
    and parsing date references. It returns a struct with ``start_date`` and
    ``end_date`` fields.

    If exactly one date is found, that date is used for both start and end.
    If multiple dates are found, the first is used as the start and the second
    as the end.

    :param text_column: Text column(s) to parse dates from. Can be a single
        column, list of columns, or Column expressions.
    :return: Struct column with 'start_date' and 'end_date' fields.
        Both fields will be NULL if no valid dates are found in the text.

    Examples
    --------
    .. code-block:: python
        :caption: Parse date ranges from text

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"text": "Operating Supplies 3/13/23 - 3/20/23"},
            {"text": "Due in June 2024"},
            {"text": "Meeting on 2024-03-25"},
            {"text": "No dates here"},
        ])

        # Get date range as a struct
        result_df = df.withColumn(
            "date_range",
            E.parse_date_range("text")
        )

        # Or unpack into separate columns
        result_df = df.withColumns({
            "start_date": E.parse_date_range("text").start_date,
            "end_date": E.parse_date_range("text").end_date,
        })

        display(result_df)

        +----------------------------------------+------------+------------+
        |text                                    |start_date  |end_date    |
        +----------------------------------------+------------+------------+
        |Operating Supplies 3/13/23 - 3/20/23    |2023-03-13  |2023-03-20  |
        |Due in June 2024                        |2024-06-01  |2024-06-30  |
        |Meeting on 2024-03-25                   |2024-03-25  |2024-03-25  |
        |No dates here                           |NULL        |NULL        |
        +----------------------------------------+------------+------------+

    Note
    ----
    - The function automatically enables date inference to handle cases like "June 2024"
    - When multiple columns are provided, they are concatenated with spaces before parsing
    - The function handles a wide variety of date formats including partial dates
      and month names
    """
    # Convert input to list if single column
    columns = [text_column] if isinstance(text_column, (str, Column)) else text_column

    # Combine columns into single text.
    # BUG FIX: the previous implementation applied F.col() to every element,
    # which raises for Column inputs even though the signature and docstring
    # advertise Column support. normalize_column accepts both str and Column.
    combined_text = F.concat_ws(" ", *[normalize_column(col) for col in columns])

    # Extract all dates (inference enabled so month-year references like
    # "June 2024" resolve to a first/last-day-of-month range)
    extracted_dates = parse_all_dates(combined_text, infer_dates=True)

    # Create struct with start and end dates:
    # - 0 dates found -> both fields NULL
    # - 1 date found  -> that date is both start and end
    # - 2+ dates      -> first is start, second is end
    return F.struct(
        F.when(F.size(extracted_dates) >= 1, extracted_dates.getItem(0))
        .otherwise(None)
        .alias("start_date"),
        F.when(F.size(extracted_dates) >= 2, extracted_dates.getItem(1))
        .when(F.size(extracted_dates) == 1, extracted_dates.getItem(0))
        .otherwise(None)
        .alias("end_date"),
    )