Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions realtime-box/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Realtime Box

Workflow templates for **Treasure AI RT 2.0** features.

## Contents

- [Lookup Catalog Sync](./lookup-catalog-sync) — Sync tables from `cdp_lookup_catalog` to RT 2.0 internal storage for use in real-time personalization.
21 changes: 21 additions & 0 deletions realtime-box/lookup-catalog-sync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Lookup Catalog Sync Workflow

Digdag workflow templates to sync tables from the `cdp_lookup_catalog` database to RT 2.0 internal storage. Only changed records are uploaded on each run (hash-based change detection).

## Variants

| Variant | When to use |
|---------|-------------|
| [manual/](./manual) | Fewer than 5 tables, schemas change infrequently, or you need explicit control over column selection. |
| [table-discovery/](./table-discovery) | 5 or more tables, schemas change frequently, or you prefer zero-maintenance automatic table detection. Requires an additional feature flag — contact Treasure AI Support. |

## Common Requirements

- A `cdp_lookup_catalog` database must exist in Data Workbench with lookup tables already created.
- Each lookup table must have its primary key as the **first column** (unique, non-null, not named `time`, type `string`/`int`/`long`).
- A TD API key must be stored in the workflow project's Secrets as `td.apikey`.
- The `reactor_importer_endpoint` and `reactor_instance` values must be obtained from Treasure AI Support or your Customer Success Manager.

## Documentation

