# Source code for etl_toolkit.analyses.ordering
from pyspark.sql import functions as F, DataFrame, Column
from yipit_databricks_utils.helpers.telemetry import track_usage
from etl_toolkit.expressions.core import normalize_column
@track_usage
def shift_columns(df: DataFrame, priority_columns: list[str | Column]) -> DataFrame:
    """
    Utility transformation that shifts any specified columns of the dataframe to the beginning of the schema.

    This can be useful for performance optimization (ex: improving join performance on key columns and clustering).
    The re-ordered columns can include new columns that don't already exist on the dataframe. These will be
    added in addition to any existing columns in the order of the ``priority_columns`` list.

    :param df: The dataframe to reorder columns on
    :param priority_columns: A list of Column(s) or string(s) to shift to the beginning of the schema.
        If strings are provided, they will be resolved as Columns.

    Examples
    -----------
    .. code-block:: python
        :caption: Re-order columns by specifying which columns should come at the beginning of the schema.
            All remaining columns will preserve the original order of the dataframe.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"color": "red", "value": 1000, "date": date(2023, 1, 1)},
            {"color": "blue", "value": 50, "date": date(2023, 1, 1)},
        ])
        display(
            A.shift_columns(
                df,
                priority_columns=["date"],
            )
        )
        +--------------+--------------+--------------+
        |date          |color         |value         |
        +--------------+--------------+--------------+
        |    2023-01-01|red           |          1000|
        +--------------+--------------+--------------+
        |    2023-01-01|blue          |            50|
        +--------------+--------------+--------------+

    .. code-block:: python
        :caption: New derived columns can be introduced in the ``priority_columns`` list.
            These must be derived using other columns of the provided dataframe.

        from etl_toolkit import E, F, A
        from datetime import date

        df = spark.createDataFrame([
            {"color": "red", "value": 1000, "date": date(2023, 1, 1)},
            {"color": "blue", "value": 50, "date": date(2023, 1, 1)},
        ])
        display(
            A.shift_columns(
                df,
                priority_columns=["date", F.upper("color").alias("label")],
            )
        )
        +--------------+--------------+--------------+--------------+
        |date          |label         |color         |value         |
        +--------------+--------------+--------------+--------------+
        |    2023-01-01|RED           |red           |          1000|
        +--------------+--------------+--------------+--------------+
        |    2023-01-01|BLUE          |blue          |            50|
        +--------------+--------------+--------------+--------------+
    """
    # Resolve any string references into Column expressions.
    priority_columns = [normalize_column(col) for col in priority_columns]
    # Resolve the *output* names of the priority columns via a projection —
    # this accounts for aliases on derived columns, so an existing column that
    # was re-listed (or shadowed by an alias) is not duplicated below.
    priority_column_names = df.select(priority_columns).columns
    # Priority columns first, then every remaining original column in its
    # original order.
    return df.select(
        priority_columns
        + [column for column in df.columns if column not in priority_column_names]
    )