Mutable Table functions#
The ETL Toolkit has special functions to create and update tables that use the MutableTable component from Atlas data apps. Mutable tables are useful when human-entered data or updates need to flow into Databricks tables. They are the preferred alternative to Google Sheets because they are performant, durable, and preserve change tracking of human edits.
Tip
It is required to use these functions when creating tables, as they automatically add the necessary metadata fields and settings to work seamlessly with data apps using this component.
Table Operations#
There is a dedicated function to initialize a mutable table in Databricks, and another to sync (merge) human edits back into that table. The idea is that sync operations run periodically in accordance with pipeline needs, while humans can edit the mutable table as needed.
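A minimal sketch of that split, assuming the table is created once in a setup notebook and then synced from a scheduled pipeline (the DataFrame contents, changelog table name, and connection_info below are placeholders):

import uuid
from etl_toolkit import create_mutable_table, sync_mutable_table

# One-time setup: create the mutable table from an initial DataFrame.
df = spark.createDataFrame([{"uuid": str(uuid.uuid4()), "status": "new"}])
create_mutable_table(df, primary_column_name="uuid")

# Scheduled pipeline run: merge committed human edits back into the Delta table.
sync_mutable_table(
    changelog_table_name="<catalog>.<schema>.changelog_records",  # placeholder
    connection_info=connection_info,  # Postgres credentials; see sync_mutable_table below
)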
- etl_toolkit.create_mutable_table(*dfs, primary_column_name='uuid', url_filter_column_names=None, column_definitions=None, ag_grid_kwargs=None, dash_grid_options=None, color_scale=None, catalog_name=None, database_name=None, table_name=None, **kwargs)[source]#
Create a table that supports the MutableTable data app structure. Can start from an initial series of DataFrames that will have metadata fields added to them automatically.
- DataFrames may be empty so long as they have a defined schema
- Additional data app options can be passed in from this function and will be stored as table properties. These will be fetched when the data app renders to control MutableTable behavior.
- After the table is created, visit the data app while specifying ?table_name=<table_name> to see the table in the data app
Tip
If using optional arguments, it is helpful to understand Dash AG Grid and the options that are available to customize table behavior.
- Parameters:
dfs (pyspark.sql.connect.dataframe.DataFrame | list[pyspark.sql.connect.dataframe.DataFrame]) – One or more DataFrames to be written to the table. If multiple DataFrames are provided, they are appended to the table in the order they are given.
primary_column_name (str) – The column name (str) of the table to be used as the primary key. It must be a UUID, as new records that originate from the data app will use a UUID4.
url_filter_column_names (Optional[list[str]]) – List of column names that exist on the table that can be used to filter the table down via URL parameters when viewing in the data app.
column_definitions (Optional[list[dict]]) – List of column definitions described as a list of dictionaries that affect the table behavior in the data app. This matches the syntax of column definitions for AG Grid generally. If not specified, the table’s schema is used to establish reasonable defaults for AG Grid.
ag_grid_kwargs (Optional[dict]) – Optional dict of keyword argument values to be passed as kwargs to AgGrid when rendering in the data app.
dash_grid_options (Optional[dict]) – Optional dict of configuration values to be passed to the dashGridOptions argument of AgGrid when rendering in the data app.
color_scale (Optional[dict]) – Optional dict to override color styles for pending, staged, committed, or synced rows. Each key should map to a CSS-valid color property.
catalog_name (Optional[str]) – The name of the catalog to create the table in. If not provided, the catalog name will be derived from the notebook path.
database_name (Optional[str]) – The name of the database to create the table in. If not provided, the database name will be derived from the notebook path.
table_name (Optional[str]) – The name of the table to create. If not provided, the table name will be derived from the notebook path.
Examples#
Creating a mutable table#
import uuid
from etl_toolkit import create_mutable_table

df = spark.createDataFrame([
    {"uuid": str(uuid.uuid4()), "name": "Leopold"},
    {"uuid": str(uuid.uuid4()), "name": "Molly"},
])
create_mutable_table(df, primary_column_name="uuid")
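As noted above, the initial DataFrame may also be empty so long as it has a defined schema. A minimal sketch (the column names here are illustrative):

Creating a mutable table from an empty DataFrame#
from pyspark.sql import types as T
from etl_toolkit import create_mutable_table

# An explicit schema is required when the DataFrame has no rows.
schema = T.StructType([
    T.StructField("uuid", T.StringType(), False),
    T.StructField("name", T.StringType(), True),
])
empty_df = spark.createDataFrame([], schema=schema)

create_mutable_table(empty_df, primary_column_name="uuid")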
Creating a mutable table with custom configurations#
import uuid
from etl_toolkit import create_mutable_table

df = spark.createDataFrame([
    {"uuid": str(uuid.uuid4()), "name": "Leopold"},
    {"uuid": str(uuid.uuid4()), "name": "Molly"},
])
create_mutable_table(
    df,
    primary_column_name="uuid",
    url_filter_column_names=["name"],
    column_definitions=[
        {
            "field": "uuid",
            "editable": False,
            "cellDataType": "string",
        }
    ],
)
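Grid-level behavior can be customized through dash_grid_options and color_scale as well. The specific grid options and colors below are illustrative assumptions rather than defaults of the component; any option supported by Dash AG Grid's dashGridOptions can be passed:

Creating a mutable table with grid options and row colors#
import uuid
from etl_toolkit import create_mutable_table

df = spark.createDataFrame([
    {"uuid": str(uuid.uuid4()), "name": "Leopold"},
    {"uuid": str(uuid.uuid4()), "name": "Molly"},
])
create_mutable_table(
    df,
    primary_column_name="uuid",
    # Standard Dash AG Grid options, stored as table properties and applied
    # when the data app renders the grid.
    dash_grid_options={"rowSelection": "multiple", "pagination": True},
    # Override the default row colors for edit states; values can be any
    # CSS-valid color.
    color_scale={"pending": "#fff3cd", "committed": "#d1e7dd"},
)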
- etl_toolkit.sync_mutable_table(changelog_table_name, catalog_name=None, database_name=None, table_name=None, connection_info=None)[source]#
Run this operation on a mutable table to sync committed changes from the data app to the Delta table.
- After syncing, any queries on the Delta table will be up to date with edits from the data app
- Only committed changes are synced. Staged changes are ignored.
- After committing, the data app state will be updated to reflect that the committed changes are now in a “synced” state
- Parameters:
changelog_table_name – The name of the data app changelog table.
catalog_name (Optional[str]) – The name of the catalog containing the table to sync. If not provided, the catalog name will be derived from the notebook path.
database_name (Optional[str]) – The name of the database containing the table to sync. If not provided, the database name will be derived from the notebook path.
table_name (Optional[str]) – The name of the table to sync. If not provided, the table name will be derived from the notebook path.
connection_info (dict) – Dictionary of Postgres connection credentials. These are used to authenticate to the database instance.
Warning
Make sure to have psycopg[binary]==3.2.9 installed in the environment when running these functions.
Tip
If using Lakebase, the user should be the client ID of a service principal with permission to connect to the database. The password should be an OAuth token for that service principal, issued for the workspace the Lakebase instance lives in.
Examples#
Syncing a mutable table#
from etl_toolkit import sync_mutable_table

connection_info = {
    "dbname": "databricks_postgres",
    "user": client_id,
    "password": token,
    "port": 5432,
    "host": lakebase_host,
}
sync_mutable_table(
    changelog_table_name="yd_dbr_example_pg.public.changelog_records",
    connection_info=connection_info,
)
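In the example above, client_id, token, and lakebase_host are placeholders. One way to obtain the token, sketched here under the assumption that the standard Databricks OAuth client-credentials flow is used for the service principal:

import requests

# workspace_url, client_id, and client_secret are placeholders for your
# workspace URL and the service principal's OAuth credentials.
response = requests.post(
    f"{workspace_url}/oidc/v1/token",
    auth=(client_id, client_secret),
    data={"grant_type": "client_credentials", "scope": "all-apis"},
)
response.raise_for_status()
token = response.json()["access_token"]

connection_info = {
    "dbname": "databricks_postgres",
    "user": client_id,      # service principal client ID
    "password": token,      # short-lived OAuth token used as the password
    "port": 5432,
    "host": lakebase_host,  # placeholder for the Lakebase instance host
}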