See the [Lookup Catalog Sync Workflow](https://docs.treasuredata.com/products/customer-data-platform/real-time/lookup-catalog-workflow) page in the Treasure AI documentation portal for full setup instructions, scheduling recommendations, and troubleshooting.
23 changes: 23 additions & 0 deletions realtime-box/lookup-catalog-sync/manual/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Lookup Catalog Sync — Manual Configuration

Use this variant when you have a small number of tables and want explicit control over which columns are synced.

## Setup

1. Copy this directory into your TD Workflow project.
2. Edit `lookup_catalog_sync.dig`:
- Set `reactor_importer_endpoint` (obtain from Treasure AI Support).
- Set `reactor_instance` (obtain from Treasure AI Support).
- Define each table under `tables:` with `name`, `key_column`, and `col_expr`.
3. Store your TD API key in Secrets as `td.apikey`.
4. Set a schedule and run once manually to verify the initial upload.

## Files

| File | Description |
|------|-------------|
| `lookup_catalog_sync.dig` | Main workflow — iterates over configured tables |
| `queries/initialize_digest.sql` | Creates digest tracking table if not exists |
| `queries/extract_updated.sql` | Extracts changed records by hash comparison |
| `queries/check_count.sql` | Counts changed records |
| `queries/update_digest.sql` | Updates digest table after upload |
91 changes: 91 additions & 0 deletions realtime-box/lookup-catalog-sync/manual/lookup_catalog_sync.dig
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
_export:
# Obtain these values from Treasure AI Support or your Customer Success Manager
reactor_importer_endpoint: "https://<your-endpoint>"
reactor_instance: "<your-reactor-instance>"

td:
database: cdp_lookup_catalog

# Define each table to sync.
# key_column: primary key column name (must be the first column in the table)
# col_expr: SQL expression for each non-key column to include in the JSON payload
tables:
- name: limited_time_sale
key_column: product_id
col_expr: >-
'"discount_rate":' || json_format(cast(discount_rate AS json)),
'"sale_end":' || json_format(cast(sale_end AS json)),
'"banner_html":' || json_format(cast(banner_html AS json))

# Add more tables as needed:
# - name: user_product_coupon
# key_column: user_product_id
# col_expr: >-
# '"has_coupon":' || json_format(cast(has_coupon AS json)),
# '"coupon_code":' || json_format(cast(coupon_code AS json))

+sync_all_tables:
for_each>:
table_config: ${tables}
_parallel:
limit: 2
_do:
+log_start:
echo>: "Processing ${table_config.name}"

+init_digests:
td>: queries/initialize_digest.sql
database: ${td.database}

+extract_updated:
td>: queries/extract_updated.sql
database: ${td.database}

+check_count:
td>: queries/check_count.sql
database: ${td.database}
store_last_results: true

+upload_if_needed:
if>: ${td.last_results.row_count > 0}
_do:
+log_upload:
echo>: "Uploading ${td.last_results.row_count} changed records for ${table_config.name}"

+upload:
td>:
database: ${td.database}
query: |
SELECT
${table_config.key_column},
payload
FROM _wf_${table_config.name}_updated
result_url: |
{
"type": "rest",
"method": "POST",
"authorization": "${secret:td.apikey}",
"endpoint": "${reactor_importer_endpoint}/internal/lookup-catalog/bulk-load",
"headers": "{\"x-reactor-instance-name\": \"${reactor_instance}\", \"x-lookup-table-name\":\"${table_config.name}\"}",
"parallelism": "10",
"page_size": 1000
}

+update_digests:
td>: queries/update_digest.sql
database: ${td.database}

+rename_digests:
td_ddl>:
rename_tables:
- from: _wf_${table_config.name}_digests_new
to: _wf_${table_config.name}_digests

_else_do:
+log_no_updates:
echo>: "No changes for ${table_config.name}"

+cleanup:
td_ddl>:
drop_tables:
- _wf_${table_config.name}_updated
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SELECT COUNT(*) AS row_count
FROM _wf_${table_config.name}_updated
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DROP TABLE IF EXISTS _wf_${table_config.name}_updated;

CREATE TABLE _wf_${table_config.name}_updated AS
WITH src AS (
SELECT
${table_config.key_column},
'{' || array_join(ARRAY[${table_config.col_expr}], ',') || '}' AS payload
FROM cdp_lookup_catalog.${table_config.name}
)
SELECT
src.${table_config.key_column},
src.payload
FROM src
WHERE NOT EXISTS (
SELECT 1
FROM _wf_${table_config.name}_digests dig
WHERE dig.${table_config.key_column} = src.${table_config.key_column}
AND dig.payload_xxhash64 = from_big_endian_64(xxhash64(to_utf8(src.payload)))
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS _wf_${table_config.name}_digests AS
SELECT
CAST(NULL AS VARCHAR) AS ${table_config.key_column},
CAST(NULL AS BIGINT) AS payload_xxhash64
LIMIT 0
12 changes: 12 additions & 0 deletions realtime-box/lookup-catalog-sync/manual/queries/update_digest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
DROP TABLE IF EXISTS _wf_${table_config.name}_digests_new;

CREATE TABLE _wf_${table_config.name}_digests_new AS
SELECT
${table_config.key_column},
COALESCE(
from_big_endian_64(xxhash64(to_utf8(upd.payload))),
dig.payload_xxhash64
) AS payload_xxhash64
FROM _wf_${table_config.name}_updated upd
FULL OUTER JOIN _wf_${table_config.name}_digests dig
USING (${table_config.key_column})
23 changes: 23 additions & 0 deletions realtime-box/lookup-catalog-sync/table-discovery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Lookup Catalog Sync — Table Discovery

Use this variant when you manage many tables or want the workflow to automatically detect new tables added to `cdp_lookup_catalog`.

**Requires an additional feature flag.** Contact Treasure AI Support to enable it before using this variant.

## Setup

1. Copy this directory into your TD Workflow project.
2. Edit `lookup_catalog_sync.dig`:
- Set `reactor_importer_endpoint` (obtain from Treasure AI Support).
- Set `reactor_instance` (obtain from Treasure AI Support).
3. Store your TD API key in Secrets as `td.apikey`.
4. Set a schedule and run once manually to verify the initial upload.

## Files

| File | Description |
|------|-------------|
| `lookup_catalog_sync.dig` | Main workflow — discovers and syncs all tables automatically |
| `sync_table.dig` | Reusable single-table sync logic called by the main workflow |
| `scripts/generate_sql.py` | Python script that generates type-aware extract SQL per table |
| `queries/discover_tables.sql` | Discovers eligible tables, excludes `_wf_*` internal tables |
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
_export:
# Obtain these values from Treasure AI Support or your Customer Success Manager
reactor_importer_endpoint: "https://<your-endpoint>"
reactor_instance: "<your-reactor-instance>"

td:
database: cdp_lookup_catalog

# Leave empty to sync all discovered tables.
# Set to a specific table name to sync only that table (useful for testing).
p_table_name: ""

batch_size: 1000
parallelism: 10

+main:
+init_digest_tables:
td_for_each>: queries/discover_tables.sql
_do:
+create_digest:
td>:
database: ${td.database}
query: |
CREATE TABLE IF NOT EXISTS _wf_${td.each.table_name}_digests (
_key varchar,
payload_xxhash64 bigint,
time bigint
)

+sync_tables:
if>: ${p_table_name != ""}
_do:
+sync_single:
call>: sync_table.dig
_export:
table_name: ${p_table_name}
_else_do:
+sync_all:
td_for_each>: queries/discover_tables.sql
_do:
+sync_each:
call>: sync_table.dig
_export:
table_name: ${td.each.table_name}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Discover all eligible tables in cdp_lookup_catalog.
-- Excludes internal workflow tables (_wf_* prefix).
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '${td.database}'
AND table_name NOT LIKE '_wf_%'
AND table_type = 'BASE TABLE'
ORDER BY table_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Generates type-aware extract SQL for Lookup Catalog sync.

Each column is serialized to JSON with type-specific handling:
- array(varchar): JSON array of strings with NULL element preservation
- array(bigint/integer): JSON array of integers
- array(double/real): JSON array of clean decimals (avoids floating-point artifacts)
- double/real (scalar): clean decimal (avoids floating-point artifacts)
- Other unsupported array types: raises ValueError
- All other types: string with quote escaping, NULL-safe

The key column (first column) and the 'time' column are excluded from the payload.
"""

import json
import digdag


def _sql_null_wrap(col, inner_expr):
return f"CASE WHEN {col} IS NULL THEN 'null' ELSE {inner_expr} END"


def _sql_string_expr(col):
return _sql_null_wrap(
col,
f"'\"' || replace(CAST({col} AS VARCHAR), '\"', '\\\\\"') || '\"'"
)


def _sql_numeric_expr(col):
return _sql_null_wrap(col, f"CAST({col} AS VARCHAR)")


def _sql_float_expr(col):
safe_cast = f"TRY(CAST({col} AS DECIMAL(30,10)))"
return _sql_null_wrap(
col,
f"COALESCE(regexp_replace(CAST({safe_cast} AS VARCHAR), '\\.?0+$', ''), CAST({col} AS VARCHAR))"
)


def _sql_array_varchar_expr(col):
elem = "CASE WHEN x IS NULL THEN 'null' ELSE '\"' || replace(x, '\"', '\\\\\"') || '\"' END"
return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'")


def _sql_array_integer_expr(col):
elem = "CASE WHEN x IS NULL THEN 'null' ELSE CAST(x AS VARCHAR) END"
return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'")


def _sql_array_float_expr(col):
safe = "TRY(CAST(x AS DECIMAL(30,10)))"
elem = f"CASE WHEN x IS NULL THEN 'null' ELSE COALESCE(regexp_replace(CAST({safe} AS VARCHAR), '\\.?0+$', ''), CAST(x AS VARCHAR)) END"
return _sql_null_wrap(col, f"'[' || array_join(transform({col}, x -> {elem}), ',') || ']'")


def _sql_expr_for_type(col, data_type):
t = data_type.lower()
if t.startswith("array(varchar"):
return _sql_array_varchar_expr(col)
if t.startswith(("array(bigint", "array(integer")):
return _sql_array_integer_expr(col)
if t.startswith(("array(double", "array(real")):
return _sql_array_float_expr(col)
if t.startswith("array("):
raise ValueError(f"Unsupported array column type '{data_type}' for column '{col}'. Flatten before ingestion.")
if t in ("double", "real"):
return _sql_float_expr(col)
# Default: string with quote escaping
return _sql_null_wrap(
col,
f"CASE WHEN TRY(CAST({col} AS DOUBLE)) IS NOT NULL THEN CAST({col} AS VARCHAR) "
f"ELSE '\"' || replace(CAST({col} AS VARCHAR), '\"', '\\\\\"') || '\"' END"
)


def generate_extract_sql(database, table_name, columns, column_types=None, **kwargs):
if isinstance(columns, str):
columns = json.loads(columns)
if isinstance(column_types, str):
column_types = json.loads(column_types)

if not columns:
raise ValueError(f"No columns found for table {table_name}")

if column_types and len(column_types) != len(columns):
raise ValueError(
f"column_types length ({len(column_types)}) does not match "
f"columns length ({len(columns)}) for table {table_name}"
)

key_column = columns[0]
excluded = {key_column.lower(), "time"}

json_parts = []
for i, col in enumerate(columns):
if col.lower() in excluded:
continue
data_type = column_types[i] if column_types else "varchar"
expr = _sql_expr_for_type(col, data_type)
json_parts.append(f" '\"{col}\":' || {expr}")

payload_expr = ",\n".join(json_parts)

sql = f"""DROP TABLE IF EXISTS _wf_{table_name}_updated;

CREATE TABLE _wf_{table_name}_updated AS
SELECT
{key_column},
payload
FROM (
SELECT
{key_column},
'{{' || array_join(
ARRAY[
{payload_expr}
],
','
) || '}}' AS payload
FROM {database}.{table_name}
) src
WHERE NOT EXISTS (
SELECT 1
FROM _wf_{table_name}_digests dig
WHERE dig._key = src.{key_column}
AND dig.payload_xxhash64 = from_big_endian_64(xxhash64(to_utf8(src.payload)))
)
"""

digdag.env.store({"extract_sql": sql, "key_column": key_column})
return {"extract_sql": sql, "key_column": key_column}
Loading