Source code for etl_toolkit.expressions.uuid5

import uuid
from typing import List, Union, Optional

import pyspark.sql.functions as F
from pyspark.sql import types as T
from yipit_databricks_utils.helpers.telemetry import track_usage

from etl_toolkit.expressions.core import normalize_column


def _is_complex_type(datatype: T.DataType) -> bool:
    """Helper function to check if a type is complex (array, map, struct)"""
    return isinstance(datatype, (T.ArrayType, T.MapType, T.StructType, T.VariantType))


def _normalize_column_type(
    col: F.Column, schema: Optional[T.StructType] = None, col_name: str = None
) -> F.Column:
    """
    Helper function to normalize column types to strings for UUID generation
    Converts complex types to JSON strings first
    """
    if schema and col_name:
        field = [f for f in schema.fields if f.name == col_name][0]
        if _is_complex_type(field.dataType):
            return F.to_json(col).cast("string")
    return col.cast("string")


def _get_column_name(col: F.Column) -> str:
    """Extract the column name from a Column object"""
    col_str = str(col)
    # Extract what's between the quotes in Column<'name'>
    return col_str.split("'")[1]


@track_usage
[docs] def uuid5( *columns: Union[str, F.Column, List[Union[str, F.Column]]], schema: Optional[T.StructType] = None, namespace=uuid.NAMESPACE_OID, separator: str = "-", null_placeholder: str = "\0", ) -> F.Column: """ Generates a UUIDv5 from the provided columns and namespace, using a custom separator (default "-"). This function creates a RFC 4122/9562 compliant UUIDv5 string using PySpark. It concatenates the input columns with the specified separator, then uses this concatenated string along with the provided namespace to generate the UUID. The function can accept individual column names, Column objects, or a list of these. If a list is provided as the first argument, it will be used as the source of columns. .. tip:: For complex types (arrays, structs, maps, variants), you must pass in your dataframe schema with the ``schema`` parameter to ensure proper JSON conversion. :param columns: The input columns to use for generating the UUID. Can be string column names, Column objects, or a list of these. If strings are provided, they are converted to Column objects. :param schema: Optional StructType schema that defines the data types of the input columns. If provided, complex types will automatically be converted to JSON strings. :param namespace: The namespace to use for the UUID generation. Defaults to uuid.NAMESPACE_OID. :param separator: The separator to use when concatenating columns. Defaults to "-". :param null_placeholder: The placeholder to use for null values. Defaults to "\0" (null byte). :return: A Column containing the generated UUIDv5 string. Examples -------- .. code-block:: python :caption: Generate a UUIDv5 from multiple columns from etl_toolkit import E, F df = spark.createDataFrame([ {"col1": "value1", "col2": "value2", "col3": "value3"}, ]) result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", separator="|")) display(result) +-------+-------+-------+------------------------------------+ |col1 |col2 |col3 |uuid | +-------+-------+-------+------------------------------------+ |value1 |value2 |value3 |70234258-cd49-5512-b42b-2a2336284bde| +-------+-------+-------+------------------------------------+ .. code-block:: python :caption: Generate a UUIDv5 from multiple columns with a custom separator character. This affects the UUID generated. from etl_toolkit import E, F df = spark.createDataFrame([ {"col1": "value1", "col2": "value2", "col3": "value3"}, ]) result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", separator="|")) display(result) +-------+-------+-------+------------------------------------+ |col1 |col2 |col3 |uuid | +-------+-------+-------+------------------------------------+ |value1 |value2 |value3 |2c4cbae4-03c8-57e8-abd3-83dc81568625| +-------+-------+-------+------------------------------------+ .. code-block:: python :caption: Generate a UUIDv5 from multiple columns while using a custom namespace. This affects the UUID generated. from etl_toolkit import E, F import uuid df = spark.createDataFrame([ {"col1": "value1", "col2": "value2", "col3": "value3"}, ]) custom_namespace = uuid.uuid4() # returned uuid.UUID("e7950cee-b08d-497e-8eec-564cbabbd81e") result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", namespace=custom_namespace)) display(result) +-------+-------+-------+------------------------------------+ |col1 |col2 |col3 |uuid | +-------+-------+-------+------------------------------------+ |value1 |value2 |value3 |2f245ebb-3af3-5268-b734-c27a453fcd87| +-------+-------+-------+------------------------------------+ .. code-block:: python :caption: Generate a UUIDv5 from multiple columns from etl_toolkit import E, F df = spark.createDataFrame([ {"col1": "value1", "col2": "value2", "col3": "value3"}, ]) result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", separator="|")) display(result) +-------+-------+-------+------------------------------------+ |col1 |col2 |col3 |uuid | +-------+-------+-------+------------------------------------+ |value1 |value2 |value3 |70234258-cd49-5512-b42b-2a2336284bde| +-------+-------+-------+------------------------------------+ .. code-block:: python :caption: Using schema with complex types from etl_toolkit import E, F from pyspark.sql.types import StructType, StructField, ArrayType, StringType # Define schema with complex types schema = StructType([ StructField("string_col", StringType(), True), StructField("array_col", ArrayType(StringType()), True), StructField("map_col", MapType(StringType(), StringType()), True) ]) df = spark.createDataFrame([ { "string_col": "value1", "array_col": ["a", "b"], "map_col": {"key": "value"} } ], schema=schema) # Pass schema to handle complex types automatically result = df.withColumn( "uuid", E.uuid5("string_col", "array_col", "map_col", schema=df.schema) ) display(result) +----------+-----------+--------------------+------------------------------------+ |string_col|array_col |map_col |uuid | +----------+-----------+--------------------+------------------------------------+ |value1 |[a, b] |{"key": "value"} |1befbc2d-441e-59a5-a2c4-6fec02aa078e| +----------+-----------+--------------------+------------------------------------+ """ if not columns: raise Exception("No columns passed!") # If the first argument is a list, use that as the columns if isinstance(columns[0], list): columns = columns[0] if isinstance(columns[0], str): columns = [F.col(c) for c in columns] # First normalize column types to strings and handle complex types # This must be done before concatenation normalized_columns = [] for col in columns: col_name = _get_column_name(col) # Convert to string first (handles complex types if schema provided) normalized_col = _normalize_column_type(col, schema, col_name) # Then handle nulls normalized_col = F.coalesce(normalized_col, F.lit(null_placeholder)) normalized_columns.append(normalized_col) # Now we can safely concatenate the normalized strings name = F.concat_ws(separator, *normalized_columns) name = normalize_column(name) # Cast name to binary before concatenation name_bytes = F.encode(name, "UTF-8") # Generate SHA1 hash sha1_hash = F.sha1(F.concat(F.lit(namespace.bytes), name_bytes)) # Format UUID components return F.concat_ws( "-", F.substring(sha1_hash, 1, 8), # Time Low F.substring(sha1_hash, 9, 4), # Time Mid F.concat(F.lit("5"), F.substring(sha1_hash, 14, 3)), # Time High and Version F.concat( F.lower( # We use lower here because F.hex returns Alphabetical characters as capitals F.concat( F.hex( # Take the 17th "char", convert to bytes and ensure the Most Significant bit is high # And reconvert to hex to merge with the rest of the substring F.conv(F.substring(sha1_hash, 17, 1), 16, 10) .cast("int") .bitwiseAND(F.lit(0x3)) .bitwiseOR(F.lit(0x8)) ), F.substring(sha1_hash, 18, 3), ) ) ), # Variant F.substring(sha1_hash, 21, 12), # Rest of String )