Table creation functions#

The ETL Toolkit provides dedicated functions for creating and updating tables that improve on the baseline versions found in yipit_databricks_utils.

Tip

You must use these functions when creating tables in Golden Path repos. They include features designed to work well on the Golden Path and to simplify common operations when managing data pipelines.

Batch Tables#

Most data pipelines create tables via a “batch” operation (i.e. overwriting the entire table contents during each run). The ETL Toolkit has specific functions for making batch operations highly performant and easy to manage.

etl_toolkit.create_table(*dfs, catalog_name=None, database_name=None, table_name=None, **kwargs)[source]#

Creates a new table or overwrites an existing table with the provided DataFrame(s).

The function supports two modes of operation:

  1. Path-based (default): Uses the notebook path to determine the table location.

  2. Manual override: Uses explicitly provided catalog, database, and table names (only for specific engineering usage, not recommended otherwise).

This function is designed to simplify the process of creating and managing tables in a Databricks environment. It resolves the table name based on the current notebook path, following a specific folder structure convention as laid out in the Golden Path standards. If multiple DataFrames are provided, they are appended to the table sequentially.

Parameters:
  • dfs (pyspark.sql.connect.dataframe.DataFrame | list[pyspark.sql.connect.dataframe.DataFrame]) – One or more DataFrames to be written to the table. If multiple DataFrames are provided, they are appended to the table in the order they are given.

  • catalog_name (Optional[str]) – The name of the catalog to create the table in. If not provided, the catalog name will be derived from the notebook path.

  • database_name (Optional[str]) – The name of the database to create the table in. If not provided, the database name will be derived from the notebook path.

  • table_name (Optional[str]) – The name of the table to create. If not provided, the table name will be derived from the notebook path.

  • kwargs – Additional keyword arguments passed to the underlying create_table_ydbu function. Notable parameters include:

    - table_comment: A string comment added to the table metadata.

    - file_retention_days: Integer that dictates how many days of lookback to retain for your tables.

    - minimum_versions: Integer that dictates how many historical table versions to keep accessible.

Warning

Using this create_table function requires following the Golden Path standards.

Ensure that your notebook is located in the correct folder structure for proper table name resolution. create_table will use the folder structure to determine the catalog, database, and table name. Incorrect pathing will result in errors.

The expected path structure, as outlined in the Golden Path standards, is: /<ticker>/pipelines/<catalog>/<database>/<table>

For example, a notebook path of /uber/pipelines/yd_production/uber_gold/coverage that calls create_table will generate a table named yd_production.uber_gold.coverage.

Tip

When multiple DataFrames are provided, they are appended to the table sequentially. The first DataFrame creates the table, and subsequent DataFrames are appended. This allows for flexible data loading patterns and more performant table creation, as it avoids inefficient union operations.

Examples#

Creating a table with a single DataFrame#
from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
])

create_table(df)
Creating a table with multiple DataFrames#
from etl_toolkit import create_table

df1 = spark.createDataFrame([
    {"id": 1, "name": "Leopold"}
])

df2 = spark.createDataFrame([
    {"id": 2, "name": "Molly"}
])

create_table(df1, df2)
Creating a table with a custom comment#
from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
    {"id": 3, "name": "Stephen"}
])

create_table(df, table_comment="This table contains information about characters in a novel.")
Creating a table with custom overrides#
from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
    {"id": 3, "name": "Stephen"}
])

create_table(
    df,
    catalog_name='my_catalog',
    database_name='my_database_gold',
    table_name='my_table'
)
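Creating a table with custom retention settings#

A minimal sketch passing the file_retention_days and minimum_versions keyword arguments documented above; the retention values shown are illustrative, not recommended defaults.

from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
])

# Keep 30 days of file history and at least 10 historical table versions
# (illustrative values; see the kwargs documentation above)
create_table(
    df,
    file_retention_days=30,
    minimum_versions=10
)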

Standard Metrics Tables#

These table creation functions are specific to the Standard Metrics Workflow and should not be used outside of those workflows.

etl_toolkit.writer.create_standard_metric_kpi_table(input_df, entity_config, standard_metric_meta, standard_metric_config, calendar_df, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#

Create standard metric analysis (unified_kpi) table for a given input DataFrame and configuration.

This function creates a unified_kpi table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.

Parameters:
  • input_df (pyspark.sql.DataFrame) – Input DataFrame containing the data to analyze

  • entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric

  • standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties

  • standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed

  • calendar_df (pyspark.sql.DataFrame) – DataFrame containing calendar information for the analyses

  • catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”

  • dry_run (bool) – If True, shows the results without creating tables

Returns:

A tuple containing (table_config, unified_kpi_df)

Return type:

Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]

Examples#

Create a unified KPI table for Gross Food Sales#
from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# Create unified_kpi table for a metric and return both the table configuration and the actual DataFrame
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)
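Preview a unified KPI table with dry_run#

A minimal sketch reusing the configurations defined in the previous example; setting dry_run=True shows the results without creating any tables, per the parameter list above.

from etl_toolkit import A

# Preview the unified_kpi output without writing a table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df,
    dry_run=True
)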
etl_toolkit.writer.create_standard_metric_data_download_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#

Create data download table from unified KPI data.

This function creates a data_download table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.

Parameters:
  • entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric

  • standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties

  • standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed

  • table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)

  • unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)

  • catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”

  • dry_run (bool) – If True, shows the results without creating tables

Returns:

A tuple containing (table_config, data_download_df)

Return type:

Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]

Examples#

Create a data download table from a unified KPI table#
from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create data_download table
data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)
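Create a data download table from a unified KPI DataFrame#

As an alternative to passing table_config, unified_kpi_df can be provided instead (supply one or the other, per the parameter list above). A minimal sketch reusing the objects from the previous example:

from etl_toolkit import A

# Build the data_download table directly from the unified_kpi DataFrame
data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    unified_kpi_df=unified_kpi_df
)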
etl_toolkit.writer.create_standard_metric_feed_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#

Create metric feed table from unified KPI data.

This function creates a metric_feed table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.

Parameters:
  • entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric

  • standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties

  • standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed

  • table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)

  • unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)

  • catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”

  • dry_run (bool) – If True, shows the results without creating tables

Returns:

A tuple containing (table_config, metric_feed_df)

Return type:

Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]

Examples#

Create a metric feed table from a unified KPI table#
from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create metric_feed table
metric_feed_table_config, metric_feed_df = A.create_standard_metric_feed_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)
etl_toolkit.writer.create_standard_metric_live_feed_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#

Create legacy live feed table from unified KPI data.

This function creates a live_feed table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.

Parameters:
  • entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric

  • standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties

  • standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed

  • table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)

  • unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)

  • catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”

  • dry_run (bool) – If True, shows the results without creating tables

Returns:

A tuple containing (table_config, live_feed_df)

Return type:

Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]

Examples#

Create a live feed table from a unified KPI table#
from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)

standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)

standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create live_feed table
live_feed_table_config, live_feed_df = A.create_standard_metric_live_feed_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)