import uuid
from typing import List, Union, Optional
import pyspark.sql.functions as F
from pyspark.sql import types as T
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.expressions.core import normalize_column


def _is_complex_type(datatype: T.DataType) -> bool:
    """Helper function to check if a type is complex (array, map, struct, or variant)"""
    return isinstance(datatype, (T.ArrayType, T.MapType, T.StructType, T.VariantType))


def _normalize_column_type(
    col: F.Column, schema: Optional[T.StructType] = None, col_name: Optional[str] = None
) -> F.Column:
    """
    Helper function to normalize column types to strings for UUID generation.
    Complex types are converted to JSON strings before casting.
    """
    if schema and col_name:
        matching = [f for f in schema.fields if f.name == col_name]
        if matching and _is_complex_type(matching[0].dataType):
            return F.to_json(col).cast("string")
    return col.cast("string")


def _get_column_name(col: F.Column) -> str:
    """Extract the column name from a Column object"""
    # Parse the Column repr, which has the form Column<'name'>
    return str(col).split("'")[1]
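
# Note (illustrative): str(F.col("a")) renders as "Column<'a'>", which is the shape the
# parsing above relies on; derived expressions (e.g. F.col("a") + 1) may not yield a
# clean column name.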


@track_usage
def uuid5(
*columns: Union[str, F.Column, List[Union[str, F.Column]]],
schema: Optional[T.StructType] = None,
    namespace: uuid.UUID = uuid.NAMESPACE_OID,
separator: str = "-",
null_placeholder: str = "\0",
) -> F.Column:
"""
Generates a UUIDv5 from the provided columns and namespace, using a custom separator (default "-").
    This function creates an RFC 4122/9562-compliant UUIDv5 string using PySpark. It concatenates the input columns
    with the specified separator, then hashes the concatenated string together with the provided namespace to generate the UUID.
    The function accepts individual column names, Column objects, or a single list of these. If a list is provided as the
    first argument, it is used as the source of columns (see the list example below).
    .. tip:: For complex types (arrays, structs, maps, variants), you must pass your dataframe schema via the ``schema`` parameter
        to ensure proper JSON conversion (see the schema example below).
:param columns: The input columns to use for generating the UUID. Can be string column names, Column objects,
or a list of these. If strings are provided, they are converted to Column objects.
:param schema: Optional StructType schema that defines the data types of the input columns. If provided,
complex types will automatically be converted to JSON strings.
:param namespace: The namespace to use for the UUID generation. Defaults to uuid.NAMESPACE_OID.
:param separator: The separator to use when concatenating columns. Defaults to "-".
    :param null_placeholder: The placeholder used for null values before concatenation. Defaults to ``"\\0"`` (the null byte); see the null-handling example below.
:return: A Column containing the generated UUIDv5 string.
Examples
--------
    .. code-block:: python
        :caption: Generate a UUIDv5 from multiple columns

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"col1": "value1", "col2": "value2", "col3": "value3"},
        ])
        result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3"))
        display(result)

        +-------+-------+-------+------------------------------------+
        |col1   |col2   |col3   |uuid                                |
        +-------+-------+-------+------------------------------------+
        |value1 |value2 |value3 |70234258-cd49-5512-b42b-2a2336284bde|
        +-------+-------+-------+------------------------------------+
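    .. code-block:: python
        :caption: Passing the columns as a single list. The list form is unpacked internally, so it yields the same UUID as passing the columns individually.

        from etl_toolkit import E, F

        df = spark.createDataFrame([
            {"col1": "value1", "col2": "value2", "col3": "value3"},
        ])
        result = df.withColumn("uuid", E.uuid5(["col1", "col2", "col3"]))
        display(result)

        +-------+-------+-------+------------------------------------+
        |col1   |col2   |col3   |uuid                                |
        +-------+-------+-------+------------------------------------+
        |value1 |value2 |value3 |70234258-cd49-5512-b42b-2a2336284bde|
        +-------+-------+-------+------------------------------------+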
.. code-block:: python
:caption: Generate a UUIDv5 from multiple columns with a custom separator character. This affects the UUID generated.
from etl_toolkit import E, F
df = spark.createDataFrame([
{"col1": "value1", "col2": "value2", "col3": "value3"},
])
result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", separator="|"))
display(result)
+-------+-------+-------+------------------------------------+
|col1 |col2 |col3 |uuid |
+-------+-------+-------+------------------------------------+
|value1 |value2 |value3 |2c4cbae4-03c8-57e8-abd3-83dc81568625|
+-------+-------+-------+------------------------------------+
.. code-block:: python
:caption: Generate a UUIDv5 from multiple columns while using a custom namespace. This affects the UUID generated.
from etl_toolkit import E, F
import uuid
df = spark.createDataFrame([
{"col1": "value1", "col2": "value2", "col3": "value3"},
])
        custom_namespace = uuid.uuid4()  # e.g. uuid.UUID("e7950cee-b08d-497e-8eec-564cbabbd81e")
result = df.withColumn("uuid", E.uuid5("col1", "col2", "col3", namespace=custom_namespace))
display(result)
+-------+-------+-------+------------------------------------+
|col1 |col2 |col3 |uuid |
+-------+-------+-------+------------------------------------+
|value1 |value2 |value3 |2f245ebb-3af3-5268-b734-c27a453fcd87|
+-------+-------+-------+------------------------------------+
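    .. code-block:: python
        :caption: Null values are replaced with the ``null_placeholder`` before concatenation, so a custom placeholder changes the generated UUID. This is an illustrative sketch; the resulting UUID is omitted.

        from etl_toolkit import E, F

        df = spark.createDataFrame(
            [("value1", None)],
            "col1 string, col2 string",
        )
        result = df.withColumn(
            "uuid",
            E.uuid5("col1", "col2", null_placeholder="<NULL>"),
        )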
.. code-block:: python
:caption: Using schema with complex types
from etl_toolkit import E, F
        from pyspark.sql.types import StructType, StructField, ArrayType, MapType, StringType
# Define schema with complex types
schema = StructType([
StructField("string_col", StringType(), True),
StructField("array_col", ArrayType(StringType()), True),
StructField("map_col", MapType(StringType(), StringType()), True)
])
df = spark.createDataFrame([
{
"string_col": "value1",
"array_col": ["a", "b"],
"map_col": {"key": "value"}
}
], schema=schema)
# Pass schema to handle complex types automatically
result = df.withColumn(
"uuid",
E.uuid5("string_col", "array_col", "map_col", schema=df.schema)
)
display(result)
+----------+-----------+--------------------+------------------------------------+
|string_col|array_col |map_col |uuid |
+----------+-----------+--------------------+------------------------------------+
|value1 |[a, b] |{"key": "value"} |1befbc2d-441e-59a5-a2c4-6fec02aa078e|
+----------+-----------+--------------------+------------------------------------+
"""
    if not columns:
        raise ValueError("No columns were provided to uuid5")
    # If the first argument is a list, use that as the columns
    if isinstance(columns[0], list):
        columns = columns[0]
    # Convert any string column names to Column objects (mixed inputs are allowed)
    columns = [F.col(c) if isinstance(c, str) else c for c in columns]
# First normalize column types to strings and handle complex types
# This must be done before concatenation
normalized_columns = []
for col in columns:
col_name = _get_column_name(col)
# Convert to string first (handles complex types if schema provided)
normalized_col = _normalize_column_type(col, schema, col_name)
# Then handle nulls
normalized_col = F.coalesce(normalized_col, F.lit(null_placeholder))
normalized_columns.append(normalized_col)
# Now we can safely concatenate the normalized strings
name = F.concat_ws(separator, *normalized_columns)
name = normalize_column(name)
# Cast name to binary before concatenation
name_bytes = F.encode(name, "UTF-8")
# Generate SHA1 hash
sha1_hash = F.sha1(F.concat(F.lit(namespace.bytes), name_bytes))
# Format UUID components
return F.concat_ws(
"-",
F.substring(sha1_hash, 1, 8), # Time Low
F.substring(sha1_hash, 9, 4), # Time Mid
F.concat(F.lit("5"), F.substring(sha1_hash, 14, 3)), # Time High and Version
        F.concat(
            F.lower(  # F.hex returns uppercase hex digits, so lowercase them for a canonical UUID
                F.concat(
                    F.hex(
                        # Convert the 17th hex char to an int, keep its low two bits (AND 0x3),
                        # and set the variant bits to 10xx (OR 0x8) per RFC 4122, then
                        # convert back to hex to merge with the rest of the substring.
                        # e.g. 'f' -> 15 -> (15 & 0x3) = 3 -> (3 | 0x8) = 0xb -> 'B'
                        F.conv(F.substring(sha1_hash, 17, 1), 16, 10)
                        .cast("int")
                        .bitwiseAND(F.lit(0x3))
                        .bitwiseOR(F.lit(0x8))
                    ),
                    F.substring(sha1_hash, 18, 3),
                )
            )
        ),  # Variant
F.substring(sha1_hash, 21, 12), # Rest of String
)
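

# Illustrative parity check (not part of the library API): the expression built above
# is intended to match Python's uuid.uuid5 applied to the separator-joined string,
# assuming normalize_column leaves plain ASCII input unchanged.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("a", "b")], "c1 string, c2 string")
    spark_uuid = df.select(uuid5("c1", "c2").alias("u")).first()["u"]
    python_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, "a-b"))
    assert spark_uuid == python_uuid, (spark_uuid, python_uuid)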