Source code for etl_toolkit.writer.standard_metrics_consolidated_workflow

from typing import List, Optional, Union, Dict, Any, Literal, cast, Tuple

from pyspark.sql import DataFrame, Column
from pyspark.sql import functions as F

from yipit_databricks_utils.helpers.telemetry import track_usage
from yipit_databricks_utils.helpers.pyspark_utils import get_spark_session

from etl_toolkit import E
from etl_toolkit.exceptions import InvalidInputException
from etl_toolkit.analyses.standard_metrics.config import (
    entity_configuration,
    standard_metric_metadata,
    standard_metric_configuration,
)
from etl_toolkit.analyses.standard_metrics.unified_kpi import (
    standard_metric_unified_kpi,
)
from etl_toolkit.analyses.standard_metrics.unified_metrics import (
    standard_metrics_unified,
    standard_metric_unified_table_configuration,
)
from etl_toolkit.analyses.standard_metrics.data_download import (
    standard_metric_data_download,
)
from etl_toolkit.analyses.standard_metrics.metric_feed import (
    standard_metric_feed,
    standard_metric_live_feed,
)
from etl_toolkit.analyses.standard_metrics.helpers import _validate_unified_kpi_input_df
from etl_toolkit.writer.table_create import create_table


# Constants
DEFAULT_CATALOG = "yd_production"
REQUIRED_CALENDAR_COLUMNS = [
    "date",
    "week_period_start",
    "week_period_end",
    "month_period_start",
    "month_period_end",
    "quarter_period_start",
    "quarter_period_end",
    "year_period_start",
    "year_period_end",
]
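

# Illustrative sketch (not part of the original module): a minimal calendar_df that
# carries the REQUIRED_CALENDAR_COLUMNS above, for local experimentation. The literal
# dates are hypothetical and the columns are inferred as strings here; real calendar
# tables are expected to cover every date in the input data and may use date types.
def _example_calendar_df() -> DataFrame:
    spark = get_spark_session()
    return spark.createDataFrame(
        [
            (
                "2024-01-15",                  # date
                "2024-01-15", "2024-01-21",    # week_period_start / week_period_end
                "2024-01-01", "2024-01-31",    # month_period_start / month_period_end
                "2024-01-01", "2024-03-31",    # quarter_period_start / quarter_period_end
                "2024-01-01", "2024-12-31",    # year_period_start / year_period_end
            )
        ],
        REQUIRED_CALENDAR_COLUMNS,
    )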


def _normalize_string(input_string: str) -> str:
    """
    Normalize a string by converting it to lowercase, replacing spaces and commas with
    underscores, and removing or substituting special characters (for example, ``%``
    becomes ``pct`` and ``+`` becomes ``plus``).

    :param input_string: The string to normalize
    :return: A normalized string suitable for use in database and table names

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Normalize a string for database naming

        from etl_toolkit.writer.standard_metrics_consolidated_workflow import _normalize_string

        normalized = _normalize_string("Gross Food Sales (%)")
        # Returns "gross_food_sales_pct"
    """
    if not isinstance(input_string, str):
        raise InvalidInputException(
            f"Expected string input, got {type(input_string).__name__}"
        )

    return (
        input_string.lower()
        .replace(" ", "_")
        .replace(",", "_")
        .replace(".com", "_com")
        .replace(".", "")
        .replace("-", "")
        .replace("(", "")
        .replace(")", "")
        .replace("[", "")
        .replace("]", "")
        .replace("___", "_")
        .replace("__", "_")
        .replace("%", "pct")
        .replace("+", "plus")
    )
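

# Illustrative sketch (not part of the original module): expected outputs of
# _normalize_string for a few representative inputs, following the replacement
# chain above. The input strings are hypothetical metric names.
def _example_normalize_string_behaviour() -> None:
    assert _normalize_string("Gross Food Sales (%)") == "gross_food_sales_pct"
    assert _normalize_string("U.S. Food Delivery") == "us_food_delivery"
    assert _normalize_string("Buy + Hold, 30-Day") == "buy_plus_hold_30day"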


def _get_metric_reporting_database(entity_config: entity_configuration) -> str:
    """
    Generate a standardized database name for metric reporting based on entity configuration.

    :param entity_config: The entity configuration
    :return: A standardized database name for the entity's metrics

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generate database name for an entity

        from etl_toolkit import A

        entity_config = A.entity_configuration(
            top_level_entity_name="U.S. Food Delivery",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name=None
        )

        database_name = _get_metric_reporting_database(entity_config)
        # Returns "us_food_delivery_reporting_gold"
    """
    if not entity_config:
        raise InvalidInputException("Entity configuration must be provided")

    database = _normalize_string(
        f"{entity_config.top_level_entity_ticker or entity_config.top_level_entity_name}"
        f"_{entity_config.exchange or ''}"
        f"_{entity_config.entity_name or ''}_reporting_gold"
    )
    return database
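

# Illustrative sketch (not part of the original module): when a ticker and exchange
# are set on the entity configuration, the ticker takes precedence over the
# top-level entity name in the generated database name. "DoorDash", "DASH", and
# "NYSE" are hypothetical values, and this assumes the four fields below are
# sufficient to construct an entity_configuration.
def _example_ticker_based_database_name() -> None:
    entity_config = entity_configuration(
        top_level_entity_name="DoorDash",
        top_level_entity_ticker="DASH",
        exchange="NYSE",
        entity_name=None,
    )
    assert _get_metric_reporting_database(entity_config) == "dash_nyse_reporting_gold"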


def _get_table_name_suffix(
    metric_name: str, slice_columns: Optional[List[Union[str, Column]]] = None
) -> str:
    """
    Generate a standardized table name suffix based on metric name and slice columns.

    :param metric_name: The name of the metric
    :param slice_columns: Optional list of columns to slice by
    :return: A standardized table name suffix

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Generate table name suffix

        from etl_toolkit.writer.standard_metrics_consolidated_workflow import _get_table_name_suffix

        suffix = _get_table_name_suffix("Gross Food Sales", ["company", "metro"])
        # Returns "gross_food_sales_company_metro"
    """
    metric_name_normalized = _normalize_string(metric_name)

    if not slice_columns:
        return metric_name_normalized

    metric_slices_normalized = "_".join(
        [_normalize_string(str(col)) for col in slice_columns]
    )
    return f"{metric_name_normalized}{'_' if metric_slices_normalized else ''}{metric_slices_normalized}"


def _validate_inputs(
    entity_config: Optional[entity_configuration] = None,
    standard_metric_meta: Optional[standard_metric_metadata] = None,
    standard_metric_config: Optional[standard_metric_configuration] = None,
    input_df: Optional[DataFrame] = None,
    calendar_df: Optional[DataFrame] = None,
    catalog_name: Optional[str] = None,
    dry_run: Optional[bool] = None,
):
    """
    Validate common inputs for KPI workflow functions.

    :param entity_config: Optional entity configuration to validate
    :param standard_metric_meta: Optional standard metric metadata to validate
    :param standard_metric_config: Optional standard metric configuration to validate
    :param input_df: Optional input DataFrame to validate
    :param calendar_df: Optional calendar DataFrame to validate
    :param catalog_name: Optional catalog name to validate
    :param dry_run: Optional dry run flag to validate
    """
    if entity_config is not None and not entity_config:
        raise InvalidInputException("entity_config must be provided")

    if standard_metric_meta is not None and not standard_metric_meta:
        raise InvalidInputException("standard_metric_meta must be provided")

    if standard_metric_config is not None and not standard_metric_config:
        raise InvalidInputException("standard_metric_config must be provided")

    if catalog_name is not None and not isinstance(catalog_name, str):
        raise InvalidInputException(
            f"catalog_name must be a string, got {type(catalog_name).__name__}"
        )

    if dry_run is not None and not isinstance(dry_run, bool):
        raise InvalidInputException(
            f"dry_run must be a boolean, got {type(dry_run).__name__}"
        )
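

# Illustrative sketch (not part of the original module): _validate_inputs accepts
# input_df and calendar_df but leaves their contents to the callers; a caller-side
# check that calendar_df carries the REQUIRED_CALENDAR_COLUMNS could look like this.
# The helper name is hypothetical.
def _example_check_calendar_columns(calendar_df: DataFrame) -> None:
    missing = [c for c in REQUIRED_CALENDAR_COLUMNS if c not in calendar_df.columns]
    if missing:
        raise InvalidInputException(
            f"calendar_df is missing required columns: {missing}"
        )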


@track_usage
def create_standard_metric_kpi_table(
    input_df: DataFrame,
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    calendar_df: DataFrame,
    catalog_name: str = DEFAULT_CATALOG,
    dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
    """
    Create standard metric analysis (unified_kpi) table for a given input DataFrame and configuration.

    This function creates a unified_kpi table for a metric and returns both the table
    configuration and the actual DataFrame created for easier debugging.

    :param input_df: Input DataFrame containing the data to analyze
    :param entity_config: Entity configuration for the metric
    :param standard_metric_meta: Metadata defining the metric's properties
    :param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
    :param calendar_df: DataFrame containing calendar information for the analyses
    :param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
    :param dry_run: If True, shows the results without creating tables
    :return: A tuple containing (table_config, unified_kpi_df)

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Create a unified KPI table for Gross Food Sales

        from etl_toolkit import A

        # Define configurations
        entity_config = A.entity_configuration(
            top_level_entity_name="U.S. Food Delivery",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name=None
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Gross Food Sales",
            company_comparable_kpi=False,
            display_period_granularity="MONTH",
            currency="USD"
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="gfs_scaled",
            source_input_date_column="order_date",
            aggregate_function="SUM",
            growth_rate_type="CAGR",
            slice_columns=["company", "metro_tier", "metro"]
        )

        # Create unified_kpi table for a metric and return both the table
        # configuration and the actual DataFrame
        table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
            input_df,
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )
    """
    # Validate inputs
    _validate_inputs(
        entity_config=entity_config,
        standard_metric_meta=standard_metric_meta,
        standard_metric_config=standard_metric_config,
        input_df=input_df,
        calendar_df=calendar_df,
        catalog_name=catalog_name,
        dry_run=dry_run,
    )

    spark = get_spark_session()

    # Set destination database and table names
    destination_database = _get_metric_reporting_database(entity_config)
    table_name_suffix = _get_table_name_suffix(
        standard_metric_meta.metric_name, standard_metric_config.slice_columns
    )
    destination_table = f"metric_unified_kpi__{table_name_suffix}"

    # Create unified_kpi table
    unified_kpi_df = standard_metric_unified_kpi(
        input_df,
        entity_config,
        standard_metric_meta,
        standard_metric_config,
        calendar_df,
    )
    if not dry_run:
        create_table(
            unified_kpi_df,
            database_name=destination_database,
            table_name=destination_table,
            catalog_name=catalog_name,
        )
        unified_kpi_df = spark.table(
            f"{catalog_name}.{destination_database}.{destination_table}"
        )

    # Add unified_kpi table configuration
    unified_kpi_table_location = (
        f"{catalog_name}.{destination_database}.{destination_table}"
    )
    table_config = standard_metric_unified_table_configuration(
        entity_config,
        unified_kpi_table_location,
        standard_metric_meta.metric_name,
        table_type="unified_kpi",
    )

    # Return both table configuration and DataFrame
    return table_config, unified_kpi_df
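

# Illustrative sketch (not part of the original module): dry_run=True previews the
# unified_kpi output without writing anything to the catalog, which is useful while
# iterating on a metric configuration. The arguments are assumed to be supplied by
# the caller, and the helper name is hypothetical.
def _example_preview_unified_kpi(
    input_df: DataFrame,
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    calendar_df: DataFrame,
) -> DataFrame:
    _, unified_kpi_df = create_standard_metric_kpi_table(
        input_df,
        entity_config,
        standard_metric_meta,
        standard_metric_config,
        calendar_df,
        dry_run=True,  # nothing is written to the catalog
    )
    return unified_kpi_df

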
@track_usage
def create_standard_metric_data_download_table(
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    table_config: Optional[standard_metric_unified_table_configuration] = None,
    unified_kpi_df: Optional[DataFrame] = None,
    catalog_name: str = DEFAULT_CATALOG,
    dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
    """
    Create data download table from unified KPI data.

    This function creates a data_download table for a metric and returns both the table
    configuration and the actual DataFrame created for easier debugging.

    :param entity_config: Entity configuration for the metric
    :param standard_metric_meta: Metadata defining the metric's properties
    :param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
    :param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
    :param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
    :param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
    :param dry_run: If True, shows the results without creating tables
    :return: A tuple containing (table_config, data_download_df)

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Create a data download table from a unified KPI table

        from etl_toolkit import A

        # Define configurations
        entity_config = A.entity_configuration(
            top_level_entity_name="U.S. Food Delivery",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name=None
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Gross Food Sales",
            company_comparable_kpi=False,
            display_period_granularity="MONTH",
            currency="USD"
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="gfs_scaled",
            source_input_date_column="order_date",
            aggregate_function="SUM",
            growth_rate_type="CAGR",
            slice_columns=["company", "metro_tier", "metro"]
        )

        # First create unified_kpi table
        table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
            input_df,
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        # Then create data_download table
        data_download_table_config, data_download_df = A.create_standard_metric_data_download_table(
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            table_config=table_config
        )
    """
    # Validate inputs
    _validate_inputs(
        entity_config=entity_config,
        standard_metric_meta=standard_metric_meta,
        standard_metric_config=standard_metric_config,
        catalog_name=catalog_name,
        dry_run=dry_run,
    )
    if not unified_kpi_df and not table_config:
        raise InvalidInputException(
            "Either unified_kpi_df or table_config must be provided"
        )

    spark = get_spark_session()

    # Set destination database and table names
    destination_database = _get_metric_reporting_database(entity_config)
    table_name_suffix = _get_table_name_suffix(
        standard_metric_meta.metric_name, standard_metric_config.slice_columns
    )
    destination_table = f"metric_data_download__{table_name_suffix}"

    # Create data_download table - use the DataFrame directly if unified_kpi_df is provided
    if unified_kpi_df is None and table_config is not None:
        unified_kpi_df = spark.table(table_config.table_location)
    data_download_df = standard_metric_data_download(unified_kpi_df, entity_config)
    if not dry_run:
        create_table(
            data_download_df,
            database_name=destination_database,
            table_name=destination_table,
            catalog_name=catalog_name,
        )
        data_download_df = spark.table(
            f"{catalog_name}.{destination_database}.{destination_table}"
        )

    # Add data_download table configuration
    table_location = f"{catalog_name}.{destination_database}.{destination_table}"
    table_config = standard_metric_unified_table_configuration(
        entity_config,
        table_location,
        standard_metric_meta.metric_name,
        table_type="data_download",
    )

    # Return both table configuration and DataFrame
    return table_config, data_download_df
@track_usage
def create_standard_metric_feed_table(
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    table_config: Optional[standard_metric_unified_table_configuration] = None,
    unified_kpi_df: Optional[DataFrame] = None,
    catalog_name: str = DEFAULT_CATALOG,
    dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
    """
    Create metric feed table from unified KPI data.

    This function creates a metric_feed table for a metric and returns both the table
    configuration and the actual DataFrame created for easier debugging.

    :param entity_config: Entity configuration for the metric
    :param standard_metric_meta: Metadata defining the metric's properties
    :param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
    :param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
    :param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
    :param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
    :param dry_run: If True, shows the results without creating tables
    :return: A tuple containing (table_config, metric_feed_df)

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Create a metric feed table from a unified KPI table

        from etl_toolkit import A

        # Define configurations
        entity_config = A.entity_configuration(
            top_level_entity_name="U.S. Food Delivery",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name=None
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Gross Food Sales",
            company_comparable_kpi=False,
            display_period_granularity="MONTH",
            currency="USD"
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="gfs_scaled",
            source_input_date_column="order_date",
            aggregate_function="SUM",
            growth_rate_type="CAGR",
            slice_columns=["company", "metro_tier", "metro"]
        )

        # First create unified_kpi table
        table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
            input_df,
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        # Then create metric_feed table
        metric_feed_table_config, metric_feed_df = A.create_standard_metric_feed_table(
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            table_config=table_config
        )
    """
    # Validate inputs
    _validate_inputs(
        entity_config=entity_config,
        standard_metric_meta=standard_metric_meta,
        standard_metric_config=standard_metric_config,
        catalog_name=catalog_name,
        dry_run=dry_run,
    )
    if not unified_kpi_df and not table_config:
        raise InvalidInputException(
            "Either unified_kpi_df or table_config must be provided"
        )

    spark = get_spark_session()

    # Set destination database and table names
    destination_database = _get_metric_reporting_database(entity_config)
    table_name_suffix = _get_table_name_suffix(
        standard_metric_meta.metric_name, standard_metric_config.slice_columns
    )
    destination_table = f"metric_feed__{table_name_suffix}"

    # Create metric_feed table - use the DataFrame directly if unified_kpi_df is provided
    if unified_kpi_df is None and table_config is not None:
        unified_kpi_df = spark.table(table_config.table_location)
    metric_feed_df = standard_metric_feed(unified_kpi_df, entity_config)
    if not dry_run:
        create_table(
            metric_feed_df,
            database_name=destination_database,
            table_name=destination_table,
            catalog_name=catalog_name,
        )
        metric_feed_df = spark.table(
            f"{catalog_name}.{destination_database}.{destination_table}"
        )

    # Add metric_feed table configuration
    table_location = f"{catalog_name}.{destination_database}.{destination_table}"
    table_config = standard_metric_unified_table_configuration(
        entity_config,
        table_location,
        standard_metric_meta.metric_name,
        table_type="metric_feed",
    )

    # Return both table configuration and DataFrame
    return table_config, metric_feed_df


@track_usage
def create_standard_metric_live_feed_table(
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    table_config: Optional[standard_metric_unified_table_configuration] = None,
    unified_kpi_df: Optional[DataFrame] = None,
    catalog_name: str = DEFAULT_CATALOG,
    dry_run: bool = False,
) -> Tuple[standard_metric_unified_table_configuration, DataFrame]:
    """
    Create legacy live feed table from unified KPI data.

    This function creates a live_feed table for a metric and returns both the table
    configuration and the actual DataFrame created for easier debugging.

    :param entity_config: Entity configuration for the metric
    :param standard_metric_meta: Metadata defining the metric's properties
    :param standard_metric_config: Configuration defining how the metric should be aggregated and analyzed
    :param table_config: Configuration of the unified_kpi table (provide this or unified_kpi_df)
    :param unified_kpi_df: DataFrame containing the unified_kpi data (provide this or table_config)
    :param catalog_name: Name of the catalog where tables will be created, defaults to "yd_production"
    :param dry_run: If True, shows the results without creating tables
    :return: A tuple containing (table_config, live_feed_df)

    Examples
    ^^^^^^^^

    .. code-block:: python
        :caption: Create a live feed table from a unified KPI table

        from etl_toolkit import A

        # Define configurations
        entity_config = A.entity_configuration(
            top_level_entity_name="U.S. Food Delivery",
            top_level_entity_ticker=None,
            exchange=None,
            entity_name=None
        )
        standard_metric_metadata = A.standard_metric_metadata(
            metric_name="Gross Food Sales",
            company_comparable_kpi=False,
            display_period_granularity="MONTH",
            currency="USD"
        )
        standard_metric_configuration = A.standard_metric_configuration(
            source_input_column="gfs_scaled",
            source_input_date_column="order_date",
            aggregate_function="SUM",
            growth_rate_type="CAGR",
            slice_columns=["company", "metro_tier", "metro"]
        )

        # First create unified_kpi table
        table_config, unified_kpi_df = A.create_standard_metric_kpi_table(
            input_df,
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            calendar_df
        )

        # Then create live_feed table
        live_feed_table_config, live_feed_df = A.create_standard_metric_live_feed_table(
            entity_config,
            standard_metric_metadata,
            standard_metric_configuration,
            table_config=table_config
        )
    """
    # Validate inputs
    _validate_inputs(
        entity_config=entity_config,
        standard_metric_meta=standard_metric_meta,
        standard_metric_config=standard_metric_config,
        catalog_name=catalog_name,
        dry_run=dry_run,
    )
    if not unified_kpi_df and not table_config:
        raise InvalidInputException(
            "Either unified_kpi_df or table_config must be provided"
        )

    spark = get_spark_session()

    # Set destination database and table names
    destination_database = _get_metric_reporting_database(entity_config)
    table_name_suffix = _get_table_name_suffix(
        standard_metric_meta.metric_name, standard_metric_config.slice_columns
    )
    destination_table = f"live_feed__{table_name_suffix}"

    # Create live_feed table - use the DataFrame directly if unified_kpi_df is provided
    if unified_kpi_df is None and table_config is not None:
        unified_kpi_df = spark.table(table_config.table_location)
    live_feed_df = standard_metric_live_feed(unified_kpi_df, entity_config)
    if not dry_run:
        create_table(
            live_feed_df,
            database_name=destination_database,
            table_name=destination_table,
            catalog_name=catalog_name,
        )
        live_feed_df = spark.table(
            f"{catalog_name}.{destination_database}.{destination_table}"
        )

    # Add live_feed table configuration
    table_location = f"{catalog_name}.{destination_database}.{destination_table}"
    table_config = standard_metric_unified_table_configuration(
        entity_config,
        table_location,
        standard_metric_meta.metric_name,
        table_type="live_feed",
    )

    # Return both table configuration and DataFrame
    return table_config, live_feed_df
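

# Illustrative sketch (not part of the original module): chaining the workflow end
# to end. The unified_kpi table configuration produced by the first step is reused
# by each downstream step; the arguments are assumed to be supplied by the caller,
# and the helper name is hypothetical.
def _example_full_standard_metric_workflow(
    input_df: DataFrame,
    entity_config: entity_configuration,
    standard_metric_meta: standard_metric_metadata,
    standard_metric_config: standard_metric_configuration,
    calendar_df: DataFrame,
) -> None:
    kpi_config, _ = create_standard_metric_kpi_table(
        input_df, entity_config, standard_metric_meta, standard_metric_config, calendar_df
    )
    create_standard_metric_data_download_table(
        entity_config, standard_metric_meta, standard_metric_config, table_config=kpi_config
    )
    create_standard_metric_feed_table(
        entity_config, standard_metric_meta, standard_metric_config, table_config=kpi_config
    )
    create_standard_metric_live_feed_table(
        entity_config, standard_metric_meta, standard_metric_config, table_config=kpi_config
    )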