from typing import List, Optional, Union, Dict, Any, Literal, cast, Tuple
from pyspark.sql import DataFrame, Column
from pyspark.sql import functions as F
from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session
from etl_toolkit import E
from etl_toolkit.exceptions import InvalidInputException
from etl_toolkit.analyses.standard_metrics.config import (
entity_configuration,
standard_metric_metadata,
standard_metric_configuration,
)
from etl_toolkit.analyses.standard_metrics.unified_kpi import (
standard_metric_unified_kpi,
)
from etl_toolkit.analyses.standard_metrics.unified_metrics import (
standard_metrics_unified,
standard_metric_unified_table_configuration,
)
from etl_toolkit.analyses.standard_metrics.data_download import (
standard_metric_data_download,
)
from etl_toolkit.analyses.standard_metrics.metric_feed import (
standard_metric_feed,
standard_metric_live_feed,
)
from etl_toolkit.analyses.standard_metrics.helpers import _validate_unified_kpi_input_df
from etl_toolkit.writer.table_create import create_table
# Constants
DEFAULT_CATALOG = "yd_production"
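# Period boundary columns that a calendar_df input is expected to provide for the standard metric analyses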
REQUIRED_CALENDAR_COLUMNS = [
"date",
"week_period_start",
"week_period_end",
"month_period_start",
"month_period_end",
"quarter_period_start",
"quarter_period_end",
"year_period_start",
"year_period_end",
]
def _normalize_string(input_string: str) -> str:
"""
Normalize a string by converting to lowercase, replacing spaces with underscores,
and removing special characters.
:param input_string: The string to normalize
:return: A normalized string suitable for use in database and table names
Examples
^^^^^^^^
.. code-block:: python
:caption: Normalize a string for database naming
from etl_toolkit.analyses.standard_metrics.consolidated_workflow import _normalize_string
normalized = _normalize_string("Gross Food Sales (%)")
# Returns "gross_food_sales_pct"
"""
if not isinstance(input_string, str):
raise InvalidInputException(
f"Expected string input, got {type(input_string).__name__}"
)
return (
input_string.lower()
.replace(" ", "_")
.replace(",", "_")
.replace(".com", "_com")
.replace(".", "")
.replace("-", "")
.replace("(", "")
.replace(")", "")
.replace("[", "")
.replace("]", "")
.replace("___", "_")
.replace("__", "_")
.replace("%", "pct")
.replace("+", "plus")
)
def _get_metric_reporting_database(entity_config: entity_configuration) -> str:
"""
Generate a standardized database name for metric reporting based on entity configuration.
:param entity_config: The entity configuration
:return: A standardized database name for the entity's metrics
Examples
^^^^^^^^
.. code-block:: python
:caption: Generate database name for an entity
from etl_toolkit import A
entity_config = A.entity_configuration(
top_level_entity_name="U.S. Food Delivery",
top_level_entity_ticker=None,
exchange=None,
entity_name=None
)
database_name = _get_metric_reporting_database(entity_config)
# Returns "us_food_delivery_reporting_gold"
"""
if not entity_config:
raise InvalidInputException("Entity configuration must be provided")
database = _normalize_string(
f"{entity_config.top_level_entity_ticker or entity_config.top_level_entity_name}"
f"_{entity_config.exchange or ''}"
f"_{entity_config.entity_name or ''}_reporting_gold"
)
return database
def _get_table_name_suffix(
metric_name: str, slice_columns: Optional[List[Union[str, Column]]] = None
) -> str:
"""
Generate a standardized table name suffix based on metric name and slice columns.
:param metric_name: The name of the metric
:param slice_columns: Optional list of columns to slice by
:return: A standardized table name suffix
Examples
^^^^^^^^
.. code-block:: python
:caption: Generate table name suffix
from etl_toolkit.analyses.standard_metrics.consolidated_workflow import _get_table_name_suffix
suffix = _get_table_name_suffix("Gross Food Sales", ["company", "metro"])
# Returns "gross_food_sales_company_metro"
"""
metric_name_normalized = _normalize_string(metric_name)
if not slice_columns:
return metric_name_normalized
metric_slices_normalized = "_".join(
[_normalize_string(str(col)) for col in slice_columns]
)
return f"{metric_name_normalized}{'_' if metric_slices_normalized else ''}{metric_slices_normalized}"
def _validate_inputs(
entity_config: Optional[entity_configuration] = None,
standard_metric_meta: Optional[standard_metric_metadata] = None,
standard_metric_config: Optional[standard_metric_configuration] = None,
input_df: Optional[DataFrame] = None,
calendar_df: Optional[DataFrame] = None,
catalog_name: Optional[str] = None,
dry_run: Optional[bool] = None,
):
"""
Validate common inputs for KPI workflow functions.
:param entity_config: Optional entity configuration to validate
    :param standard_metric_meta: Optional standard metric metadata to validate
    :param standard_metric_config: Optional standard metric configuration to validate
:param input_df: Optional input DataFrame to validate
:param calendar_df: Optional calendar DataFrame to validate
:param catalog_name: Optional catalog name to validate
:param dry_run: Optional dry run flag to validate
"""
if entity_config is not None and not entity_config:
raise InvalidInputException("entity_config must be provided")
if standard_metric_meta is not None and not standard_metric_meta:
raise InvalidInputException("standard_metric_meta must be provided")
if standard_metric_config is not None and not standard_metric_config:
raise InvalidInputException("standard_metric_config must be provided")
if catalog_name is not None and not isinstance(catalog_name, str):
raise InvalidInputException(
f"catalog_name must be a string, got {type(catalog_name).__name__}"
)
if dry_run is not None and not isinstance(dry_run, bool):
raise InvalidInputException(
f"dry_run must be a boolean, got {type(dry_run).__name__}"
)
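    # Lightweight type checks for the optional DataFrame inputs documented above
    if input_df is not None and not isinstance(input_df, DataFrame):
        raise InvalidInputException(
            f"input_df must be a DataFrame, got {type(input_df).__name__}"
        )
    if calendar_df is not None and not isinstance(calendar_df, DataFrame):
        raise InvalidInputException(
            f"calendar_df must be a DataFrame, got {type(calendar_df).__name__}"
        )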
@track_usage
def create_standard_metric_kpi_table(
input_df: DataFrame,
entity_config: entity_configuration,
standard_metric_meta: standard_metric_metadata,
standard_metric_config: standard_metric_configuration,
calendar_df: DataFrame,
catalog_name: str = DEFAULT_CATALOG,
dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
"""
Create standard metric analysis (unified_kpi) table for a given input DataFrame and configuration.
    This function creates a unified_kpi table for a metric and returns both
    the table configuration and the DataFrame that was created, so the output
    is easy to inspect and debug.
:param input_df: Input DataFrame containing the data to analyze
:param entity_config: Entity configuration for the metric
:param standard_metric_meta: Metadata defining the metric's properties
:param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
:param calendar_df: DataFrame containing calendar information for the analyses
:param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
:param dry_run: If True, shows the results without creating tables
:return: A tuple containing (table_config, unified_kpi_df)
Examples
^^^^^^^^
.. code-block:: python
:caption: Create a unified KPI table for Gross Food Sales
from etl_toolkit import A
# Define configurations
entity_config = A.entity_configuration(
top_level_entity_name="U.S. Food Delivery",
top_level_entity_ticker=None,
exchange=None,
entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Gross Food Sales",
company_comparable_kpi=False,
display_period_granularity="MONTH",
currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="gfs_scaled",
source_input_date_column="order_date",
aggregate_function="SUM",
growth_rate_type="CAGR",
slice_columns=["company", "metro_tier", "metro"]
)
# Create unified_kpi table for a metric and return both the table configuration and the actual DataFrame
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
input_df,
entity_config,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
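    To preview the output without writing any tables, the same call can be made
    with dry_run=True, reusing the configurations defined above:
    .. code-block:: python
    :caption: Preview the unified KPI output without creating the table
    table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
    input_df,
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    calendar_df,
    dry_run=True
    )
    # No table is written when dry_run=True; inspect unified_kpi_df directly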
"""
# Validate inputs
_validate_inputs(
entity_config=entity_config,
standard_metric_meta=standard_metric_meta,
standard_metric_config=standard_metric_config,
input_df=input_df,
calendar_df=calendar_df,
catalog_name=catalog_name,
dry_run=dry_run,
)
spark = get_spark_session()
# Set destination database and table names
destination_database = _get_metric_reporting_database(entity_config)
table_name_suffix = _get_table_name_suffix(
standard_metric_meta.metric_name, standard_metric_config.slice_columns
)
destination_table = f"metric_unified_kpi__{table_name_suffix}"
# Create unified_kpi table
unified_kpi_df = standard_metric_unified_kpi(
input_df,
entity_config,
standard_metric_meta,
standard_metric_config,
calendar_df,
)
if not dry_run:
create_table(
unified_kpi_df,
database_name=destination_database,
table_name=destination_table,
catalog_name=catalog_name,
)
unified_kpi_df = spark.table(
f"{catalog_name}.{destination_database}.{destination_table}"
)
# Add unified_kpi table configuration
unified_kpi_table_location = (
f"{catalog_name}.{destination_database}.{destination_table}"
)
table_config = standard_metric_unified_table_configuration(
entity_config,
unified_kpi_table_location,
standard_metric_meta.metric_name,
table_type="unified_kpi",
)
# Return both table configuration and DataFrame
return table_config, unified_kpi_df
@track_usage
def create_standard_metric_data_download_table(
entity_config: entity_configuration,
standard_metric_meta: standard_metric_metadata,
standard_metric_config: standard_metric_configuration,
table_config: Optional[standard_metric_unified_table_configuration] = None,
unified_kpi_df: Optional[DataFrame] = None,
catalog_name: str = DEFAULT_CATALOG,
dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
"""
Create data download table from unified KPI data.
    This function creates a data_download table for a metric and returns both
    the table configuration and the DataFrame that was created, so the output
    is easy to inspect and debug.
:param entity_config: Entity configuration for the metric
:param standard_metric_meta: Metadata defining the metric's properties
:param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
:param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
:param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
:param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
:param dry_run: If True, shows the results without creating tables
:return: A tuple containing (table_config, data_download_df)
Examples
^^^^^^^^
.. code-block:: python
:caption: Create a data download table from a unified KPI table
from etl_toolkit import A
# Define configurations
entity_config = A.entity_configuration(
top_level_entity_name="U.S. Food Delivery",
top_level_entity_ticker=None,
exchange=None,
entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Gross Food Sales",
company_comparable_kpi=False,
display_period_granularity="MONTH",
currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="gfs_scaled",
source_input_date_column="order_date",
aggregate_function="SUM",
growth_rate_type="CAGR",
slice_columns=["company", "metro_tier", "metro"]
)
# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
input_df,
entity_config,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
# Then create data_download table
data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
entity_config,
standard_metric_metadata,
standard_metric_configuration,
table_config=table_config
)
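    Alternatively, the unified_kpi DataFrame returned above can be passed in
    directly instead of the table configuration; combined with dry_run=True this
    is convenient while iterating, since nothing is written:
    .. code-block:: python
    :caption: Create the data download output from the unified KPI DataFrame directly
    data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
    entity_config,
    standard_metric_metadata,
    standard_metric_configuration,
    unified_kpi_df=unified_kpi_df,
    dry_run=True
    )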
"""
# Validate inputs
_validate_inputs(
entity_config=entity_config,
standard_metric_meta=standard_metric_meta,
standard_metric_config=standard_metric_config,
catalog_name=catalog_name,
dry_run=dry_run,
)
    if unified_kpi_df is None and table_config is None:
raise InvalidInputException(
"Either unified_kpi_df or table_config must be provided"
)
spark = get_spark_session()
# Set destination database and table names
destination_database = _get_metric_reporting_database(entity_config)
table_name_suffix = _get_table_name_suffix(
standard_metric_meta.metric_name, standard_metric_config.slice_columns
)
destination_table = f"metric_data_download__{table_name_suffix}"
# Create data_download table - use the DataFrame directly if unified_kpi_df is provided
if unified_kpi_df is None and table_config is not None:
unified_kpi_df = spark.table(table_config.table_location)
data_download_df = standard_metric_data_download(unified_kpi_df, entity_config)
if not dry_run:
create_table(
data_download_df,
database_name=destination_database,
table_name=destination_table,
catalog_name=catalog_name,
)
data_download_df = spark.table(
f"{catalog_name}.{destination_database}.{destination_table}"
)
# Add data_download table configuration
table_location = f"{catalog_name}.{destination_database}.{destination_table}"
table_config = standard_metric_unified_table_configuration(
entity_config,
table_location,
standard_metric_meta.metric_name,
table_type="data_download",
)
# Return both table configuration and DataFrame
return table_config, data_download_df
@track_usage
def create_standard_metric_feed_table(
entity_config: entity_configuration,
standard_metric_meta: standard_metric_metadata,
standard_metric_config: standard_metric_configuration,
table_config: Optional[standard_metric_unified_table_configuration] = None,
unified_kpi_df: Optional[DataFrame] = None,
catalog_name: str = DEFAULT_CATALOG,
dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
"""
Create metric feed table from unified KPI data.
    This function creates a metric_feed table for a metric and returns both
    the table configuration and the DataFrame that was created, so the output
    is easy to inspect and debug.
:param entity_config: Entity configuration for the metric
:param standard_metric_meta: Metadata defining the metric's properties
:param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
:param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
:param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
:param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
:param dry_run: If True, shows the results without creating tables
:return: A tuple containing (table_config, metric_feed_df)
Examples
^^^^^^^^
.. code-block:: python
:caption: Create a metric feed table from a unified KPI table
from etl_toolkit import A
# Define configurations
entity_config = A.entity_configuration(
top_level_entity_name="U.S. Food Delivery",
top_level_entity_ticker=None,
exchange=None,
entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Gross Food Sales",
company_comparable_kpi=False,
display_period_granularity="MONTH",
currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="gfs_scaled",
source_input_date_column="order_date",
aggregate_function="SUM",
growth_rate_type="CAGR",
slice_columns=["company", "metro_tier", "metro"]
)
# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
input_df,
entity_config,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
# Then create metric_feed table
metric_feed_table_config, metric_feed_df = A.create_standard_metric_feed_table(
entity_config,
standard_metric_metadata,
standard_metric_configuration,
table_config=table_config
)
"""
# Validate inputs
_validate_inputs(
entity_config=entity_config,
standard_metric_meta=standard_metric_meta,
standard_metric_config=standard_metric_config,
catalog_name=catalog_name,
dry_run=dry_run,
)
    if unified_kpi_df is None and table_config is None:
raise InvalidInputException(
"Either unified_kpi_df or table_config must be provided"
)
spark = get_spark_session()
# Set destination database and table names
destination_database = _get_metric_reporting_database(entity_config)
table_name_suffix = _get_table_name_suffix(
standard_metric_meta.metric_name, standard_metric_config.slice_columns
)
destination_table = f"metric_feed__{table_name_suffix}"
# Create metric_feed table - use the DataFrame directly if unified_kpi_df is provided
if unified_kpi_df is None and table_config is not None:
unified_kpi_df = spark.table(table_config.table_location)
metric_feed_df = standard_metric_feed(unified_kpi_df, entity_config)
if not dry_run:
create_table(
metric_feed_df,
database_name=destination_database,
table_name=destination_table,
catalog_name=catalog_name,
)
metric_feed_df = spark.table(
f"{catalog_name}.{destination_database}.{destination_table}"
)
# Add metric_feed table configuration
table_location = f"{catalog_name}.{destination_database}.{destination_table}"
table_config = standard_metric_unified_table_configuration(
entity_config,
table_location,
standard_metric_meta.metric_name,
table_type="metric_feed",
)
# Return both table configuration and DataFrame
return table_config, metric_feed_df
@track_usage
def create_standard_metric_live_feed_table(
entity_config: entity_configuration,
standard_metric_meta: standard_metric_metadata,
standard_metric_config: standard_metric_configuration,
table_config: Optional[standard_metric_unified_table_configuration] = None,
unified_kpi_df: Optional[DataFrame] = None,
catalog_name: str = DEFAULT_CATALOG,
dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
"""
Create legacy live feed table from unified KPI data.
    This function creates a live_feed table for a metric and returns both
    the table configuration and the DataFrame that was created, so the output
    is easy to inspect and debug.
:param entity_config: Entity configuration for the metric
:param standard_metric_meta: Metadata defining the metric's properties
:param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
:param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
:param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
:param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
:param dry_run: If True, shows the results without creating tables
:return: A tuple containing (table_config, live_feed_df)
Examples
^^^^^^^^
.. code-block:: python
:caption: Create a live feed table from a unified KPI table
from etl_toolkit import A
# Define configurations
entity_config = A.entity_configuration(
top_level_entity_name="U.S. Food Delivery",
top_level_entity_ticker=None,
exchange=None,
entity_name=None
)
standard_metric_metadata = A.standard_metric_metadata(
metric_name="Gross Food Sales",
company_comparable_kpi=False,
display_period_granularity="MONTH",
currency="USD"
)
standard_metric_configuration = A.standard_metric_configuration(
source_input_column="gfs_scaled",
source_input_date_column="order_date",
aggregate_function="SUM",
growth_rate_type="CAGR",
slice_columns=["company", "metro_tier", "metro"]
)
# First create unified_kpi table
table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
input_df,
entity_config,
standard_metric_metadata,
standard_metric_configuration,
calendar_df
)
# Then create live_feed table
live_feed_table_config, live_feed_df = A.create_standard_metric_live_feed_table(
entity_config,
standard_metric_metadata,
standard_metric_configuration,
table_config=table_config
)
"""
# Validate inputs
_validate_inputs(
entity_config=entity_config,
standard_metric_meta=standard_metric_meta,
standard_metric_config=standard_metric_config,
catalog_name=catalog_name,
dry_run=dry_run,
)
    if unified_kpi_df is None and table_config is None:
raise InvalidInputException(
"Either unified_kpi_df or table_config must be provided"
)
spark = get_spark_session()
# Set destination database and table names
destination_database = _get_metric_reporting_database(entity_config)
table_name_suffix = _get_table_name_suffix(
standard_metric_meta.metric_name, standard_metric_config.slice_columns
)
destination_table = f"live_feed__{table_name_suffix}"
# Create live_feed table - use the DataFrame directly if unified_kpi_df is provided
if unified_kpi_df is None and table_config is not None:
unified_kpi_df = spark.table(table_config.table_location)
live_feed_df = standard_metric_live_feed(unified_kpi_df, entity_config)
if not dry_run:
create_table(
live_feed_df,
database_name=destination_database,
table_name=destination_table,
catalog_name=catalog_name,
)
live_feed_df = spark.table(
f"{catalog_name}.{destination_database}.{destination_table}"
)
# Add live_feed table configuration
table_location = f"{catalog_name}.{destination_database}.{destination_table}"
table_config = standard_metric_unified_table_configuration(
entity_config,
table_location,
standard_metric_meta.metric_name,
table_type="live_feed",
)
# Return both table configuration and DataFrame
return table_config, live_feed_df