Table creation functions#
The ETL Toolkit provides dedicated functions for creating and updating tables that improve on the baseline versions found in yipit_databricks_utils.
Tip
These functions are required when creating tables in Golden Path repos. They include features designed specifically for the Golden Path that simplify common operations when managing data pipelines.
Batch Tables#
Most data pipelines create tables via a “batch” operation (i.e. the entire table contents are overwritten during each run). The ETL Toolkit has dedicated functions for making batch operations highly performant and easy to manage.
- etl_toolkit.create_table(*dfs, catalog_name=None, database_name=None, table_name=None, **kwargs)[source]#
Creates a new table or overwrites an existing table with the provided DataFrame(s).
The function supports two modes of operation:
1. Path-based (default): Uses the notebook path to determine the table location.
2. Manual override: Uses explicitly provided catalog, database, and table names (only for specific engineering usage, not recommended otherwise).
This function is designed to simplify the process of creating and managing tables in a Databricks environment. It resolves the table name based on the current notebook path, following a specific folder structure convention as laid out in the Golden Path standards. If multiple DataFrames are provided, they are appended to the table sequentially.
- Parameters:
dfs (pyspark.sql.connect.dataframe.DataFrame | list[pyspark.sql.connect.dataframe.DataFrame]) – One or more DataFrames to be written to the table. If multiple DataFrames are provided, they are appended to the table in the order they are given.
catalog_name (Optional[str]) – The name of the catalog to create the table in. If not provided, the catalog name will be derived from the notebook path.
database_name (Optional[str]) – The name of the database to create the table in. If not provided, the database name will be derived from the notebook path.
table_name (Optional[str]) – The name of the table to create. If not provided, the table name will be derived from the notebook path.
kwargs – Additional keyword arguments passed to the underlying create_table_ydbu function. Notable parameters include:
- table_comment: A string comment to be added to the table metadata.
- file_retention_days: Integer that dictates how many days of lookback you want to retain for your tables.
- minimum_versions: Integer that specifies how many historical table versions you want to keep accessible.
Warning
Using this create_table function requires following the Golden Path standards. Ensure that your notebook is located in the correct folder structure for proper table name resolution.
create_table will use the folder structure to determine the catalog, database, and table name. Incorrect pathing will result in errors. The expected path structure, as outlined in the Golden Path standards, is:
/<ticker>/pipelines/<catalog>/<database>/<table>
For example, a notebook path of /uber/pipelines/yd_production/uber_gold/coverage that calls create_table will generate a table named yd_production.uber_gold.coverage.
Tip
When multiple DataFrames are provided, they are appended to the table sequentially. The first DataFrame creates the table, and subsequent DataFrames are appended. This allows for flexible data loading patterns and more performant table creation, as it avoids inefficient union operations (a short contrast sketch follows the multiple-DataFrame example below).
Examples#
Creating a table with a single DataFrame#

from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
])

create_table(df)
Creating a table with multiple DataFrames#

from etl_toolkit import create_table

df1 = spark.createDataFrame([
    {"id": 1, "name": "Leopold"}
])
df2 = spark.createDataFrame([
    {"id": 2, "name": "Molly"}
])

create_table(df1, df2)
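As described in the Tip above, passing the DataFrames separately lets create_table write them sequentially rather than forcing a union first. A minimal sketch contrasting the two approaches (the union variant is shown only for comparison):

from etl_toolkit import create_table

df1 = spark.createDataFrame([{"id": 1, "name": "Leopold"}])
df2 = spark.createDataFrame([{"id": 2, "name": "Molly"}])

# Less efficient: union the DataFrames before writing a single combined DataFrame
create_table(df1.unionByName(df2))

# Preferred: create the table from df1, then append df2
create_table(df1, df2)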
Creating a table with a custom comment#

from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"},
    {"id": 3, "name": "Stephen"}
])

create_table(df, table_comment="This table contains information about characters in a novel.")
Creating a table with custom overrides#

from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"},
    {"id": 3, "name": "Stephen"}
])

create_table(
    df,
    catalog_name='my_catalog',
    database_name='my_database_gold',
    table_name='my_table'
)
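The retention-related keyword arguments listed above can be passed in the same call. A minimal sketch; the values shown are illustrative, not recommended defaults:

from etl_toolkit import create_table

df = spark.createDataFrame([
    {"id": 1, "name": "Leopold"},
    {"id": 2, "name": "Molly"}
])

# Keep 7 days of file lookback and at least 5 historical versions accessible
create_table(
    df,
    table_comment="Characters table with custom retention settings.",
    file_retention_days=7,
    minimum_versions=5
)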
Standard Metrics Tables#
These table creation functions are specific to the Standard Metrics Workflow and should not be used outside of it.
- etl_toolkit.writer.create_standard_metric_kpi_table(input_df, entity_config, standard_metric_meta, standard_metric_config, calendar_df, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#
Create standard metric analysis (unified_kpi) table for a given input DataFrame and configuration.
This function creates a unified_kpi table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.
- Parameters:
input_df (pyspark.sql.DataFrame) – Input DataFrame containing the data to analyze
entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric
standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties
standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed
calendar_df (pyspark.sql.DataFrame) – DataFrame containing calendar information for the analyses
catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”
dry_run (bool) – If True, shows the results without creating tables
- Returns:
A tuple containing (table_config, unified_kpi_df)
- Return type:
Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]
Examples#
Create a unified KPI table for Gross Food Sales#

from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# Create unified_kpi table for a metric and return both the table configuration
# and the actual DataFrame
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)
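Because dry_run is supported (see the parameter list above), the output can be previewed without writing any tables. A minimal sketch reusing the configurations defined in the example above; re-run with dry_run=False (the default) to actually create the table:

# Preview the unified_kpi output without creating tables (dry_run=True)
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df,
    dry_run=True
)

# Inspect the returned DataFrame before a real run
unified_kpi_df.show()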
- etl_toolkit.writer.create_standard_metric_data_download_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#
Create data download table from unified KPI data.
This function creates a data_download table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.
- Parameters:
entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric
standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties
standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed
table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)
unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)
catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”
dry_run (bool) – If True, shows the results without creating tables
- Returns:
A tuple containing (table_config, data_download_df)
- Return type:
Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]
Examples#
Create a data download table from a unified KPI table#

from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create data_download table
data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)
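As noted in the parameter list, either table_config or unified_kpi_df may be supplied. A minimal sketch of the unified_kpi_df variant, reusing the objects created in the example above:

# Create the data_download table directly from the unified_kpi DataFrame
data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    unified_kpi_df=unified_kpi_df
)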
- etl_toolkit.writer.create_standard_metric_feed_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#
Create metric feed table from unified KPI data.
This function creates a metric_feed table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.
- Parameters:
entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric
standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties
standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed
table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)
unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)
catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”
dry_run (bool) – If True, shows the results without creating tables
- Returns:
A tuple containing (table_config, metric_feed_df)
- Return type:
Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]
Examples#
Create a metric feed table from a unified KPI table#

from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create metric_feed table
metric_feed_table_config, metric_feed_df = A.create_standard_metric_feed_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)
- etl_toolkit.writer.create_standard_metric_live_feed_table(entity_config, standard_metric_meta, standard_metric_config, table_config=None, unified_kpi_df=None, catalog_name=DEFAULT_CATALOG, dry_run=False)[source]#
Create legacy live feed table from unified KPI data.
This function creates a live_feed table for a metric and returns both the table configuration and the actual DataFrame created for easier debugging.
- Parameters:
entity_config (etl_toolkit.analyses.standard_metrics.config.entity_configuration) – Entity configuration for the metric
standard_metric_meta (etl_toolkit.analyses.standard_metrics.config.standard_metric_metadata) – Metadata defining the metric’s properties
standard_metric_config (etl_toolkit.analyses.standard_metrics.config.standard_metric_configuration) – Configuration defining how the metric should be aggregated and analyzed
table_config (Optional[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration]) – Configuration of the unified_kpi table (provide this or unified_kpi_df)
unified_kpi_df (Optional[pyspark.sql.DataFrame]) – DataFrame containing the unified_kpi data (provide this or table_config)
catalog_name (str) – Name of the catalog where tables will be created, defaults to “yd_production”
dry_run (bool) – If True, shows the results without creating tables
- Returns:
A tuple containing (table_config, live_feed_df)
- Return type:
Tuple[etl_toolkit.analyses.standard_metrics.unified_metrics.standard_metric_unified_table_configuration, pyspark.sql.DataFrame]
Examples#
Create a live feed table from a unified KPI table#

from etl_toolkit import A

# Define configurations
entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df
)

# Then create live_feed table
live_feed_table_config, live_feed_df = A.create_standard_metric_live_feed_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    table_config=table_config
)
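For reference, the four functions above can be chained in a single pipeline: create the unified_kpi table once, then reuse its table_config for each downstream table. A condensed sketch, assuming input_df and calendar_df are defined as in the earlier examples:

from etl_toolkit import A

entity_config = A.entity_configuration(
    top_level_entity_name="U.S. Food Delivery",
    top_level_entity_ticker=None,
    exchange=None,
    entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
    metric_name="Gross Food Sales",
    company_comparable_kpi=False,
    display_period_granularity="MONTH",
    currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
    source_input_column="gfs_scaled",
    source_input_date_column="order_date",
    aggregate_function="SUM",
    growth_rate_type="CAGR",
    slice_columns=["company", "metro_tier", "metro"]
)

# Create the unified_kpi table once
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df, entity_config, standard_metric_metadata, standard_metric_configuration, calendar_df
)

# Reuse its table_config for each downstream table
_, data_download_df = A.create_standard_metric_data_download_table(
    entity_config, standard_metric_metadata, standard_metric_configuration, table_config=table_config
)
_, metric_feed_df = A.create_standard_metric_feed_table(
    entity_config, standard_metric_metadata, standard_metric_configuration, table_config=table_config
)
_, live_feed_df = A.create_standard_metric_live_feed_table(
    entity_config, standard_metric_metadata, standard_metric_configuration, table_config=table_config
)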