diff --git a/realtime-box/README.md b/realtime-box/README.md new file mode 100644 index 00000000..5d7104d0 --- /dev/null +++ b/realtime-box/README.md @@ -0,0 +1,7 @@ +# Realtime Box + +Workflow templates for **Treasure AI RT 2.0** features. + +## Contents + +- [Lookup Catalog Sync](./lookup-catalog-sync) — Sync tables from `cdp_lookup_catalog` to RT 2.0 internal storage for use in real-time personalization. diff --git a/realtime-box/lookup-catalog-sync/README.md b/realtime-box/lookup-catalog-sync/README.md new file mode 100644 index 00000000..14726d1b --- /dev/null +++ b/realtime-box/lookup-catalog-sync/README.md @@ -0,0 +1,21 @@ +# Lookup Catalog Sync Workflow + +Digdag workflow templates to sync tables from the `cdp_lookup_catalog` database to RT 2.0 internal storage. Only changed records are uploaded on each run (hash-based change detection). + +## Variants + +| Variant | When to use | +|---------|-------------| +| [manual/](./manual) | Fewer than 5 tables, schemas change infrequently, or you need explicit control over column selection. | +| [table-discovery/](./table-discovery) | 5 or more tables, schemas change frequently, or you prefer zero-maintenance automatic table detection. Requires an additional feature flag — contact Treasure AI Support. | + +## Common Requirements + +- A `cdp_lookup_catalog` database must exist in Data Workbench with lookup tables already created. +- Each lookup table must have its primary key as the **first column** (unique, non-null, not named `time`, type `string`/`int`/`long`). +- A TD API key must be stored in the workflow project's Secrets as `td.apikey`. +- The `reactor_importer_endpoint` and `reactor_instance` values must be obtained from Treasure AI Support or your Customer Success Manager. + +## Documentation + +See the [Lookup Catalog Sync Workflow](https://docs.treasuredata.com/products/customer-data-platform/real-time/lookup-catalog-workflow) page in the Treasure AI documentation portal for full setup instructions, scheduling recommendations, and troubleshooting. diff --git a/realtime-box/lookup-catalog-sync/manual/README.md b/realtime-box/lookup-catalog-sync/manual/README.md new file mode 100644 index 00000000..2aa5c2c8 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/README.md @@ -0,0 +1,23 @@ +# Lookup Catalog Sync — Manual Configuration + +Use this variant when you have a small number of tables and want explicit control over which columns are synced. + +## Setup + +1. Copy this directory into your TD Workflow project. +2. Edit `lookup_catalog_sync.dig`: + - Set `reactor_importer_endpoint` (obtain from Treasure AI Support). + - Set `reactor_instance` (obtain from Treasure AI Support). + - Define each table under `tables:` with `name`, `key_column`, and `col_expr`. +3. Store your TD API key in Secrets as `td.apikey`. +4. Set a schedule and run once manually to verify the initial upload. + +## Files + +| File | Description | +|------|-------------| +| `lookup_catalog_sync.dig` | Main workflow — iterates over configured tables | +| `queries/initialize_digest.sql` | Creates digest tracking table if not exists | +| `queries/extract_updated.sql` | Extracts changed records by hash comparison | +| `queries/check_count.sql` | Counts changed records | +| `queries/update_digest.sql` | Updates digest table after upload | diff --git a/realtime-box/lookup-catalog-sync/manual/lookup_catalog_sync.dig b/realtime-box/lookup-catalog-sync/manual/lookup_catalog_sync.dig new file mode 100644 index 00000000..382fe0b3 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/lookup_catalog_sync.dig @@ -0,0 +1,91 @@ +_export: + # Obtain these values from Treasure AI Support or your Customer Success Manager + reactor_importer_endpoint: "https://" + reactor_instance: "" + + td: + database: cdp_lookup_catalog + + # Define each table to sync. + # key_column: primary key column name (must be the first column in the table) + # col_expr: SQL expression for each non-key column to include in the JSON payload + tables: + - name: limited_time_sale + key_column: product_id + col_expr: >- + '"discount_rate":' || json_format(cast(discount_rate AS json)), + '"sale_end":' || json_format(cast(sale_end AS json)), + '"banner_html":' || json_format(cast(banner_html AS json)) + + # Add more tables as needed: + # - name: user_product_coupon + # key_column: user_product_id + # col_expr: >- + # '"has_coupon":' || json_format(cast(has_coupon AS json)), + # '"coupon_code":' || json_format(cast(coupon_code AS json)) + ++sync_all_tables: + for_each>: + table_config: ${tables} + _parallel: + limit: 2 + _do: + +log_start: + echo>: "Processing ${table_config.name}" + + +init_digests: + td>: queries/initialize_digest.sql + database: ${td.database} + + +extract_updated: + td>: queries/extract_updated.sql + database: ${td.database} + + +check_count: + td>: queries/check_count.sql + database: ${td.database} + store_last_results: true + + +upload_if_needed: + if>: ${td.last_results.row_count > 0} + _do: + +log_upload: + echo>: "Uploading ${td.last_results.row_count} changed records for ${table_config.name}" + + +upload: + td>: + database: ${td.database} + query: | + SELECT + ${table_config.key_column}, + payload + FROM _wf_${table_config.name}_updated + result_url: | + { + "type": "rest", + "method": "POST", + "authorization": "${secret:td.apikey}", + "endpoint": "${reactor_importer_endpoint}/internal/lookup-catalog/bulk-load", + "headers": "{\"x-reactor-instance-name\": \"${reactor_instance}\", \"x-lookup-table-name\":\"${table_config.name}\"}", + "parallelism": "10", + "page_size": 1000 + } + + +update_digests: + td>: queries/update_digest.sql + database: ${td.database} + + +rename_digests: + td_ddl>: + rename_tables: + - from: _wf_${table_config.name}_digests_new + to: _wf_${table_config.name}_digests + + _else_do: + +log_no_updates: + echo>: "No changes for ${table_config.name}" + + +cleanup: + td_ddl>: + drop_tables: + - _wf_${table_config.name}_updated diff --git a/realtime-box/lookup-catalog-sync/manual/queries/check_count.sql b/realtime-box/lookup-catalog-sync/manual/queries/check_count.sql new file mode 100644 index 00000000..c90924db --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/queries/check_count.sql @@ -0,0 +1,2 @@ +SELECT COUNT(*) AS row_count +FROM _wf_${table_config.name}_updated diff --git a/realtime-box/lookup-catalog-sync/manual/queries/extract_updated.sql b/realtime-box/lookup-catalog-sync/manual/queries/extract_updated.sql new file mode 100644 index 00000000..8e37483c --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/queries/extract_updated.sql @@ -0,0 +1,19 @@ +DROP TABLE IF EXISTS _wf_${table_config.name}_updated; + +CREATE TABLE _wf_${table_config.name}_updated AS +WITH src AS ( + SELECT + ${table_config.key_column}, + '{' || array_join(ARRAY[${table_config.col_expr}], ',') || '}' AS payload + FROM cdp_lookup_catalog.${table_config.name} +) +SELECT + src.${table_config.key_column}, + src.payload +FROM src +WHERE NOT EXISTS ( + SELECT 1 + FROM _wf_${table_config.name}_digests dig + WHERE dig.${table_config.key_column} = src.${table_config.key_column} + AND dig.payload_xxhash64 = from_big_endian_64(xxhash64(to_utf8(src.payload))) +) diff --git a/realtime-box/lookup-catalog-sync/manual/queries/initialize_digest.sql b/realtime-box/lookup-catalog-sync/manual/queries/initialize_digest.sql new file mode 100644 index 00000000..544b1ecd --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/queries/initialize_digest.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS _wf_${table_config.name}_digests AS +SELECT + CAST(NULL AS VARCHAR) AS ${table_config.key_column}, + CAST(NULL AS BIGINT) AS payload_xxhash64 +LIMIT 0 diff --git a/realtime-box/lookup-catalog-sync/manual/queries/update_digest.sql b/realtime-box/lookup-catalog-sync/manual/queries/update_digest.sql new file mode 100644 index 00000000..0dd54217 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/manual/queries/update_digest.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS _wf_${table_config.name}_digests_new; + +CREATE TABLE _wf_${table_config.name}_digests_new AS +SELECT + ${table_config.key_column}, + COALESCE( + from_big_endian_64(xxhash64(to_utf8(upd.payload))), + dig.payload_xxhash64 + ) AS payload_xxhash64 +FROM _wf_${table_config.name}_updated upd +FULL OUTER JOIN _wf_${table_config.name}_digests dig + USING (${table_config.key_column}) diff --git a/realtime-box/lookup-catalog-sync/table-discovery/README.md b/realtime-box/lookup-catalog-sync/table-discovery/README.md new file mode 100644 index 00000000..9da528d5 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/table-discovery/README.md @@ -0,0 +1,23 @@ +# Lookup Catalog Sync — Table Discovery + +Use this variant when you manage many tables or want the workflow to automatically detect new tables added to `cdp_lookup_catalog`. + +**Requires an additional feature flag.** Contact Treasure AI Support to enable it before using this variant. + +## Setup + +1. Copy this directory into your TD Workflow project. +2. Edit `lookup_catalog_sync.dig`: + - Set `reactor_importer_endpoint` (obtain from Treasure AI Support). + - Set `reactor_instance` (obtain from Treasure AI Support). +3. Store your TD API key in Secrets as `td.apikey`. +4. Set a schedule and run once manually to verify the initial upload. + +## Files + +| File | Description | +|------|-------------| +| `lookup_catalog_sync.dig` | Main workflow — discovers and syncs all tables automatically | +| `sync_table.dig` | Reusable single-table sync logic called by the main workflow | +| `scripts/generate_sql.py` | Python script that generates type-aware extract SQL per table | +| `queries/discover_tables.sql` | Discovers eligible tables, excludes `_wf_*` internal tables | diff --git a/realtime-box/lookup-catalog-sync/table-discovery/lookup_catalog_sync.dig b/realtime-box/lookup-catalog-sync/table-discovery/lookup_catalog_sync.dig new file mode 100644 index 00000000..0d0e1b82 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/table-discovery/lookup_catalog_sync.dig @@ -0,0 +1,44 @@ +_export: + # Obtain these values from Treasure AI Support or your Customer Success Manager + reactor_importer_endpoint: "https://" + reactor_instance: "" + + td: + database: cdp_lookup_catalog + + # Leave empty to sync all discovered tables. + # Set to a specific table name to sync only that table (useful for testing). + p_table_name: "" + + batch_size: 1000 + parallelism: 10 + ++main: + +init_digest_tables: + td_for_each>: queries/discover_tables.sql + _do: + +create_digest: + td>: + database: ${td.database} + query: | + CREATE TABLE IF NOT EXISTS _wf_${td.each.table_name}_digests ( + _key varchar, + payload_xxhash64 bigint, + time bigint + ) + + +sync_tables: + if>: ${p_table_name != ""} + _do: + +sync_single: + call>: sync_table.dig + _export: + table_name: ${p_table_name} + _else_do: + +sync_all: + td_for_each>: queries/discover_tables.sql + _do: + +sync_each: + call>: sync_table.dig + _export: + table_name: ${td.each.table_name} diff --git a/realtime-box/lookup-catalog-sync/table-discovery/queries/discover_tables.sql b/realtime-box/lookup-catalog-sync/table-discovery/queries/discover_tables.sql new file mode 100644 index 00000000..8cddca8c --- /dev/null +++ b/realtime-box/lookup-catalog-sync/table-discovery/queries/discover_tables.sql @@ -0,0 +1,8 @@ +-- Discover all eligible tables in cdp_lookup_catalog. +-- Excludes internal workflow tables (_wf_* prefix). +SELECT table_name +FROM information_schema.tables +WHERE table_schema = '${td.database}' + AND table_name NOT LIKE '_wf_%' + AND table_type = 'BASE TABLE' +ORDER BY table_name diff --git a/realtime-box/lookup-catalog-sync/table-discovery/scripts/generate_sql.py b/realtime-box/lookup-catalog-sync/table-discovery/scripts/generate_sql.py new file mode 100644 index 00000000..40486ea7 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/table-discovery/scripts/generate_sql.py @@ -0,0 +1,132 @@ +""" +Generates type-aware extract SQL for Lookup Catalog sync. + +Each column is serialized to JSON with type-specific handling: + - array(varchar): JSON array of strings with NULL element preservation + - array(bigint/integer): JSON array of integers + - array(double/real): JSON array of clean decimals (avoids floating-point artifacts) + - double/real (scalar): clean decimal (avoids floating-point artifacts) + - Other unsupported array types: raises ValueError + - All other types: string with quote escaping, NULL-safe + +The key column (first column) and the 'time' column are excluded from the payload. +""" + +import json +import digdag + + +def _sql_null_wrap(col, inner_expr): + return f"CASE WHEN {col} IS NULL THEN 'null' ELSE {inner_expr} END" + + +def _sql_string_expr(col): + return _sql_null_wrap( + col, + f"'\"' || replace(CAST({col} AS VARCHAR), '\"', '\\\\\"') || '\"'" + ) + + +def _sql_numeric_expr(col): + return _sql_null_wrap(col, f"CAST({col} AS VARCHAR)") + + +def _sql_float_expr(col): + safe_cast = f"TRY(CAST({col} AS DECIMAL(30,10)))" + return _sql_null_wrap( + col, + f"COALESCE(regexp_replace(CAST({safe_cast} AS VARCHAR), '\\.?0+$', ''), CAST({col} AS VARCHAR))" + ) + + +def _sql_array_varchar_expr(col): + elem = "CASE WHEN x IS NULL THEN 'null' ELSE '\"' || replace(x, '\"', '\\\\\"') || '\"' END" + return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'") + + +def _sql_array_integer_expr(col): + elem = "CASE WHEN x IS NULL THEN 'null' ELSE CAST(x AS VARCHAR) END" + return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'") + + +def _sql_array_float_expr(col): + safe = "TRY(CAST(x AS DECIMAL(30,10)))" + elem = f"CASE WHEN x IS NULL THEN 'null' ELSE COALESCE(regexp_replace(CAST({safe} AS VARCHAR), '\\.?0+$', ''), CAST(x AS VARCHAR)) END" + return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'") + + +def _sql_expr_for_type(col, data_type): + t = data_type.lower() + if t.startswith("array(varchar"): + return _sql_array_varchar_expr(col) + if t.startswith(("array(bigint", "array(integer")): + return _sql_array_integer_expr(col) + if t.startswith(("array(double", "array(real")): + return _sql_array_float_expr(col) + if t.startswith("array("): + raise ValueError(f"Unsupported array column type '{data_type}' for column '{col}'. Flatten before ingestion.") + if t in ("double", "real"): + return _sql_float_expr(col) + # Default: string with quote escaping + return _sql_null_wrap( + col, + f"CASE WHEN TRY(CAST({col} AS DOUBLE)) IS NOT NULL THEN CAST({col} AS VARCHAR) " + f"ELSE '\"' || replace(CAST({col} AS VARCHAR), '\"', '\\\\\"') || '\"' END" + ) + + +def generate_extract_sql(database, table_name, columns, column_types=None, **kwargs): + if isinstance(columns, str): + columns = json.loads(columns) + if isinstance(column_types, str): + column_types = json.loads(column_types) + + if not columns: + raise ValueError(f"No columns found for table {table_name}") + + if column_types and len(column_types) != len(columns): + raise ValueError( + f"column_types length ({len(column_types)}) does not match " + f"columns length ({len(columns)}) for table {table_name}" + ) + + key_column = columns[0] + excluded = {key_column.lower(), "time"} + + json_parts = [] + for i, col in enumerate(columns): + if col.lower() in excluded: + continue + data_type = column_types[i] if column_types else "varchar" + expr = _sql_expr_for_type(col, data_type) + json_parts.append(f" '\"{col}\":' || {expr}") + + payload_expr = ",\n".join(json_parts) + + sql = f"""DROP TABLE IF EXISTS _wf_{table_name}_updated; + +CREATE TABLE _wf_{table_name}_updated AS +SELECT + {key_column}, + payload +FROM ( + SELECT + {key_column}, + '{{' || array_join( + ARRAY[ +{payload_expr} + ], + ',' + ) || '}}' AS payload + FROM {database}.{table_name} +) src +WHERE NOT EXISTS ( + SELECT 1 + FROM _wf_{table_name}_digests dig + WHERE dig._key = src.{key_column} + AND dig.payload_xxhash64 = from_big_endian_64(xxhash64(to_utf8(src.payload))) +) +""" + + digdag.env.store({"extract_sql": sql, "key_column": key_column}) + return {"extract_sql": sql, "key_column": key_column} diff --git a/realtime-box/lookup-catalog-sync/table-discovery/sync_table.dig b/realtime-box/lookup-catalog-sync/table-discovery/sync_table.dig new file mode 100644 index 00000000..2afdb2b2 --- /dev/null +++ b/realtime-box/lookup-catalog-sync/table-discovery/sync_table.dig @@ -0,0 +1,78 @@ +# Reusable single-table sync logic. +# Parameters: +# table_name: Name of the table to sync (set by the calling workflow) + ++get_columns: + td>: + database: ${td.database} + store_last_results: true + query: | + SELECT + array_agg(column_name ORDER BY ordinal_position) AS columns, + array_agg(data_type ORDER BY ordinal_position) AS column_types + FROM information_schema.columns + WHERE table_schema = '${td.database}' + AND table_name = '${table_name}' + ++generate_sql: + py>: scripts.generate_sql.generate_extract_sql + database: ${td.database} + table_name: ${table_name} + columns: ${td.last_results.columns} + column_types: ${td.last_results.column_types} + docker: + image: "digdag/digdag-python:3.10.1" + ++extract_updated: + td>: + database: ${td.database} + query: ${extract_sql} + ++count_updated: + td>: + database: ${td.database} + store_last_results: true + query: | + SELECT COUNT(*) AS row_count FROM _wf_${table_name}_updated + ++upload_if_needed: + if>: ${td.last_results.row_count > 0} + _do: + +log_upload: + echo>: "Uploading ${td.last_results.row_count} changed records for ${table_name}" + + +upload: + td>: + database: ${td.database} + query: | + SELECT ${key_column}, payload FROM _wf_${table_name}_updated + result_url: | + { + "type": "rest", + "method": "POST", + "authorization": "${secret:td.apikey}", + "endpoint": "${reactor_importer_endpoint}/internal/lookup-catalog/bulk-load", + "headers": "{\"x-reactor-instance-name\": \"${reactor_instance}\", \"x-lookup-table-name\": \"${table_name}\"}", + "parallelism": ${parallelism}, + "page_size": ${batch_size} + } + + +update_digests: + td>: + database: ${td.database} + query: | + INSERT INTO _wf_${table_name}_digests (_key, payload_xxhash64, time) + SELECT + ${key_column}, + from_big_endian_64(xxhash64(to_utf8(payload))) AS payload_xxhash64, + CAST(to_unixtime(current_timestamp) AS bigint) AS time + FROM _wf_${table_name}_updated + + _else_do: + +log_no_updates: + echo>: "No changes for ${table_name}" + ++cleanup: + td_ddl>: + drop_tables: + - _wf_${table_name}_updated