From 184c0c2764cf8937345e7c8f7a8692267050b03f Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Sun, 24 May 2026 21:12:02 +0000 Subject: [PATCH 1/3] skills(pipelines): port DLT migration guide from a-d-k MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of d-a-s #73's TODO #5 — port a-d-k's databricks-spark-declarative-pipelines content into stable skills/databricks-pipelines/. Adds references/dlt-migration.md covering both migration paths (DLT Python → SDP Python via the modern pyspark.pipelines API, and DLT Python → SDP SQL) with side-by-side conversions for decorators, reads, expectations, CDC/SCD, and partitioning → liquid clustering. Source clean — no MCP-tool refs to strip, no docs.databricks.com URLs to rewrite. SKILL.md updates: - bump version to 0.2.0 - new "Migrating from DLT" section pointing at the reference Subsequent phases (separate commits) port the remaining a-d-k content: workflow A/B/C decision matrix (project initialization), per-language performance reference, language-selection rules. Co-authored-by: Isaac --- manifest.json | 57 +-- skills/databricks-pipelines/SKILL.md | 6 +- .../references/dlt-migration.md | 447 ++++++++++++++++++ 3 files changed, 481 insertions(+), 29 deletions(-) create mode 100644 skills/databricks-pipelines/references/dlt-migration.md diff --git a/manifest.json b/manifest.json index f0fe8fe..2489156 100644 --- a/manifest.json +++ b/manifest.json @@ -1,12 +1,12 @@ { "version": "2", - "updated_at": "2026-05-22T20:18:49Z", + "updated_at": "2026-05-24T21:11:52Z", "skills": { "databricks-apps": { "version": "0.1.2", "description": "Databricks Apps development and deployment (evaluates analytics vs synced tables data access)", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -33,7 +33,7 @@ "version": "0.1.0", "description": "Core Databricks skill for CLI, auth, and data exploration", "repo_dir": "skills", - "updated_at": "2026-05-15T09:44:24Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -48,7 +48,7 @@ "version": "0.0.1", "description": "Declarative Automation Bundles (DABs) for deploying and managing Databricks resources", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -66,7 +66,7 @@ "version": "0.2.0", "description": "Develop and deploy Lakeflow Jobs on Databricks via DABs, Python SDK, or the CLI \u2014 covers all task types, triggers, notifications, and worked examples", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -82,7 +82,7 @@ "version": "0.1.0", "description": "Databricks Lakebase Postgres: projects, scaling, connectivity, synced tables, and Data API", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -101,7 +101,7 @@ "version": "0.1.0", "description": "Databricks Model Serving endpoint management", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -111,10 +111,10 @@ ] }, "databricks-pipelines": { - "version": "0.1.0", + "version": "0.2.0", "description": "Databricks Spark Declarative Pipelines (SDP) for ETL and streaming", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-24T21:11:46Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -126,6 +126,7 @@ "references/auto-loader-python.md", "references/auto-loader-sql.md", "references/auto-loader.md", + "references/dlt-migration.md", "references/expectations-python.md", "references/expectations-sql.md", "references/expectations.md", @@ -161,7 +162,7 @@ "version": "0.1.0", "description": "Migrate Databricks workloads from classic compute to serverless compute, including compatibility checks and concrete fixes", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -178,7 +179,7 @@ "version": "0.0.1", "description": "Create Agent Bricks: Knowledge Assistants (KA) for document Q&A and Supervisor Agents for multi-agent orchestration (MAS).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:18:49Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-knowledge-assistants.md", "2-supervisor-agents.md", @@ -192,7 +193,7 @@ "version": "0.0.1", "description": "Use Databricks built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_mask, ai_translate, ai_fix_grammar, ai_gen, ai_analyze_sentiment, ai_similarity, ai_parse_document, ai_query, ai_forecast) to add AI capabilities directly to SQL and PySpark pipelines without managing model endpoints. Also covers document parsing and building custom RAG pipelines (parse \u2192 chunk \u2192 index \u2192 query).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-task-functions.md", "2-ai-query.md", @@ -208,7 +209,7 @@ "version": "0.0.1", "description": "Create Databricks AI/BI dashboards. Must use when creating, updating, or deploying Lakeview dashboards as Databricks Dashboard have a unique json structure. CRITICAL: You MUST test ALL SQL queries via CLI BEFORE deploying. Follow guidelines strictly.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-widget-specifications.md", "2-advanced-widget-specifications.md", @@ -225,7 +226,7 @@ "version": "0.0.1", "description": "Builds Databricks applications. Prefers AppKit (TypeScript + React SDK) for new apps; falls back to Python frameworks (Dash, Streamlit, Gradio, Flask, FastAPI, Reflex) when Python is required. Handles OAuth authorization, app resources, SQL warehouse and Lakebase connectivity, model serving, foundation model APIs, and deployment. Use when building web apps, dashboards, ML demos, or REST APIs for Databricks, or when the user mentions AppKit, Streamlit, Dash, Gradio, Flask, FastAPI, Reflex, or Databricks app.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-authorization.md", "2-app-resources.md", @@ -247,7 +248,7 @@ "version": "0.0.1", "description": "Databricks SQL (DBSQL) advanced features and SQL warehouse capabilities. This skill MUST be invoked when the user mentions: \"DBSQL\", \"Databricks SQL\", \"SQL warehouse\", \"SQL scripting\", \"stored procedure\", \"CALL procedure\", \"materialized view\", \"CREATE MATERIALIZED VIEW\", \"pipe syntax\", \"|>\", \"geospatial\", \"H3\", \"ST_\", \"spatial SQL\", \"collation\", \"COLLATE\", \"ai_query\", \"ai_classify\", \"ai_extract\", \"ai_gen\", \"AI function\", \"http_request\", \"remote_query\", \"read_files\", \"Lakehouse Federation\", \"recursive CTE\", \"WITH RECURSIVE\", \"multi-statement transaction\", \"temp table\", \"temporary view\", \"pipe operator\". SHOULD also invoke when the user asks about SQL best practices, data modeling patterns, or advanced SQL features on Databricks.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -264,7 +265,7 @@ "version": "0.0.1", "description": "Databricks documentation reference via llms.txt index. Use when other skills do not cover a topic, looking up unfamiliar Databricks features, or needing authoritative docs on APIs, configurations, or platform capabilities.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -276,7 +277,7 @@ "version": "0.0.1", "description": "Execute code and manage compute on Databricks. Use this skill when the user mentions: \"run code\", \"execute\", \"run on databricks\", \"serverless\", \"no cluster\", \"run python\", \"run scala\", \"run sql\", \"run R\", \"run file\", \"push and run\", \"notebook run\", \"batch script\", \"model training\", \"run script on cluster\", \"create cluster\", \"new cluster\", \"resize cluster\", \"modify cluster\", \"delete cluster\", \"terminate cluster\", \"create warehouse\", \"new warehouse\", \"resize warehouse\", \"delete warehouse\", \"node types\", \"runtime versions\", \"DBR versions\", \"spin up compute\", \"provision cluster\".", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:57:09Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -292,7 +293,7 @@ "version": "0.0.1", "description": "Apache Iceberg tables on Databricks \u2014 Managed Iceberg tables, External Iceberg Reads (fka Uniform), Compatibility Mode, Iceberg REST Catalog (IRC), Iceberg v3, Snowflake interop, PyIceberg, OSS Spark, external engine access and credential vending. Use when creating Iceberg tables, enabling External Iceberg Reads (uniform) on Delta tables (including Streaming Tables and Materialized Views via compatibility mode), configuring external engines to read Databricks tables via Unity Catalog IRC, integrating with Snowflake catalog to read Foreign Iceberg tables", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-managed-iceberg-tables.md", "2-uniform-and-compatibility.md", @@ -309,7 +310,7 @@ "version": "0.0.1", "description": "Unity Catalog metric views: define, create, query, and manage governed business metrics in YAML. Use when building standardized KPIs, revenue metrics, order analytics, or any reusable business metrics that need consistent definitions across teams and tools.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -323,7 +324,7 @@ "version": "0.0.1", "description": "MLflow 3 GenAI agent evaluation. Use when writing mlflow.genai.evaluate() code, creating @scorer functions, using built-in scorers (Guidelines, Correctness, Safety, RetrievalGroundedness), building eval datasets from traces, setting up trace ingestion and production monitoring, aligning judges with MemAlign from domain expert feedback, or running optimize_prompts() with GEPA for automated prompt improvement.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:56:43Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -346,7 +347,7 @@ "version": "0.0.1", "description": "Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -364,7 +365,7 @@ "version": "0.0.1", "description": "Comprehensive guide to Spark Structured Streaming for production workloads. Use when building streaming pipelines, working with Kafka ingestion, implementing Real-Time Mode (RTM), configuring triggers (processingTime, availableNow), handling stateful operations with watermarks, optimizing checkpoints, performing stream-stream or stream-static joins, writing to multiple sinks, or tuning streaming cost and performance.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -385,7 +386,7 @@ "version": "0.0.1", "description": "Generate realistic synthetic data using Spark + Faker (strongly recommended). Supports serverless execution, multiple output formats (Parquet/JSON/CSV/Delta), and scales from thousands to millions of rows. For small datasets (<10K rows), can optionally generate locally and upload to volumes. Use when user mentions 'synthetic data', 'test data', 'generate data', 'demo dataset', 'Faker', or 'sample data'.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -400,7 +401,7 @@ "version": "0.0.1", "description": "Unity Catalog system tables and volumes. Use when querying system tables (audit, lineage, billing) or working with volume file operations (upload, download, list files in /Volumes/).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "5-system-tables.md", "6-volumes.md", @@ -415,7 +416,7 @@ "version": "0.0.1", "description": "Generate PDF documents from HTML and upload to Unity Catalog volumes. Use for creating test PDFs, demo documents, reports, or evaluation datasets.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:56:43Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -428,7 +429,7 @@ "version": "0.0.1", "description": "Patterns for Databricks Vector Search: create endpoints and indexes, query with filters, manage embeddings. Use when building RAG applications, semantic search, or similarity matching. Covers both storage-optimized and standard endpoints.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -444,7 +445,7 @@ "version": "0.0.1", "description": "Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:00Z", "files": [ "1-setup-and-authentication.md", "2-python-client.md", @@ -461,7 +462,7 @@ "version": "0.0.1", "description": "Build custom Python data sources for Apache Spark using the PySpark DataSource API \u2014 batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say \"read from X in Spark\" or \"write DataFrame to Y\" and there's no native connector, this skill applies.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-24T21:11:01Z", "files": [ "SKILL.md", "agents/openai.yaml", diff --git a/skills/databricks-pipelines/SKILL.md b/skills/databricks-pipelines/SKILL.md index d08d0b1..c281e4a 100644 --- a/skills/databricks-pipelines/SKILL.md +++ b/skills/databricks-pipelines/SKILL.md @@ -3,7 +3,7 @@ name: databricks-pipelines description: Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation. compatibility: Requires databricks CLI (>= v0.292.0) metadata: - version: "0.1.0" + version: "0.2.0" parent: databricks-core --- @@ -168,6 +168,10 @@ Some features require reading multiple skills together: Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines. +## Migrating from DLT + +If you have an existing DLT pipeline (`import dlt`, `@dlt.table`, `dlt.read(...)`, `dlt.apply_changes(...)`) and want to move to SDP, see [references/dlt-migration.md](references/dlt-migration.md). It covers both migration paths — DLT Python → SDP Python (`from pyspark import pipelines as dp`) and DLT Python → SDP SQL — with side-by-side conversions for the table decorators, reads, expectations, CDC/SCD, and partitioning → liquid clustering. + ## Scaffolding a New Pipeline Project Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `/` directory: diff --git a/skills/databricks-pipelines/references/dlt-migration.md b/skills/databricks-pipelines/references/dlt-migration.md new file mode 100644 index 0000000..dbde0d9 --- /dev/null +++ b/skills/databricks-pipelines/references/dlt-migration.md @@ -0,0 +1,447 @@ +# Migration Guide: DLT to SDP + +Guide for migrating from Delta Live Tables (DLT) to Spark Declarative Pipelines (SDP). + +**Two migration paths:** +1. **DLT Python → SDP Python** (dlt → dp): Same language, new API +2. **DLT Python → SDP SQL**: Change language for simpler pipelines + +--- + +## Migration Path 1: DLT Python → SDP Python (dlt → dp) + +Use this when staying with Python but moving to the modern `pyspark.pipelines` API. + +### Quick Reference + +| Aspect | Legacy (`dlt`) | Modern (`dp`) | +|--------|---------------|----------------| +| **Import** | `import dlt` | `from pyspark import pipelines as dp` | +| **Table decorator** | `@dlt.table()` | `@dp.table()` | +| **Read table** | `dlt.read("table")` | `spark.read.table("table")` | +| **Read stream** | `dlt.read_stream("table")` | `spark.readStream.table("table")` | +| **CDC/SCD** | `dlt.apply_changes()` | `dp.create_auto_cdc_flow()` | +| **Clustering** | `partition_cols=["date"]` | `cluster_by=["date", "col2"]` | + +### Step-by-Step Migration + +#### Step 1: Update Imports + +```python +# Before +import dlt + +# After +from pyspark import pipelines as dp +``` + +#### Step 2: Update Decorators + +```python +# Before +@dlt.table(name="my_table") + +# After +@dp.table(name="my_table") +``` + +#### Step 3: Update Table Reads + +```python +# Before +@dlt.table(name="silver_events") +def silver_events(): + return dlt.read("bronze_events").filter(...) + +# After +@dp.table(name="silver_events") +def silver_events(): + return spark.read.table("bronze_events").filter(...) +``` + +```python +# Before (streaming) +@dlt.table(name="silver_events") +def silver_events(): + return dlt.read_stream("bronze_events").filter(...) + +# After (streaming) +@dp.table(name="silver_events") +def silver_events(): + return spark.readStream.table("bronze_events").filter(...) +``` + +#### Step 4: Update Expectations + +```python +# Before +@dlt.table(name="silver") +@dlt.expect_or_drop("valid_id", "id IS NOT NULL") + +# After (identical syntax, just change dlt → dp) +@dp.table(name="silver") +@dp.expect_or_drop("valid_id", "id IS NOT NULL") +``` + +#### Step 5: Update CDC/SCD Operations + +```python +# Before +dlt.create_streaming_table("customers_history") +dlt.apply_changes( + target="customers_history", + source="customers_cdc", + keys=["customer_id"], + sequence_by="event_timestamp", + stored_as_scd_type="2" +) + +# After +from pyspark.sql.functions import col + +dp.create_streaming_table("customers_history") +dp.create_auto_cdc_flow( + target="customers_history", + source="customers_cdc", + keys=["customer_id"], + sequence_by=col("event_timestamp"), # Note: use col() + stored_as_scd_type=2 # Note: integer, not string +) +``` + +**Key differences:** +- `apply_changes()` → `create_auto_cdc_flow()` +- `sequence_by` takes a Column object (`col("...")`) not a string +- `stored_as_scd_type` is integer `2` for Type 2, string `"1"` for Type 1 + +#### Step 6: Update Clustering (Partitioning → Liquid Clustering) + +```python +# Before (legacy partitioning) +@dlt.table( + name="bronze_events", + partition_cols=["event_date"], + table_properties={"pipelines.autoOptimize.zOrderCols": "event_type"} +) + +# After (Liquid Clustering) +@dp.table( + name="bronze_events", + cluster_by=["event_date", "event_type"] +) +``` + +### Complete Before/After Example + +**Before (DLT):** +```python +import dlt +from pyspark.sql import functions as F + +@dlt.table(name="bronze_orders", partition_cols=["order_date"]) +def bronze_orders(): + return spark.readStream.format("cloudFiles").load("/data/orders") + +@dlt.table(name="silver_orders") +@dlt.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return dlt.read_stream("bronze_orders").filter(F.col("status") == "completed") + +dlt.create_streaming_table("dim_customers") +dlt.apply_changes( + target="dim_customers", + source="customers_cdc", + keys=["customer_id"], + sequence_by="updated_at", + stored_as_scd_type="2" +) +``` + +**After (SDP):** +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table(name="bronze_orders", cluster_by=["order_date"]) +def bronze_orders(): + return spark.readStream.format("cloudFiles").load("/data/orders") + +@dp.table(name="silver_orders") +@dp.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return spark.readStream.table("bronze_orders").filter(F.col("status") == "completed") + +dp.create_streaming_table("dim_customers") +dp.create_auto_cdc_flow( + target="dim_customers", + source="customers_cdc", + keys=["customer_id"], + sequence_by=F.col("updated_at"), + stored_as_scd_type=2 +) +``` + +--- + +## Migration Path 2: DLT Python → SDP SQL + +Use this when simplifying pipelines by converting to SQL. + +### Decision Matrix + +| Feature/Pattern | DLT Python | SDP SQL | Recommendation | +|-----------------|------------|---------|----------------| +| Simple transformations | ✓ | ✓ | **Migrate to SQL** | +| Aggregations | ✓ | ✓ | **Migrate to SQL** | +| Filtering, WHERE clauses | ✓ | ✓ | **Migrate to SQL** | +| CASE expressions | ✓ | ✓ | **Migrate to SQL** | +| SCD Type 1/2 | ✓ | ✓ | **Migrate to SQL** (AUTO CDC) | +| Simple joins | ✓ | ✓ | **Migrate to SQL** | +| Auto Loader | ✓ | ✓ | **Migrate to SQL** (read_files) | +| Streaming sources (Kafka) | ✓ | ✓ | **Migrate to SQL** (read_kafka) | +| Complex Python UDFs | ✓ | ❌ | **Stay in Python** | +| External API calls | ✓ | ❌ | **Stay in Python** | +| Custom libraries | ✓ | ❌ | **Stay in Python** | +| ML model inference | ✓ | ❌ | **Stay in Python** | + +**Rule**: If 80%+ is SQL-expressible, migrate to SDP SQL. If heavy Python logic, stay with Python (use modern `dp` API). + +### Side-by-Side Conversions + +#### Basic Streaming Table + +**DLT Python:** +```python +@dlt.table(name="bronze_sales", comment="Raw sales") +def bronze_sales(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/my_catalog/my_schema/raw/sales") + .withColumn("_ingested_at", F.current_timestamp()) + ) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE bronze_sales +COMMENT 'Raw sales' +AS +SELECT *, current_timestamp() AS _ingested_at +FROM STREAM read_files('/Volumes/my_catalog/my_schema/raw/sales', format => 'json'); +``` + +#### Filtering and Transformations + +**DLT Python:** +```python +@dlt.table(name="silver_sales") +@dlt.expect_or_drop("valid_amount", "amount > 0") +@dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL") +def silver_sales(): + return ( + dlt.read_stream("bronze_sales") + .withColumn("sale_date", F.to_date("sale_date")) + .withColumn("amount", F.col("amount").cast("decimal(10,2)")) + .select("sale_id", "customer_id", "amount", "sale_date") + ) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE silver_sales AS +SELECT + sale_id, customer_id, + CAST(amount AS DECIMAL(10,2)) AS amount, + CAST(sale_date AS DATE) AS sale_date +FROM STREAM bronze_sales +WHERE amount > 0 AND sale_id IS NOT NULL; +``` + +#### SCD Type 2 + +**DLT Python:** +```python +dlt.create_streaming_table("customers_history") + +dlt.apply_changes( + target="customers_history", + source="customers_cdc_clean", + keys=["customer_id"], + sequence_by="event_timestamp", + stored_as_scd_type="2", + track_history_column_list=["*"] +) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE customers_history; + +CREATE FLOW customers_scd2_flow AS +AUTO CDC INTO customers_history +FROM stream(customers_cdc_clean) +KEYS (customer_id) +APPLY AS DELETE WHEN operation = "DELETE" +SEQUENCE BY event_timestamp +COLUMNS * EXCEPT (operation, _ingested_at, _source_file) +STORED AS SCD TYPE 2; +``` + +**Note:** In SQL, put `APPLY AS DELETE WHEN` before `SEQUENCE BY`. Only list columns in `COLUMNS * EXCEPT (...)` that exist in the source. + +#### Joins + +**DLT Python:** +```python +@dlt.table(name="silver_sales_enriched") +def silver_sales_enriched(): + sales = dlt.read_stream("silver_sales") + products = dlt.read("dim_products") + return sales.join(products, "product_id", "left") +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS +SELECT s.*, p.product_name, p.category +FROM STREAM silver_sales s +LEFT JOIN dim_products p ON s.product_id = p.product_id; +``` + +### Handling Expectations + +**DLT Python:** +```python +@dlt.expect_or_drop("valid_amount", "amount > 0") +@dlt.expect_or_fail("critical_id", "id IS NOT NULL") +``` + +**SDP SQL - Basic** (equivalent to expect_or_drop): +```sql +WHERE amount > 0 AND id IS NOT NULL +``` + +**SDP SQL - Quarantine Pattern** (for auditing dropped records): +```sql +-- Flag invalid records +CREATE OR REFRESH STREAMING TABLE bronze_data_flagged AS +SELECT *, + CASE WHEN amount <= 0 OR id IS NULL THEN TRUE ELSE FALSE END AS is_invalid +FROM STREAM bronze_data; + +-- Clean for downstream +CREATE OR REFRESH STREAMING TABLE silver_data_clean AS +SELECT * FROM STREAM bronze_data_flagged WHERE NOT is_invalid; + +-- Quarantine for investigation +CREATE OR REFRESH STREAMING TABLE silver_data_quarantine AS +SELECT * FROM STREAM bronze_data_flagged WHERE is_invalid; +``` + +### Handling UDFs + +#### Simple UDFs → SQL CASE + +**DLT Python:** +```python +@F.udf(returnType=StringType()) +def categorize_amount(amount): + if amount > 1000: return "High" + elif amount > 100: return "Medium" + else: return "Low" + +@dlt.table(name="sales_categorized") +def sales_categorized(): + return dlt.read("sales").withColumn("category", categorize_amount(F.col("amount"))) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_categorized AS +SELECT *, + CASE + WHEN amount > 1000 THEN 'High' + WHEN amount > 100 THEN 'Medium' + ELSE 'Low' + END AS category +FROM sales; +``` + +#### Complex UDFs → Stay in Python + +Keep in Python if: +- Complex conditional logic +- External API calls +- Custom algorithms +- ML inference + +Use modern `dp` API instead of `dlt`. + +--- + +## Migration Process + +### Step 1: Inventory + +Document: +- Number of tables/views +- Python UDFs (simple vs complex) +- External dependencies +- Expectations and quality rules + +### Step 2: Choose Path + +- **80%+ SQL-expressible** → Migrate to SDP SQL +- **Heavy Python logic** → Migrate to SDP Python (`dp` API) +- **Mixed** → Hybrid (SQL for most, Python for complex) + +### Step 3: Migrate by Layer + +1. **Bronze** (ingestion): `cloudFiles` → `read_files()` or keep `cloudFiles` with `dp` +2. **Silver** (cleansing): `dlt.expect*` → WHERE clause or `dp.expect*` +3. **Gold** (aggregations): Usually straightforward +4. **SCD/CDC**: `apply_changes` → AUTO CDC or `create_auto_cdc_flow` + +### Step 4: Test + +- Run both pipelines in parallel +- Compare outputs for correctness +- Validate performance +- Check quality metrics + +--- + +## When NOT to Migrate + +**Stay with current approach if:** +1. Pipeline works well and team is comfortable +2. Heavy Python UDF usage (>30% of logic) +3. External API calls required +4. Custom ML model inference +5. Complex stateful operations not expressible in SQL +6. Limited time/resources for migration + +**Key**: DLT and SDP are both fully supported. Migrate for simplicity or new features, not necessity. + +--- + +## Common Issues + +| Issue | Solution | +|-------|----------| +| `sequence_by` type error | Use `col("column")` not string in `dp.create_auto_cdc_flow()` | +| UDF doesn't translate | Keep in Python or refactor with SQL built-ins | +| Expectations differ | Use quarantine pattern to audit dropped records | +| Performance degradation | Use `CLUSTER BY` for Liquid Clustering | +| Schema evolution different | Use `mode => 'PERMISSIVE'` in `read_files()` | +| AUTO CDC parse error | Put `APPLY AS DELETE WHEN` before `SEQUENCE BY` | + +--- + +## Related Documentation + +- **[python/1-syntax-basics.md](python/1-syntax-basics.md)** - Modern `dp` API reference +- **[python/4-cdc-patterns.md](python/4-cdc-patterns.md)** - Python CDC patterns +- **[sql/4-cdc-patterns.md](sql/4-cdc-patterns.md)** - SQL CDC patterns +- **[SKILL.md](../SKILL.md)** - Main skill entry point From 532b37cfaecd3f8eb74fbbf471af2569cf623d75 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 26 May 2026 09:38:09 +0000 Subject: [PATCH 2/3] skills(pipelines): port workflows, configuration, performance from a-d-k MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the a-d-k → d-a-s port for databricks-spark-declarative-pipelines. Adds three new references that fill the dev-side gaps that stable's per-feature × per-language reference files don't cover: - references/workflows.md — Workflow A/B/C chooser (standalone bundle via `databricks pipelines init`, pipeline-in-existing-bundle, rapid CLI iteration with no bundle); language selection rules; start-update + poll-the-update pattern with the "never poll top-level pipeline state" rationale; edit/ re-upload/restart flow. - references/pipeline-configuration.md — Full JSON config reference for `pipelines create|update` (top-level fields, clusters, event_log, notifications, configuration, run_as, restart_window, environment, deployment); variant snippets (dev mode, non-serverless, continuous, notifications, autoscaling, custom event log, serverless Python deps); multi-schema patterns; platform constraints. - references/performance.md — Liquid Clustering with per-layer key guidance (bronze/silver/gold), cluster-key type rules, table properties, state management strategies for streaming, join optimization, query optimization, pre-aggregation, compute config, monitoring. SKILL.md updates: - New "Choose Your Workflow" and "Language Selection" sections. - Scaffolding section documents both `databricks pipelines init` and `databricks bundle init lakeflow-pipelines`. - Pipeline API Reference list reorganized into Project & Lifecycle and Datasets, Flows & Quality groups. - Version bumped to 0.3.0. Deliberately dropped from a-d-k's databricks-spark-declarative-pipelines: - 2-mcp-approach.md (a-d-k experimental already replaced with 2-cli-approach.md — MCP tool refs removed per PR #73 policy). - python/{1..4}-*.md and sql/{1..4}-*.md (covered by stable's existing per- feature × per-language refs: python-basics, sql-basics, auto-loader-*, auto-cdc-*, streaming-table-*, sink-*, foreach-batch-sink-*, etc.). - scripts/exploration_notebook.py (stable convention has no scripts/; users use the CLI directly or the explorations/ folder generated by `pipelines init`). Source: databricks-solutions/ai-dev-kit@experimental. Co-authored-by: Isaac --- manifest.json | 9 +- skills/databricks-pipelines/SKILL.md | 61 ++- .../references/performance.md | 490 ++++++++++++++++++ .../references/pipeline-configuration.md | 351 +++++++++++++ .../references/workflows.md | 357 +++++++++++++ 5 files changed, 1262 insertions(+), 6 deletions(-) create mode 100644 skills/databricks-pipelines/references/performance.md create mode 100644 skills/databricks-pipelines/references/pipeline-configuration.md create mode 100644 skills/databricks-pipelines/references/workflows.md diff --git a/manifest.json b/manifest.json index 2489156..735c046 100644 --- a/manifest.json +++ b/manifest.json @@ -1,6 +1,6 @@ { "version": "2", - "updated_at": "2026-05-24T21:11:52Z", + "updated_at": "2026-05-26T09:37:29Z", "skills": { "databricks-apps": { "version": "0.1.2", @@ -111,10 +111,10 @@ ] }, "databricks-pipelines": { - "version": "0.2.0", + "version": "0.3.0", "description": "Databricks Spark Declarative Pipelines (SDP) for ETL and streaming", "repo_dir": "skills", - "updated_at": "2026-05-24T21:11:46Z", + "updated_at": "2026-05-26T09:37:19Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -142,6 +142,8 @@ "references/options-parquet.md", "references/options-text.md", "references/options-xml.md", + "references/performance.md", + "references/pipeline-configuration.md", "references/python-basics.md", "references/sink-python.md", "references/sink.md", @@ -154,6 +156,7 @@ "references/temporary-view.md", "references/view-sql.md", "references/view.md", + "references/workflows.md", "references/write-spark-declarative-pipelines.md" ], "base_revision": "5c4b4fb0a82a" diff --git a/skills/databricks-pipelines/SKILL.md b/skills/databricks-pipelines/SKILL.md index c281e4a..4e36906 100644 --- a/skills/databricks-pipelines/SKILL.md +++ b/skills/databricks-pipelines/SKILL.md @@ -3,7 +3,7 @@ name: databricks-pipelines description: Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation. compatibility: Requires databricks CLI (>= v0.292.0) metadata: - version: "0.2.0" + version: "0.3.0" parent: databricks-core --- @@ -172,19 +172,65 @@ Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a fra If you have an existing DLT pipeline (`import dlt`, `@dlt.table`, `dlt.read(...)`, `dlt.apply_changes(...)`) and want to move to SDP, see [references/dlt-migration.md](references/dlt-migration.md). It covers both migration paths — DLT Python → SDP Python (`from pyspark import pipelines as dp`) and DLT Python → SDP SQL — with side-by-side conversions for the table decorators, reads, expectations, CDC/SCD, and partitioning → liquid clustering. +## Choose Your Workflow + +Three project shapes exist — pick before scaffolding: + +| Situation | Workflow | +|-----------|----------| +| New standalone pipeline project with its own bundle | **A. Standalone bundle** | +| Pipeline added to an existing DAB project | **B. Existing bundle** | +| Quick prototyping, no bundle (yet) | **C. Rapid CLI iteration** | + +Default to A for production-bound work and C for exploration. Full details, generated structures, polling patterns, and edit/re-upload flow in [references/workflows.md](references/workflows.md). + +## Language Selection (Python vs SQL) + +Decide before scaffolding — the choice picks template files (`.py` vs `.sql`) and which reference docs apply. Both can coexist, but pick a primary. + +| User signal | Pick | +|-------------|------| +| "Python pipeline", UDF, pandas, ML inference, pyspark | **Python** | +| "SQL pipeline", "SQL files" | **SQL** | +| "Simple pipeline", "create a table", "an aggregation" | **SQL** (simpler) | +| Complex parameterized logic, custom UDFs, ML | **Python** | + +If ambiguous, ask. Stick with the chosen language unless the user explicitly switches. + ## Scaffolding a New Pipeline Project -Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `/` directory: +The newer `databricks pipelines init` is focused on pipeline projects: + +```bash +databricks pipelines init --output-dir . --config-file init-config.json +``` + +`init-config.json`: + +```json +{ + "project_name": "my_pipeline", + "initial_catalog": "prod_catalog", + "use_personal_schema": "no", + "initial_language": "sql" +} +``` + +The template-based `databricks bundle init lakeflow-pipelines` also works: ```bash databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile < /dev/null ``` +Field constraints: + - `project_name`: letters, numbers, underscores only -- `language`: `python` or `sql`. Ask the user which they prefer: +- `language` / `initial_language`: `python` or `sql` (lowercase) - SQL: Recommended for straightforward transformations (filters, joins, aggregations) - Python: Recommended for complex logic (custom UDFs, ML, advanced processing) +See [references/workflows.md](references/workflows.md) for the full generated structure, `databricks.yml` essentials, and per-target catalog/schema patterns. + After scaffolding, create `CLAUDE.md` and `AGENTS.md` in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content: ``` @@ -263,6 +309,15 @@ resources: Detailed reference guides for each pipeline API. **Read the relevant guide before writing pipeline code.** +### Project & Lifecycle + +- [Workflows](references/workflows.md) — Standalone bundle / existing bundle / rapid CLI iteration; language selection; `pipelines init`; start-update + poll-the-update pattern; edit/re-upload/restart flow +- [Pipeline Configuration](references/pipeline-configuration.md) — Full JSON config reference (top-level, clusters, event_log, notifications, configuration, restart_window, environment) + variant snippets (dev mode, non-serverless, continuous, notifications, autoscaling, custom event log, serverless Python deps) + multi-schema patterns + platform constraints +- [Performance Tuning](references/performance.md) — Liquid Clustering by layer (bronze/silver/gold), key-type rules, state-management strategies for streaming, join optimization, pre-aggregation, monitoring +- [Migrating from DLT](references/dlt-migration.md) — Side-by-side conversions (decorators, reads, expectations, CDC/SCD, partitioning → liquid clustering) + +### Datasets, Flows & Quality + - [Write Spark Declarative Pipelines](references/write-spark-declarative-pipelines.md) — Core syntax and rules ([Python](references/python-basics.md), [SQL](references/sql-basics.md)) - [Streaming Tables](references/streaming-table.md) — Continuous data stream processing ([Python](references/streaming-table-python.md), [SQL](references/streaming-table-sql.md)) - [Materialized Views](references/materialized-view.md) — Physically stored query results with incremental refresh ([Python](references/materialized-view-python.md), [SQL](references/materialized-view-sql.md)) diff --git a/skills/databricks-pipelines/references/performance.md b/skills/databricks-pipelines/references/performance.md new file mode 100644 index 0000000..cf82997 --- /dev/null +++ b/skills/databricks-pipelines/references/performance.md @@ -0,0 +1,490 @@ +# Performance Tuning + +Performance patterns for Spark Declarative Pipelines: Liquid Clustering, state management for streaming, join strategy, query optimization, and pre-aggregation. Examples are shown in both SQL and Python. + +--- + +## Liquid Clustering + +**Recommended** for data layout. Replaces `PARTITION BY` + `ZORDER`. Adaptive, multi-dimensional, self-optimizing — no more manual `OPTIMIZE`. + +### Basic syntax + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_events +CLUSTER BY (event_type, event_date) +AS +SELECT *, current_timestamp() AS _ingested_at +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); +``` + +```python +@dp.table(cluster_by=["event_type", "event_date"]) +def bronze_events(): + return spark.readStream.format("cloudFiles").load("/Volumes/cat/sch/raw/events/") +``` + +### Automatic key selection + +```sql +CLUSTER BY (AUTO) +``` + +```python +cluster_by=["AUTO"] +``` + +Use `AUTO` while learning the workload, prototyping, or when access patterns are unclear. Pick keys manually for production once query patterns are stable. + +### Cluster key data types + +**Cluster keys must be numeric, string, date, or timestamp.** `BOOLEAN`, `ARRAY`, `MAP`, `STRUCT`, `BINARY` are rejected at runtime with `DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED`. Low-cardinality flags also don't benefit from clustering — leave them out. + +### Cluster key selection by layer + +| Layer | Good keys | Rationale | +|-------|-----------|-----------| +| **Bronze** | `event_type`, `ingestion_date` | Filter by type for processing, by date for incremental loads. | +| **Silver** | `primary_key`, `business_date` | Entity lookups + time-range queries. | +| **Gold** | aggregation dimensions | Dashboard filters. | + +**Rules of thumb**: +- First key: most-selective filter (e.g. `customer_id`). +- Second key: next-most-common filter (e.g. date). +- Order matters. Most-selective first. +- Limit to **4 keys** — diminishing returns beyond that. +- Use `AUTO` if unsure. + +### Bronze example + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_events +CLUSTER BY (event_type, ingestion_date) +TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true') +AS +SELECT *, + current_timestamp() AS _ingested_at, + CAST(current_date() AS DATE) AS ingestion_date +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); +``` + +```python +@dp.table( + name="bronze_events", + cluster_by=["event_type", "ingestion_date"], + table_properties={"delta.autoOptimize.optimizeWrite": "true"}, +) +def bronze_events(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/cat/sch/raw/events/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("ingestion_date", F.current_date()) + ) +``` + +### Silver example (clustering for joins + time filters) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_orders +CLUSTER BY (customer_id, order_date) +AS +SELECT order_id, customer_id, product_id, + CAST(amount AS DECIMAL(10,2)) AS amount, -- DECIMAL for monetary + CAST(order_timestamp AS DATE) AS order_date, + order_timestamp +FROM STREAM bronze_orders; +``` + +```python +@dp.table(name="silver_orders", cluster_by=["customer_id", "order_date"]) +def silver_orders(): + return ( + spark.readStream.table("bronze_orders") + .withColumn("order_date", F.to_date("order_timestamp")) + .select("order_id", "customer_id", "product_id", "amount", "order_date") + ) +``` + +### Gold example (clustering on aggregation dimensions) + +```sql +CREATE OR REFRESH MATERIALIZED VIEW gold_sales_summary +CLUSTER BY (product_category, year_month) +AS +SELECT product_category, + DATE_FORMAT(order_date, 'yyyy-MM') AS year_month, + SUM(amount) AS total_sales, + COUNT(*) AS transaction_count, + AVG(amount) AS avg_order_value +FROM silver_orders +GROUP BY product_category, DATE_FORMAT(order_date, 'yyyy-MM'); +``` + +```python +@dp.materialized_view(name="gold_sales_summary", cluster_by=["product_category", "year_month"]) +def gold_sales_summary(): + return ( + spark.read.table("silver_orders") + .withColumn("year_month", F.date_format("order_date", "yyyy-MM")) + .groupBy("product_category", "year_month") + .agg(F.sum("amount").alias("total_sales"), + F.count("*").alias("transaction_count"), + F.avg("amount").alias("avg_order_value")) + ) +``` + +### Migrating from `PARTITION BY` + `ZORDER` + +Before (legacy): + +```sql +CREATE OR REFRESH STREAMING TABLE events +PARTITIONED BY (date DATE) +TBLPROPERTIES ('pipelines.autoOptimize.zOrderCols' = 'user_id,event_type') +AS SELECT ...; +``` + +After: + +```sql +CREATE OR REFRESH STREAMING TABLE events +CLUSTER BY (date, user_id, event_type) +AS SELECT ...; +``` + +Typical wins: 20–50% query improvement, no small-file problem, automatic optimization, no manual `OPTIMIZE` job. + +**Keep `PARTITION BY` only for**: regulatory requirements (physical separation), data-lifecycle (need to `DROP` partitions for retention), DBR < 13.3 compatibility, or existing huge tables where migration cost > benefit. + +--- + +## Table Properties + +### Auto-optimize + +```sql +TBLPROPERTIES ( + 'delta.autoOptimize.optimizeWrite' = 'true', + 'delta.autoOptimize.autoCompact' = 'true' +) +``` + +```python +table_properties={ + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true", +} +``` + +### Change Data Feed + +```sql +TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true') +``` + +Enable when downstream systems need efficient change tracking. + +### Retention (high-volume tables) + +```sql +TBLPROPERTIES ( + 'delta.logRetentionDuration' = '7 days', + 'delta.deletedFileRetentionDuration' = '7 days' +) +``` + +Use for high-volume tables to reduce storage cost. Be careful: short retention windows break time-travel queries beyond the window. + +--- + +## Materialized View Refresh + +```sql +-- Near-real-time +CREATE OR REFRESH MATERIALIZED VIEW gold_live_metrics +REFRESH EVERY 5 MINUTES +AS SELECT metric_name, AVG(metric_value) AS avg_value, MAX(last_updated) AS freshness + FROM silver_metrics GROUP BY metric_name; + +-- Daily +CREATE OR REFRESH MATERIALIZED VIEW gold_daily_summary +REFRESH EVERY 1 DAY +AS SELECT report_date, SUM(amount) AS total_amount + FROM silver_sales GROUP BY report_date; +``` + +### Incremental refresh + +MVs use incremental refresh automatically when possible. Requirements: +- Source has Delta row tracking enabled. +- No row-level filters. +- Aggregation/expression pattern is supported. +- **Serverless pipeline.** Incremental refresh for aggregations is a serverless feature. + +--- + +## State Management for Streaming + +Higher cardinality → more state. Watch the combinations in `GROUP BY`. + +```sql +-- High state: every unique combination creates state +SELECT user_id, product_id, session_id, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, product_id, session_id; -- 1M × 10K × 100M — massive +``` + +### Strategy 1: reduce cardinality + +```sql +-- 100 categories instead of 10K products +SELECT user_id, product_category, DATE(event_time) AS event_date, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, product_category, DATE(event_time); +``` + +```python +@dp.table(name="user_category_stats") +def user_category_stats(): + return ( + spark.readStream.table("bronze_events") + .groupBy("user_id", "product_category", + F.to_date("event_time").alias("event_date")) + .agg(F.count("*").alias("events")) + ) +``` + +### Strategy 2: use time windows + +```sql +SELECT user_id, window(event_time, '1 hour') AS time_window, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, window(event_time, '1 hour'); +``` + +```python +@dp.table(name="user_hourly_stats") +def user_hourly_stats(): + return ( + spark.readStream.table("bronze_events") + .groupBy("user_id", F.window("event_time", "1 hour")) + .agg(F.count("*").alias("events")) + ) +``` + +### Strategy 3: materialize intermediates (move state to batch) + +```sql +-- Streaming aggregation (maintains state) +CREATE OR REFRESH STREAMING TABLE user_daily_stats AS +SELECT user_id, DATE(event_time) AS event_date, COUNT(*) AS event_count +FROM STREAM bronze_events +GROUP BY user_id, DATE(event_time); + +-- Batch aggregation on top (no streaming state) +CREATE OR REFRESH MATERIALIZED VIEW user_monthly_stats AS +SELECT user_id, DATE_TRUNC('month', event_date) AS month, SUM(event_count) AS total_events +FROM user_daily_stats +GROUP BY user_id, DATE_TRUNC('month', event_date); +``` + +--- + +## Join Optimization + +### Stream-to-static (efficient) + +```sql +-- Small static dimension joined to large streaming fact +CREATE OR REFRESH STREAMING TABLE sales_enriched AS +SELECT s.sale_id, s.product_id, s.amount, p.product_name, p.category +FROM STREAM bronze_sales s +LEFT JOIN dim_products p ON s.product_id = p.product_id; +``` + +```python +@dp.table(name="sales_enriched") +def sales_enriched(): + sales = spark.readStream.table("bronze_sales") + products = spark.read.table("dim_products") # static, broadcastable + return sales.join(products, "product_id", "left") \ + .select("sale_id", "product_id", "amount", "product_name", "category") +``` + +**Rule**: keep static dimensions small (< 10K rows) so they broadcast. + +### Stream-to-stream (stateful, time-bounded) + +```sql +-- Time bounds limit state retention +CREATE OR REFRESH STREAMING TABLE orders_with_payments AS +SELECT o.order_id, o.amount AS order_amount, p.payment_id, p.amount AS payment_amount +FROM STREAM bronze_orders o +INNER JOIN STREAM bronze_payments p + ON o.order_id = p.order_id + AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL 1 HOUR; +``` + +```python +@dp.table(name="orders_with_payments") +def orders_with_payments(): + orders = spark.readStream.table("bronze_orders") + payments = spark.readStream.table("bronze_payments") + return orders.join( + payments, + (orders.order_id == payments.order_id) & + (payments.payment_time >= orders.order_time) & + (payments.payment_time <= orders.order_time + F.expr("INTERVAL 1 HOUR")), + "inner", + ) +``` + +Without time bounds, stream-to-stream state grows unbounded. + +--- + +## Query Optimization + +### Filter early + +```sql +-- Filter at source +CREATE OR REFRESH STREAMING TABLE silver_recent AS +SELECT * +FROM STREAM bronze_events +WHERE event_date >= CURRENT_DATE() - INTERVAL 7 DAYS; +``` + +```python +@dp.table(name="silver_recent") +def silver_recent(): + return (spark.readStream.table("bronze_events") + .filter(F.col("event_date") >= F.current_date() - 7)) +``` + +Pushing filters into the streaming read keeps downstream MV inputs small. The anti-pattern is wide-open silver tables filtered later in gold MVs — every row is processed twice. + +### Select specific columns + +Skip `SELECT *` once schema is stable. Narrowed projections enable column pruning in Delta and shrink wire/state size for stateful operations. + +--- + +## Pre-Aggregation + +When the same coarse aggregation is queried frequently, materialize it. Querying the MV is far cheaper than re-aggregating the underlying table. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW orders_monthly AS +SELECT customer_id, YEAR(order_date) AS year, MONTH(order_date) AS month, + SUM(amount) AS total +FROM large_orders_table +GROUP BY customer_id, YEAR(order_date), MONTH(order_date); + +-- Query the MV directly +SELECT * FROM orders_monthly WHERE year = 2024; +``` + +```python +@dp.materialized_view(name="orders_monthly") +def orders_monthly(): + return (spark.read.table("large_orders_table") + .groupBy("customer_id", + F.year("order_date").alias("year"), + F.month("order_date").alias("month")) + .agg(F.sum("amount").alias("total"))) +``` + +--- + +## Compute Configuration + +| Aspect | Serverless | Classic | +|--------|-----------|---------| +| Startup | Seconds | Minutes | +| Scaling | Automatic, instant | Manual / autoscale | +| Cost | Pay-per-use | Pay for cluster time | +| Best for | Variable / dev / test / most prod | Steady, very long-running workloads with special requirements | + +**Default to serverless.** Switch to classic only when R, Spark RDD APIs, JAR/Maven libraries, or other serverless-incompatible features are required — see [pipeline-configuration.md](pipeline-configuration.md#serverless-limitations-force-classic-clusters). + +--- + +## Monitoring Freshness + +```sql +SELECT table_name, + MAX(event_timestamp) AS latest_event, + CURRENT_TIMESTAMP() AS now, + TIMESTAMPDIFF(MINUTE, MAX(event_timestamp), CURRENT_TIMESTAMP()) AS lag_minutes +FROM pipeline_monitoring.table_metrics +GROUP BY table_name; +``` + +Watch for: + +1. Slow streaming tables (high processing lag). +2. Large state operations (high memory). +3. Expensive joins (long batch durations). +4. Small-file accumulation (raise auto-optimize, check write patterns). + +--- + +## Complete Example (Python) + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table( + name="bronze_orders", + cluster_by=["order_date"], + table_properties={ + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true", + }, +) +def bronze_orders(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/cat/sch/raw/orders/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("order_date", F.to_date("order_timestamp")) + ) + +@dp.table(name="silver_orders", cluster_by=["customer_id", "order_date"]) +@dp.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return ( + spark.readStream.table("bronze_orders") + .filter(F.col("order_date") >= F.current_date() - 90) # filter early + .withColumn("amount", F.col("amount").cast("decimal(10,2)")) + .select("order_id", "customer_id", "amount", "order_date") + ) + +@dp.materialized_view(name="gold_daily_revenue", cluster_by=["order_date"]) +def gold_daily_revenue(): + return ( + spark.read.table("silver_orders") + .groupBy("order_date") + .agg(F.sum("amount").alias("total_revenue"), + F.count("order_id").alias("order_count"), + F.countDistinct("customer_id").alias("unique_customers")) + ) +``` + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| Pipeline running slowly | Check clustering keys, state size, join patterns. | +| High memory usage | Unbounded state — add time windows, reduce cardinality. | +| Many small files | Enable auto-optimize table properties. | +| Expensive queries on large tables | Add clustering on filter columns, build pre-aggregated MVs. | +| MV refresh slow | Enable row tracking on source, verify the refresh is actually incremental. | +| `DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED` | A cluster key has an unsupported type (BOOLEAN / complex). Replace with a numeric / string / date / timestamp column. | diff --git a/skills/databricks-pipelines/references/pipeline-configuration.md b/skills/databricks-pipelines/references/pipeline-configuration.md new file mode 100644 index 0000000..f102e57 --- /dev/null +++ b/skills/databricks-pipelines/references/pipeline-configuration.md @@ -0,0 +1,351 @@ +# Pipeline Configuration + +JSON field reference for `databricks pipelines create --json '{...}'` and `databricks pipelines update --json '{...}'`, plus variant snippets for common configurations. + +Defaults to **serverless + Unity Catalog**. Don't set `serverless: false` unless the user explicitly needs R, Spark RDD APIs, or JAR / Maven libraries. + +## Canonical Create + +```bash +databricks pipelines create --json '{ + "name": "my_pipeline", + "catalog": "my_catalog", + "schema": "my_schema", + "serverless": true, + "continuous": false, + "channel": "PREVIEW", + "libraries": [{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}] +}' +``` + +**Always pass `"continuous": false` explicitly.** A continuous pipeline auto-restarts failed updates forever (`cause: RETRY_ON_FAILURE`), burning serverless cost and trapping polling loops. Only set `true` when the user explicitly asks for an always-on streaming pipeline. + +The variant blocks below show only the **deltas** to add or change — don't re-paste the whole JSON. + +--- + +## Top-Level Fields + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `serverless` | bool | `true` | Serverless compute. `false` requires `clusters`. | +| `continuous` | bool | `false` | `true` = always running. `false` = triggered runs. | +| `development` | bool | `false` | Dev mode: faster startup, relaxed validation, no retries. | +| `photon` | bool | `false` | Photon vectorized engine. | +| `edition` | str | `"CORE"` | `"CORE"`, `"PRO"`, `"ADVANCED"`. CDC requires `"ADVANCED"`. | +| `channel` | str | `"CURRENT"` | `"CURRENT"` (stable) or `"PREVIEW"` (latest features). | +| `clusters` | list | `[]` | Cluster configs. Required if `serverless: false`. | +| `configuration` | dict | `{}` | Spark/pipeline config key-value (all values strings). | +| `tags` | dict | `{}` | Metadata tags (max 25). | +| `event_log` | dict | auto | Custom event log table location. | +| `notifications` | list | `[]` | Email/webhook alerts. | +| `allow_duplicate_names` | bool | `false` | Allow multiple pipelines with the same name. | +| `budget_policy_id` | str | — | Budget policy for cost tracking. | +| `storage` | str | — | DBFS root (legacy — use Unity Catalog). | +| `target` | str | — | **Deprecated** — use `schema`. | +| `dry_run` | bool | `false` | Validate without creating (create only). | +| `run_as` | dict | — | Run as specific user / service principal. | +| `restart_window` | dict | — | Maintenance window for continuous-pipeline restarts. | +| `filters` | dict | — | Include/exclude specific paths. | +| `trigger` | dict | — | **Deprecated** — use `continuous`. | +| `deployment` | dict | — | `BUNDLE` (DABs) vs `DEFAULT`. | +| `environment` | dict | — | Python pip deps for serverless. | +| `gateway_definition` | dict | — | CDC gateway pipeline config. | +| `ingestion_definition` | dict | — | Managed ingestion (Salesforce, Workday, etc.). | +| `usage_policy_id` | str | — | Usage policy. | + +### Edition Comparison + +| Feature | CORE | PRO | ADVANCED | +|---------|------|-----|----------| +| Streaming tables | ✓ | ✓ | ✓ | +| Materialized views | ✓ | ✓ | ✓ | +| Expectations | ✓ | ✓ | ✓ | +| CDC | — | — | ✓ | +| SCD Type 1/2 | — | — | ✓ | + +--- + +## `clusters[]` — Classic Cluster Config + +Required when `serverless: false`. Each cluster object: + +| Field | Type | Description | +|-------|------|-------------| +| `label` | str | **Required**. `"default"` (main) or `"maintenance"`. | +| `num_workers` | int | Fixed workers (mutually exclusive with `autoscale`). | +| `autoscale` | dict | `{"min_workers": N, "max_workers": N, "mode": "ENHANCED"}` — `"ENHANCED"` recommended. | +| `node_type_id` | str | Instance type (e.g. `"i3.xlarge"`). | +| `driver_node_type_id` | str | Defaults to `node_type_id`. | +| `instance_pool_id` | str | Faster startup via pool. | +| `driver_instance_pool_id` | str | Pool for driver. | +| `spark_conf` | dict | Per-cluster Spark config. | +| `spark_env_vars` | dict | Env vars. | +| `custom_tags` | dict | Cloud resource tags. | +| `init_scripts` | list | Init scripts. | +| `aws_attributes` | dict | e.g. `{"availability": "SPOT", "zone_id": "us-west-2a"}`. | +| `azure_attributes` | dict | e.g. `{"availability": "SPOT_AZURE"}`. | +| `gcp_attributes` | dict | GCP-specific. | + +--- + +## `event_log` — Custom Event Log Table + +| Field | Description | +|-------|-------------| +| `catalog` | UC catalog for the event log table. | +| `schema` | Schema for the event log table. | +| `name` | Table name. | + +--- + +## `notifications[]` — Alerts + +| Field | Description | +|-------|-------------| +| `email_recipients` | List of email addresses. | +| `alerts` | `"on-update-success"`, `"on-update-failure"`, `"on-update-fatal-failure"`, `"on-flow-failure"`. | + +--- + +## `configuration` — Spark / Pipeline Config + +All values must be strings. + +| Key | Description | +|-----|-------------| +| `spark.sql.shuffle.partitions` | Number of shuffle partitions. `"auto"` recommended. | +| `pipelines.numRetries` | Retries on transient failures. | +| `pipelines.trigger.interval` | Trigger interval for continuous pipelines (e.g. `"1 hour"`). | +| `spark.databricks.delta.preview.enabled` | Enable Delta preview features (`"true"`). | + +Any key here is also accessible from pipeline code via `spark.conf.get("key")` — use this to parameterize transformations. + +--- + +## `run_as` — Execution Identity + +Only one of these: + +| Field | Description | +|-------|-------------| +| `user_name` | Email of workspace user (can only set to your own). | +| `service_principal_name` | Application ID (requires `servicePrincipal/user` role). | + +--- + +## `restart_window` — Continuous-Pipeline Restart Window + +For continuous pipelines, the 5-hour window when daily restarts may occur: + +| Field | Description | +|-------|-------------| +| `start_hour` | **Required**. Hour 0–23 when window begins. | +| `days_of_week` | `"MONDAY"`, `"TUESDAY"`, … (default: all). | +| `time_zone_id` | e.g. `"America/Los_Angeles"` (default UTC). | + +--- + +## `filters` — Path Filtering + +| Field | Description | +|-------|-------------| +| `include` | Paths to include. | +| `exclude` | Paths to exclude. | + +--- + +## `environment` — Serverless Python Deps + +| Field | Description | +|-------|-------------| +| `dependencies` | List of pip requirements, e.g. `["pandas==2.0.0", "requests"]`. | + +--- + +## `deployment` — Deployment Method + +| Field | Description | +|-------|-------------| +| `kind` | `"BUNDLE"` (DABs) or `"DEFAULT"`. | +| `metadata_file_path` | Path to deployment metadata. | + +--- + +## Variant Snippets + +Each block shows what to add to (or replace in) the canonical create JSON. + +### Development mode + +```json +"development": true, +"tags": {"environment": "development", "owner": "data-team"} +``` + +### Non-serverless / dedicated cluster + +```json +"serverless": false, +"photon": true, +"edition": "ADVANCED", +"clusters": [{ + "label": "default", + "num_workers": 4, + "node_type_id": "i3.xlarge", + "custom_tags": {"cost_center": "analytics"} +}] +``` + +### Continuous streaming + +```json +"continuous": true, +"configuration": {"spark.sql.shuffle.partitions": "auto"} +``` + +### Production autoscaling cluster + +```json +"clusters": [{ + "label": "default", + "autoscale": {"min_workers": 2, "max_workers": 8, "mode": "ENHANCED"}, + "node_type_id": "i3.xlarge", + "spark_conf": {"spark.sql.adaptive.enabled": "true"}, + "custom_tags": {"environment": "production"} +}] +``` + +### Email notifications + +```json +"notifications": [{ + "email_recipients": ["team@example.com", "oncall@example.com"], + "alerts": ["on-update-failure", "on-update-fatal-failure", "on-flow-failure"] +}] +``` + +### Serverless Python dependencies + +```json +"environment": { + "dependencies": ["scikit-learn==1.3.0", "pandas>=2.0.0", "requests"] +} +``` + +### Continuous with restart window + +Combine `"continuous": true` with: + +```json +"restart_window": { + "start_hour": 2, + "days_of_week": ["SATURDAY", "SUNDAY"], + "time_zone_id": "America/Los_Angeles" +} +``` + +### Custom event-log location + +```json +"event_log": { + "catalog": "audit_catalog", + "schema": "pipeline_logs", + "name": "my_pipeline_events" +} +``` + +--- + +## Updating an Existing Pipeline + +`update` takes the same JSON shape as `create`: + +```bash +databricks pipelines update --json '{ + "name": "updated_name", + "development": false, + "notifications": [{"email_recipients": ["team@example.com"], "alerts": ["on-update-failure"]}] +}' +``` + +Then trigger a new run with `databricks pipelines start-update [--full-refresh]`. See [workflows.md](workflows.md#step-4-start-an-update-and-poll-that-update) for the polling pattern — never poll top-level `pipelines get` state for run completion. + +--- + +## Multi-Schema Patterns + +**Preferred: one pipeline, multiple schemas** via fully-qualified table names. Simpler than running multiple pipelines. + +For trivial cases where all tables go to the same schema, use name prefixes (`bronze_*`, `silver_*`, `gold_*`). + +### Same catalog, separate schemas (parameterized) + +Set pipeline defaults to bronze; pull silver/gold schemas from configuration: + +```python +from pyspark import pipelines as dp +from pyspark.sql.functions import col + +silver_schema = spark.conf.get("silver_schema") +gold_schema = spark.conf.get("gold_schema") +landing_schema = spark.conf.get("landing_schema") + +@dp.table(name="orders_bronze") +def orders_bronze(): + return spark.readStream.table(f"{landing_schema}.orders_raw") + +@dp.table(name=f"{silver_schema}.orders_clean") +def orders_clean(): + return spark.read.table("orders_bronze").filter(col("order_id").isNotNull()) + +@dp.materialized_view(name=f"{gold_schema}.orders_by_date") +def orders_by_date(): + return (spark.read.table(f"{silver_schema}.orders_clean") + .groupBy("order_date").count()) +``` + +Pass `silver_schema` / `gold_schema` / `landing_schema` via the pipeline's `configuration` block. + +### Custom catalog AND schema per layer + +For cross-catalog scenarios, use fully-qualified names directly: + +```python +@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean") +def orders_clean(): + return spark.read.table("orders_bronze").filter(col("order_id").isNotNull()) +``` + +Same approach in SQL via fully-qualified `catalog.schema.table` in `CREATE OR REFRESH ...`. + +--- + +## Platform Constraints + +### Serverless requirements + +| Requirement | Notes | +|-------------|-------| +| Unity Catalog | Required — serverless always uses UC. | +| Region | Must be serverless-enabled. | +| Terms | Workspace must accept serverless terms of use. | +| CDC | Requires serverless (or Pro/Advanced with classic). | + +### Serverless limitations (force classic clusters) + +| Limitation | Reason to use classic | +|------------|----------------------| +| R language | Not supported on serverless. | +| Spark RDD APIs | Not supported. | +| JAR libraries / Maven coordinates | Not supported. | +| DBFS root access | Limited — use UC external locations. | +| Global temp views | Not supported. | + +### General constraints + +| Constraint | Notes | +|------------|-------| +| Schema evolution | Streaming tables need full refresh for incompatible changes. | +| `PIVOT` clause | Unsupported. | +| Sinks | Python only; streaming only; append-only flows. | diff --git a/skills/databricks-pipelines/references/workflows.md b/skills/databricks-pipelines/references/workflows.md new file mode 100644 index 0000000..671157c --- /dev/null +++ b/skills/databricks-pipelines/references/workflows.md @@ -0,0 +1,357 @@ +# Pipeline Project Workflows + +Three workflows for building Spark Declarative Pipelines, depending on what already exists in the project and how much DAB scaffolding the user wants. + +## Choose Your Workflow + +| Situation | Workflow | +|-----------|----------| +| New, standalone pipeline project with its own bundle | **A. Standalone bundle** | +| Pipeline added to an existing DAB project | **B. Existing bundle** | +| Quick prototyping, no bundle (yet) | **C. Rapid CLI iteration** | + +If the user is unsure, default to A for production-bound work and C for exploration. + +--- + +## Language Selection (Python vs SQL) + +Decide before scaffolding — the choice picks the template files (`.py` vs `.sql`) and pulls in different reference docs. Both languages can coexist in the same project, but pick one primary. + +| User signal | Pick | +|-------------|------| +| "Python pipeline", "use Python", UDF, pandas, ML inference, pyspark | **Python** | +| "SQL pipeline", "SQL files", "use SQL" | **SQL** | +| "Create a simple pipeline", "create a table", "an aggregation" | **SQL** (simpler) | +| Complex parameterized logic, custom UDFs, ML, advanced processing | **Python** | + +If the request is ambiguous, ask. Stick with the chosen language unless the user explicitly switches. + +--- + +## Workflow A: Standalone Bundle (`pipelines init`) + +Use when the user wants a new project where the pipeline *is* the project. + +### Non-interactive (recommended for agents) + +```bash +databricks pipelines init --output-dir . --config-file init-config.json +``` + +`init-config.json`: + +```json +{ + "project_name": "customer_pipeline", + "initial_catalog": "prod_catalog", + "use_personal_schema": "no", + "initial_language": "sql" +} +``` + +| Field | Notes | +|-------|-------| +| `project_name` | Letters, numbers, underscores only. Used for bundle name + folder. | +| `initial_catalog` | Must exist in Unity Catalog. | +| `use_personal_schema` | `"yes"` → `${workspace.current_user.short_name}` (dev). `"no"` → fixed value (prod). | +| `initial_language` | `"sql"` or `"python"` (lowercase). | + +### Interactive + +```bash +databricks pipelines init --output-dir . +``` + +Prompts for the same fields. + +### Generated structure + +``` +project_root/ +├── databricks.yml # Bundle config +├── pyproject.toml # Python only +├── resources/ +│ ├── _etl.pipeline.yml # Pipeline resource +│ └── sample_job.job.yml # Optional scheduled job +└── src/ + └── _etl/ + ├── explorations/ # Ad-hoc notebooks (NOT pipeline code) + └── transformations/ # Pipeline transformations + ├── sample_*.sql # or .py + └── ... +``` + +**Key rule**: Pipeline transformations are raw `.sql` / `.py` files. Notebooks go in `explorations/` for ad-hoc work only. + +### Customize and deploy + +1. Replace `sample_*` files in `transformations/` with real datasets (1 dataset per file). +2. Edit `databricks.yml` to set per-target catalog/schema variables and workspace host. +3. Edit `resources/_etl.pipeline.yml` for pipeline-level settings (serverless on by default). +4. `databricks bundle validate` → `databricks bundle deploy [-t ]` → `databricks bundle run `. + +### Alternative: `databricks bundle init lakeflow-pipelines` + +The older template-based scaffolding also works: + +```bash +databricks bundle init lakeflow-pipelines \ + --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') \ + --profile < /dev/null +``` + +Both produce DAB-shaped projects; `pipelines init` is the newer, more focused command. + +### `databricks.yml` essentials + +```yaml +bundle: + name: customer_pipeline + +include: + - resources/*.yml + - resources/*/*.yml + +variables: + catalog: { description: The catalog to use } + schema: { description: The schema to use } + +targets: + dev: + mode: development # prefixes resources with [dev ], pauses schedules + default: true + workspace: + host: https://.cloud.databricks.com + variables: + catalog: dev_catalog + schema: ${workspace.current_user.short_name} + + prod: + mode: production # no prefix, schedules active + workspace: + host: https://.cloud.databricks.com + root_path: /Workspace/Users//.bundle/${bundle.name}/${bundle.target} + variables: + catalog: prod_catalog + schema: production + permissions: + - user_name: + level: CAN_MANAGE +``` + +### Pipeline resource (`resources/.pipeline.yml`) + +```yaml +resources: + pipelines: + customer_pipeline_etl: + name: customer_pipeline_etl + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + continuous: false # explicit — true auto-retries failed updates forever + root_path: "../src/customer_pipeline_etl" + libraries: + - glob: + include: ../src/customer_pipeline_etl/transformations/** + environment: # serverless Python deps (optional) + dependencies: + - --editable ${workspace.file_path} +``` + +### Python project dependencies + +Python projects ship a `pyproject.toml`. Runtime deps go in `[project].dependencies`; dev-only in `[project.optional-dependencies].dev`. The `--editable ${workspace.file_path}` line in the pipeline resource installs the package on serverless compute at deploy time. + +--- + +## Workflow B: Pipeline in Existing Bundle + +Use when `databricks.yml` already exists for a larger project (app + jobs + dashboards) and a pipeline is being added to it. + +### Step 1: Add a pipeline resource file + +`resources/my_pipeline.pipeline.yml`: + +```yaml +resources: + pipelines: + my_pipeline: + name: my_pipeline + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + continuous: false + libraries: + - glob: + include: ../src/pipelines/my_pipeline/** +``` + +### Step 2: Add source files + +``` +src/pipelines/my_pipeline/ +├── bronze_ingest.sql +├── silver_clean.sql +└── gold_summary.sql +``` + +### Step 3: Deploy + +```bash +databricks bundle deploy +databricks bundle run my_pipeline +``` + +The pipeline picks up the bundle's existing targets / variables / permissions. + +--- + +## Workflow C: Rapid CLI Iteration (no bundle) + +Use for prototyping when bundle scaffolding would slow the user down. Skip when the work is production-bound — workflow A or B is better long-term. + +### Step 1: Write files locally + +`.sql` or `.py` files in a folder. See [python-basics.md](python-basics.md) or [sql-basics.md](sql-basics.md) for syntax. + +### Step 2: Upload to workspace + +```bash +databricks workspace import-dir ./my_pipeline /Workspace/Users//my_pipeline +``` + +Re-upload with `--overwrite` after every code change. + +### Step 3: Create the pipeline + +```bash +databricks pipelines create --json '{ + "name": "my_pipeline", + "catalog": "my_catalog", + "schema": "my_schema", + "serverless": true, + "continuous": false, + "channel": "PREVIEW", + "libraries": [{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}] +}' +``` + +`libraries` field notes: + +- `"glob"` — directory of files. Recommended. +- `"file"` — single `.sql` / `.py`. A `"file"` pointing at a folder fails with `Paths must end with .py or .sql`. +- `"notebook"` — **deprecated**, never use. + +Use enumerated `"file"` entries instead of `"glob"` only when explicit ordering matters. + +Capture the returned `pipeline_id`. + +### Step 4: Start an update and poll *that update* + +```bash +UPDATE_ID=$(databricks pipelines start-update | jq -r .update_id) +# Or with full refresh (destructive on streaming state — omit for incremental): +# UPDATE_ID=$(databricks pipelines start-update --full-refresh | jq -r .update_id) + +while :; do + STATE=$(databricks pipelines get-update "$UPDATE_ID" | jq -r '.update.state') + echo "$(date +%H:%M:%S) update=$UPDATE_ID state=$STATE" + case "$STATE" in COMPLETED|FAILED|CANCELED) break;; esac + sleep 30 +done +``` + +**Why poll the update, not the pipeline.** Top-level pipeline `state` flips back to `RUNNING` on `RETRY_ON_FAILURE`, so a loop watching the pipeline (or `latest_updates[0]`) can spin past a real `FAILED` update forever. Poll the captured `update_id` and stop on the first terminal state — including `FAILED`. + +**On `FAILED`**: read the events log, don't re-run. + +```bash +databricks pipelines list-pipeline-events \ + | jq '[.[] | select(.level=="ERROR") | {event_type, message: (.message // "")[0:300]}] | .[0:5]' +``` + +If the pipeline is already `RUNNING`, `start-update` queues the new update. Force-stop with `databricks pipelines stop ` first if needed. + +### Step 5: Edit → re-upload → restart + +```bash +# Re-upload (whole dir) +databricks workspace import-dir ./my_pipeline /Workspace/Users//my_pipeline --overwrite + +# Or a single file +databricks workspace import /Workspace/Users//my_pipeline/gold.sql \ + --file ./my_pipeline/gold.sql --format RAW --overwrite + +# Restart +databricks pipelines start-update +``` + +**Use `--format RAW`** for raw `.sql` / `.py` FILE entries. `--format SOURCE --language SQL|PYTHON` uploads a workspace *notebook* — and **notebooks are deprecated for pipelines**. Mixing the two on the same path fails with `Cannot overwrite the asset ... due to type mismatch (asked: NOTEBOOK, actual: FILE)`. + +### Step 6: Validate output data + +Even on `COMPLETED`, verify the data: + +```bash +databricks experimental aitools tools discover-schema \ + my_catalog.my_schema.bronze_orders \ + my_catalog.my_schema.silver_orders \ + my_catalog.my_schema.gold_summary +``` + +Returns columns/types, 5 sample rows, total row count, and null counts per column per table. + +Check for: empty tables (ingestion or filter problems), unexpected row counts (broken joins), missing columns (schema mismatch), nulls in key columns (data quality). + +### Python SDK alternative + +```python +from databricks.sdk import WorkspaceClient +import time + +w = WorkspaceClient() + +pipeline = w.pipelines.create( + name="my_pipeline", + catalog="my_catalog", + schema="my_schema", + serverless=True, + continuous=False, + libraries=[{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}], + development=True, +) + +update = w.pipelines.start_update( + pipeline_id=pipeline.pipeline_id, + full_refresh=True, +) + +while True: + u = w.pipelines.get_update(pipeline_id=pipeline.pipeline_id, + update_id=update.update_id).update + if str(u.state) in ("COMPLETED", "FAILED", "CANCELED"): + print(f"Update {u.update_id}: {u.state}") + break + time.sleep(10) +``` + +--- + +## Migrating from a Manual Folder Structure + +If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, migrate to workflow A by wrapping them in a `databricks.yml` and a pipeline resource pointing at the existing folders via a `glob`. No file moves required — the medallion folders work as-is under `transformations/**`. + +--- + +## Common Initialization Issues + +| Issue | Fix | +|-------|-----| +| `Command not found: databricks` | `pip install databricks-cli` | +| `Invalid catalog name` | `databricks catalogs list` and verify; create with `databricks catalogs create --json '{"name": "..."}'` | +| `Language option not recognized` | Use lowercase `"sql"` / `"python"`, not `"SQL"` / `"Python"` | +| Files deploy but pipeline doesn't pick them up | Glob pattern in `libraries` doesn't match — re-check `include` path relative to the resource file | +| `Bundle validation failed: Invalid schema` | `databricks bundle validate`, check YAML indentation (spaces, not tabs) | +| Files deploy but pipeline config stale | `databricks bundle deploy --force` | From 2c91a595db90ae4f980f3123e4ea16212c71c676 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 26 May 2026 15:24:21 +0000 Subject: [PATCH 3/3] skills(pipelines): port streaming patterns, kafka, SCD-2 querying MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 of the a-d-k → d-a-s port. Closes the remaining customer-facing gaps from the python/{2,3,4} and sql/{2,3,4} reference files that the phase-2 port had judged as "covered" but on audit weren't. New references: - streaming-patterns.md — combined SQL + Python. Deduplication (by key, with time window, composite); windowed aggregations (tumbling, multi-size, session windows); event-time vs processing-time guidance; rescue-data quarantine (Auto Loader `_rescued_data` → bronze_quarantine + silver_clean fanout); stream-to-stream join as a pattern with cross-link to performance.md; running totals; anomaly detection (rolling z-score outlier flag); end-to-end lag monitoring. - kafka.md — combined SQL + Python. Basic Kafka read (`spark.readStream. format("kafka")` and `read_kafka()`); JSON payload parsing with explicit schemas; Databricks Secrets-based SASL/PLAIN auth; mTLS notes; Event Hubs via the Kafka protocol; pipeline-configuration plumbing for brokers/topics; pointer to sink.md for writing back to Kafka. Fills a full gap — stable's SKILL.md API table listed `read_kafka` and `format("kafka")` with no linked skill. - scd-2-querying.md — combined SQL + Python. `__START_AT` / `__END_AT` temporal semantics; current-state materialized views; point-in-time queries with the inclusive-lower / exclusive-upper boundary; per- entity history; period-bounded change analysis; joining facts with historical dimensions (as-of-transaction-time and current-dim variants); pre-filter MV optimization; clustering on (entity_key, __START_AT). Cross-references added: - auto-loader.md → streaming-patterns.md (quarantine), kafka.md, lag monitoring. - auto-cdc.md → scd-2-querying.md. - SKILL.md API reference list adds kafka, scd-2-querying, streaming- patterns. Co-authored-by: Isaac --- manifest.json | 3 + skills/databricks-pipelines/SKILL.md | 3 + .../references/auto-cdc.md | 4 + .../references/auto-loader.md | 6 + .../databricks-pipelines/references/kafka.md | 229 +++++++++ .../references/scd-2-querying.md | 277 +++++++++++ .../references/streaming-patterns.md | 457 ++++++++++++++++++ 7 files changed, 979 insertions(+) create mode 100644 skills/databricks-pipelines/references/kafka.md create mode 100644 skills/databricks-pipelines/references/scd-2-querying.md create mode 100644 skills/databricks-pipelines/references/streaming-patterns.md diff --git a/manifest.json b/manifest.json index 2365f97..d3f7e77 100644 --- a/manifest.json +++ b/manifest.json @@ -123,6 +123,7 @@ "references/expectations.md", "references/foreach-batch-sink-python.md", "references/foreach-batch-sink.md", + "references/kafka.md", "references/materialized-view-python.md", "references/materialized-view-sql.md", "references/materialized-view.md", @@ -136,9 +137,11 @@ "references/performance.md", "references/pipeline-configuration.md", "references/python-basics.md", + "references/scd-2-querying.md", "references/sink-python.md", "references/sink.md", "references/sql-basics.md", + "references/streaming-patterns.md", "references/streaming-table-python.md", "references/streaming-table-sql.md", "references/streaming-table.md", diff --git a/skills/databricks-pipelines/SKILL.md b/skills/databricks-pipelines/SKILL.md index 3decac4..78e05b3 100644 --- a/skills/databricks-pipelines/SKILL.md +++ b/skills/databricks-pipelines/SKILL.md @@ -324,7 +324,10 @@ Detailed reference guides for each pipeline API. **Read the relevant guide befor - [Views](references/view.md) — Reusable query logic published to Unity Catalog ([SQL](references/view-sql.md)) - [Temporary Views](references/temporary-view.md) — Pipeline-private views ([Python](references/temporary-view-python.md), [SQL](references/temporary-view-sql.md)) - [Auto Loader](references/auto-loader.md) — Incrementally ingest files from cloud storage ([Python](references/auto-loader-python.md), [SQL](references/auto-loader-sql.md)) +- [Kafka Ingestion](references/kafka.md) — Read from Kafka / Event Hubs with JSON parsing, Secrets-based auth - [Auto CDC](references/auto-cdc.md) — Process Change Data Capture feeds, SCD Type 1 & 2 ([Python](references/auto-cdc-python.md), [SQL](references/auto-cdc-sql.md)) +- [SCD Type 2 Querying](references/scd-2-querying.md) — Current-state views, point-in-time queries, joining facts with historical dimensions +- [Streaming Patterns](references/streaming-patterns.md) — Deduplication, windowed aggregations (tumbling/multi-size/session), late-arriving data, rescue-data quarantine, monitoring lag, anomaly detection - [Expectations](references/expectations.md) — Define and enforce data quality constraints ([Python](references/expectations-python.md), [SQL](references/expectations-sql.md)) - [Sinks](references/sink.md) — Write to Kafka, Event Hubs, external Delta tables ([Python](references/sink-python.md)) - [ForEachBatch Sinks](references/foreach-batch-sink.md) — Custom streaming sink with per-batch Python logic ([Python](references/foreach-batch-sink-python.md)) diff --git a/skills/databricks-pipelines/references/auto-cdc.md b/skills/databricks-pipelines/references/auto-cdc.md index 5fad71a..3bdad12 100644 --- a/skills/databricks-pipelines/references/auto-cdc.md +++ b/skills/databricks-pipelines/references/auto-cdc.md @@ -18,4 +18,8 @@ For detailed implementation guides: - **Python**: [auto-cdc-python.md](auto-cdc-python.md) - **SQL**: [auto-cdc-sql.md](auto-cdc-sql.md) +## Reading SCD Type 2 Tables + +For querying the history tables produced by SCD Type 2 (`__START_AT` / `__END_AT`), point-in-time queries, change analysis, and joining facts with historical dimensions, see [scd-2-querying.md](scd-2-querying.md). + **Note**: The API is also known as `applyChanges` in some contexts. diff --git a/skills/databricks-pipelines/references/auto-loader.md b/skills/databricks-pipelines/references/auto-loader.md index c686304..642031b 100644 --- a/skills/databricks-pipelines/references/auto-loader.md +++ b/skills/databricks-pipelines/references/auto-loader.md @@ -19,6 +19,12 @@ For detailed implementation guides: - **Python**: [auto-loader-python.md](auto-loader-python.md) - **SQL**: [auto-loader-sql.md](auto-loader-sql.md) +## Related Patterns + +- [Rescue-data quarantine](streaming-patterns.md#rescue-data-quarantine) — route rows where Auto Loader rescued malformed fields to a side table instead of dropping them. +- [Kafka ingestion](kafka.md) — for message-bus sources (Kafka, Event Hubs). +- [Monitoring lag](streaming-patterns.md#monitoring-lag) — track end-to-end freshness. + ## Format-Specific Options For format-specific configuration options, refer to: diff --git a/skills/databricks-pipelines/references/kafka.md b/skills/databricks-pipelines/references/kafka.md new file mode 100644 index 0000000..5e776b9 --- /dev/null +++ b/skills/databricks-pipelines/references/kafka.md @@ -0,0 +1,229 @@ +# Kafka Ingestion + +Ingest from Apache Kafka into streaming tables. Examples in both Python (`spark.readStream.format("kafka")`) and SQL (`read_kafka()`). Same pattern works for Azure Event Hubs via the Kafka protocol — see [Event Hubs](#event-hubs) below. + +For Kinesis, Pub/Sub, and Pulsar, use the analogous `read_kinesis`, `read_pubsub`, `read_pulsar` functions / Spark formats — same overall shape as below. + +--- + +## Basic Read + +Kafka returns rows with binary `key` and `value` columns plus `topic`, `partition`, `offset`, `timestamp`. Cast to strings (or `from_json` / `from_avro`) downstream. + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_kafka_events AS +SELECT CAST(key AS STRING) AS event_key, + CAST(value AS STRING) AS event_value, + topic, partition, offset, + timestamp AS kafka_timestamp, + current_timestamp() AS _ingested_at +FROM read_kafka( + bootstrapServers => '${kafka_brokers}', + subscribe => 'events-topic', + startingOffsets => 'latest' +); +``` + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table(name="bronze_kafka_events") +def bronze_kafka_events(): + kafka_brokers = spark.conf.get("kafka_brokers") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", kafka_brokers) + .option("subscribe", "events-topic") + .option("startingOffsets", "latest") + .load() + .selectExpr( + "CAST(key AS STRING) AS event_key", + "CAST(value AS STRING) AS event_value", + "topic", "partition", "offset", + "timestamp AS kafka_timestamp") + .withColumn("_ingested_at", F.current_timestamp()) + ) +``` + +**Documentation**: [`read_kafka` function reference](https://docs.databricks.com/aws/en/sql/language-manual/functions/read_kafka). + +### Common options + +| Option | Purpose | +|--------|---------| +| `bootstrapServers` / `kafka.bootstrap.servers` | Broker list. Use a pipeline config var, not a literal. | +| `subscribe` | Topic name or comma-separated list. | +| `subscribePattern` | Regex over topic names (alternative to `subscribe`). | +| `startingOffsets` | `"latest"`, `"earliest"`, or JSON per-partition offsets. | +| `endingOffsets` | Only for batch reads — ignored in streaming. | +| `maxOffsetsPerTrigger` | Throttle per micro-batch. | +| `failOnDataLoss` | Default `true`. Set `false` only when you accept gaps. | + +--- + +## Parse JSON Payloads + +`value` is a binary/string blob. Extract structured columns with `from_json` (SQL/Python) against an explicit schema. + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events AS +SELECT data.*, kafka_timestamp, _ingested_at +FROM ( + SELECT from_json(event_value, + 'event_id STRING, event_type STRING, timestamp TIMESTAMP') AS data, + kafka_timestamp, _ingested_at + FROM STREAM bronze_kafka_events +); +``` + +```python +from pyspark.sql.types import StructType, StructField, StringType, TimestampType + +event_schema = StructType([ + StructField("event_id", StringType()), + StructField("event_type", StringType()), + StructField("timestamp", TimestampType()), +]) + +@dp.table(name="silver_events") +def silver_events(): + return ( + spark.readStream.table("bronze_kafka_events") + .withColumn("data", F.from_json("event_value", event_schema)) + .select("data.*", "kafka_timestamp", "_ingested_at") + ) +``` + +**Schema hygiene**: keep the schema in code (Python `StructType` or SQL string), versioned alongside the pipeline. Inferring JSON schema from a streaming Kafka source is not supported — the schema must be explicit. + +For Avro / Protobuf payloads, swap `from_json` for `from_avro` / `from_protobuf` (with Schema Registry config). Same overall pattern. + +--- + +## Authentication + +### Databricks Secrets + +Don't put credentials in code or pipeline config literally. Use `{{secrets/scope/key}}` interpolation. + +```sql +-- SASL/PLAIN +FROM read_kafka( + bootstrapServers => '${kafka_brokers}', + subscribe => 'events-topic', + `kafka.security.protocol` => 'SASL_SSL', + `kafka.sasl.mechanism` => 'PLAIN', + `kafka.sasl.jaas.config` => + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' || + 'username="{{secrets/kafka/username}}" ' || + 'password="{{secrets/kafka/password}}";' +); +``` + +```python +@dp.table(name="bronze_kafka_authenticated") +def bronze_kafka_authenticated(): + username = dbutils.secrets.get(scope="kafka", key="username") + password = dbutils.secrets.get(scope="kafka", key="password") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", spark.conf.get("kafka_brokers")) + .option("subscribe", "events-topic") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.sasl.jaas.config", + f'org.apache.kafka.common.security.plain.PlainLoginModule required ' + f'username="{username}" password="{password}";') + .load() + ) +``` + +### TLS / mTLS + +For mTLS, additional `kafka.ssl.truststore.*` and `kafka.ssl.keystore.*` options are required. Truststore/keystore files typically come from Unity Catalog volumes; pass file paths via pipeline config. + +--- + +## Event Hubs (via Kafka protocol) + +Azure Event Hubs speaks the Kafka protocol on port 9093. Use the same Kafka source — only the connection string changes. + +```sql +FROM read_kafka( + bootstrapServers => '.servicebus.windows.net:9093', + subscribe => '', + `kafka.security.protocol` => 'SASL_SSL', + `kafka.sasl.mechanism` => 'PLAIN', + `kafka.sasl.jaas.config` => + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' || + 'username="$ConnectionString" ' || + 'password="{{secrets/eventhub/connection-string}}";' +); +``` + +```python +@dp.table(name="bronze_eventhub_events") +def bronze_eventhub_events(): + conn_str = dbutils.secrets.get(scope="eventhub", key="connection-string") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", ".servicebus.windows.net:9093") + .option("subscribe", "") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.sasl.jaas.config", + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' + f'username="$ConnectionString" password="{conn_str}";') + .load() + ) +``` + +The username is the literal string `$ConnectionString` and the password is the namespace-level or entity-level connection string (with `SharedAccessKey=…`). + +--- + +## Pipeline Configuration + +Pass Kafka brokers, topics, and consumer-group identity through pipeline configuration so dev/prod can differ without code changes. + +```yaml +# In resources/.pipeline.yml +resources: + pipelines: + my_pipeline: + ... + configuration: + kafka_brokers: "broker-1:9092,broker-2:9092,broker-3:9092" + kafka_topic: "events-topic" +``` + +Read in code with `spark.conf.get("kafka_brokers")` (Python) or `${kafka_brokers}` (SQL). + +--- + +## Writing to Kafka (Sinks) + +Sinks are Python-only. Write a payload to Kafka by creating a sink with `format="kafka"` and appending via `@dp.append_flow`. The `value` column is mandatory — use `to_json(struct(*))` to serialize the row. See [sink.md](sink.md) and [sink-python.md](sink-python.md). + +--- + +## Best Practices + +1. **Always cast `value` to a usable type** (`STRING`, `BINARY`) and parse with `from_json` / `from_avro` against an explicit schema. Don't carry `value` as bytes downstream. +2. **Add `_ingested_at`** for lag monitoring — see [streaming-patterns.md](streaming-patterns.md#monitoring-lag). +3. **Tune `maxOffsetsPerTrigger`** if downstream operations are bottlenecking. +4. **Don't set `failOnDataLoss = false`** unless you genuinely accept gaps. The default protects against retention-window data loss. +5. **Use the parent `databricks-core` skill** for secret-scope management. + +--- + +## Common Issues + +| Issue | Fix | +|-------|-----| +| `Unable to find Kafka source` | Confirm `format("kafka")` (Python) / `read_kafka` (SQL) and that the cluster has Kafka client libraries (default on serverless / DBR ML / standard runtimes). | +| `Connection refused` / SSL handshake | Verify `bootstrapServers` reachability and `kafka.security.protocol`. | +| Schema for `value` doesn't match | `from_json` returns `NULL` on parse failure — add a quarantine fanout on `data IS NULL` similar to [rescue-data quarantine](streaming-patterns.md#rescue-data-quarantine). | +| Increasing consumer lag | Bottleneck downstream — see [streaming-patterns.md](streaming-patterns.md#monitoring-lag) for lag table; tune cluster size / `maxOffsetsPerTrigger`. | +| `failOnDataLoss` error after a long pause | Kafka topic retention expired the offset checkpoint. Reset the pipeline (full refresh) or start from `earliest`. | diff --git a/skills/databricks-pipelines/references/scd-2-querying.md b/skills/databricks-pipelines/references/scd-2-querying.md new file mode 100644 index 0000000..9a626a1 --- /dev/null +++ b/skills/databricks-pipelines/references/scd-2-querying.md @@ -0,0 +1,277 @@ +# Querying SCD Type 2 Tables + +How to read SCD Type 2 history tables produced by Auto CDC: current-state views, point-in-time queries, change analysis, and joining facts with historical dimensions. Examples in both SQL and Python. + +For the CDC flow that *writes* these tables, see [auto-cdc.md](auto-cdc.md) and the per-language references. + +--- + +## Temporal Columns + +SCD Type 2 tables (from `stored_as_scd_type=2` / `STORED AS SCD TYPE 2`) include two system columns: + +| Column | Meaning | +|--------|---------| +| `__START_AT` | When this version became effective (typically `sequence_by` value). | +| `__END_AT` | When this version expired. `NULL` for the current version. | + +Both have the same type as the `SEQUENCE BY` / `sequence_by` column (usually `TIMESTAMP`). + +**Rule of thumb**: `WHERE __END_AT IS NULL` selects only current rows. That's the most common filter — bake it into a materialized view if you query it often. + +--- + +## Current State + +```sql +-- All current records (materialize for repeated use) +CREATE OR REFRESH MATERIALIZED VIEW dim_customers_current AS +SELECT customer_id, customer_name, email, phone, address, + __START_AT AS valid_from +FROM dim_customers +WHERE __END_AT IS NULL; + +-- Single customer current row +SELECT * +FROM dim_customers +WHERE customer_id = '12345' AND __END_AT IS NULL; +``` + +```python +@dp.materialized_view(name="dim_customers_current") +def dim_customers_current(): + return ( + spark.read.table("dim_customers") + .filter(F.col("__END_AT").isNull()) + .select("customer_id", "customer_name", "email", "phone", "address", + F.col("__START_AT").alias("valid_from")) + ) +``` + +--- + +## Point-in-Time Queries + +State as it existed on a specific date. The inclusive-lower / exclusive-upper boundary matters — get it right or you'll double-count at the seam between versions. + +```sql +-- Products as of 2024-01-01 +CREATE OR REFRESH MATERIALIZED VIEW products_as_of_2024_01_01 AS +SELECT product_id, product_name, price, category, + __START_AT, __END_AT +FROM products_history +WHERE __START_AT <= '2024-01-01' + AND (__END_AT > '2024-01-01' OR __END_AT IS NULL); +``` + +```python +@dp.materialized_view(name="products_as_of_2024_01_01") +def products_as_of_2024_01_01(): + as_of = "2024-01-01" + return ( + spark.read.table("products_history") + .filter(F.col("__START_AT") <= as_of) + .filter((F.col("__END_AT") > as_of) | F.col("__END_AT").isNull()) + ) +``` + +**Boundary convention**: `[__START_AT, __END_AT)` — start is inclusive, end is exclusive. A version with `__END_AT = '2024-01-01'` is *not* the active version on 2024-01-01. + +--- + +## Change Analysis + +### All versions of one entity (history) + +```sql +SELECT customer_id, customer_name, email, phone, + __START_AT, __END_AT, + COALESCE(DATEDIFF(DAY, __START_AT, __END_AT), + DATEDIFF(DAY, __START_AT, CURRENT_TIMESTAMP())) AS days_active +FROM dim_customers +WHERE customer_id = '12345' +ORDER BY __START_AT DESC; +``` + +```python +def customer_history(customer_id: str): + return ( + spark.read.table("dim_customers") + .filter(F.col("customer_id") == customer_id) + .withColumn("days_active", + F.coalesce(F.datediff("__END_AT", "__START_AT"), + F.datediff(F.current_timestamp(), "__START_AT"))) + .orderBy(F.col("__START_AT").desc()) + ) +``` + +### Changes within a time period + +```sql +-- Customers who changed during Q1 2024 (excluding the original version) +SELECT customer_id, customer_name, + __START_AT AS change_timestamp, + 'UPDATE' AS change_type +FROM dim_customers c +WHERE __START_AT BETWEEN '2024-01-01' AND '2024-03-31' + AND __START_AT != ( + SELECT MIN(__START_AT) FROM dim_customers c2 + WHERE c2.customer_id = c.customer_id + ) +ORDER BY __START_AT; +``` + +```python +@dp.materialized_view(name="customer_changes_q1_2024") +def customer_changes_q1_2024(): + history = spark.read.table("dim_customers") + first_seen = (history.groupBy("customer_id") + .agg(F.min("__START_AT").alias("first_start"))) + return ( + history.join(first_seen, "customer_id") + .filter(F.col("__START_AT").between("2024-01-01", "2024-03-31")) + .filter(F.col("__START_AT") != F.col("first_start")) + .select("customer_id", "customer_name", + F.col("__START_AT").alias("change_timestamp"), + F.lit("UPDATE").alias("change_type")) + ) +``` + +--- + +## Joining Facts with Historical Dimensions + +### As-of-transaction-time (canonical) + +For each fact row, pick the dimension version that was active at the transaction's event time. This is the common case for revenue-correct gold tables. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_with_historical_prices AS +SELECT s.sale_id, s.product_id, s.sale_date, s.quantity, + p.product_name, + p.price AS unit_price_at_sale_time, + s.quantity * p.price AS calculated_amount, + p.category +FROM sales_fact s +INNER JOIN products_history p + ON s.product_id = p.product_id + AND s.sale_date >= p.__START_AT + AND (s.sale_date < p.__END_AT OR p.__END_AT IS NULL); +``` + +```python +@dp.materialized_view(name="sales_with_historical_prices") +def sales_with_historical_prices(): + sales = spark.read.table("sales_fact") + products = spark.read.table("products_history") + return ( + sales.join( + products, + (sales.product_id == products.product_id) & + (sales.sale_date >= products.__START_AT) & + ((sales.sale_date < products.__END_AT) | products.__END_AT.isNull()), + "inner", + ) + .select(sales.sale_id, sales.product_id, sales.sale_date, sales.quantity, + products.product_name, + products.price.alias("unit_price_at_sale_time"), + (sales.quantity * products.price).alias("calculated_amount"), + products.category) + ) +``` + +### With the current dimension (ignore history) + +For reports that should always reflect today's attribute values (regardless of when the sale happened), join against the current row only. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_with_current_prices AS +SELECT s.sale_id, s.product_id, s.sale_date, s.quantity, + s.amount AS amount_at_sale, + p.product_name AS current_product_name, + p.price AS current_price +FROM sales_fact s +INNER JOIN products_history p + ON s.product_id = p.product_id + AND p.__END_AT IS NULL; +``` + +```python +@dp.materialized_view(name="sales_with_current_prices") +def sales_with_current_prices(): + sales = spark.read.table("sales_fact") + products_current = spark.read.table("products_history").filter(F.col("__END_AT").isNull()) + return ( + sales.join(products_current, "product_id", "inner") + .select("sale_id", "product_id", "sale_date", "quantity", + sales.amount.alias("amount_at_sale"), + products_current.product_name.alias("current_product_name"), + products_current.price.alias("current_price")) + ) +``` + +**Choosing between the two**: as-of-time for revenue, billing, and audit; current-dim for operational dashboards where attributes are *labels*, not values that drive the math. + +--- + +## Optimization Patterns + +### Pre-filter materialized views + +Querying the full history table for "current" repeatedly is wasteful. Bake the `__END_AT IS NULL` filter into an MV: + +```sql +CREATE OR REFRESH MATERIALIZED VIEW dim_products_current AS +SELECT * FROM products_history WHERE __END_AT IS NULL; + +CREATE OR REFRESH MATERIALIZED VIEW dim_recent_changes AS +SELECT * FROM products_history +WHERE __START_AT >= CURRENT_DATE() - INTERVAL 90 DAYS; + +CREATE OR REFRESH MATERIALIZED VIEW product_change_stats AS +SELECT product_id, + COUNT(*) AS version_count, + MIN(__START_AT) AS first_seen, + MAX(__START_AT) AS last_updated +FROM products_history +GROUP BY product_id; +``` + +```python +@dp.materialized_view(name="dim_products_current") +def dim_products_current(): + return spark.read.table("products_history").filter(F.col("__END_AT").isNull()) +``` + +### Cluster on lookup keys + time + +```sql +CREATE OR REFRESH STREAMING TABLE products_history +CLUSTER BY (product_id, __START_AT) +... +``` + +Clustering on `product_id` accelerates entity lookups; adding `__START_AT` helps point-in-time scans. See [performance.md](performance.md#cluster-key-selection-by-layer) for the full layer-by-layer key guide. + +--- + +## Best Practices + +1. **Filter `__END_AT IS NULL` for "current"** — never compare `__START_AT` against `MAX(__START_AT)` per entity. It's slower and breaks under concurrent updates. +2. **Use inclusive-lower / exclusive-upper** for point-in-time joins. Mismatched boundaries either drop the seam row or double-count it. +3. **Materialize repeated filters.** A `dim_*_current` MV is cheaper than re-filtering the history table on every downstream read. +4. **Make `SEQUENCE BY` high-precision.** Sub-second collisions (multiple changes at the same `updated_at`) cause non-deterministic ordering; prefer microsecond timestamps or compose with a tiebreaker via `STRUCT(timestamp, id)`. +5. **For wide history tables, `TRACK HISTORY ON` only the columns that need versions.** Other columns get Type-1 in-place updates and don't create new history rows. See [auto-cdc-python.md](auto-cdc-python.md) / [auto-cdc-sql.md](auto-cdc-sql.md). + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| Multiple rows for the same key | Missing `__END_AT IS NULL` filter. | +| Point-in-time query returns no rows at the boundary | Wrong inclusive/exclusive — use `__START_AT <= D AND (__END_AT > D OR __END_AT IS NULL)`. | +| Point-in-time query double-counts at the boundary | Used `__END_AT >= D` instead of `__END_AT > D`. | +| Slow temporal join | Materialize current-state MV; cluster history on `(entity_key, __START_AT)`. | +| Unexpected duplicates per business key per moment | Multiple changes at the same `sequence_by` value — use a higher-precision sequence column or `STRUCT(ts, tiebreaker)`. | +| `__START_AT` / `__END_AT` columns missing | Source table isn't SCD Type 2 (Type 1 doesn't have temporals). | diff --git a/skills/databricks-pipelines/references/streaming-patterns.md b/skills/databricks-pipelines/references/streaming-patterns.md new file mode 100644 index 0000000..d7275a5 --- /dev/null +++ b/skills/databricks-pipelines/references/streaming-patterns.md @@ -0,0 +1,457 @@ +# Streaming Patterns + +Patterns for streaming pipelines: deduplication, windowed aggregations, late-arriving data, rescue-data quarantine, monitoring lag, and anomaly detection. Examples are shown in both SQL and Python. + +For perf-framed treatment of stream-to-stream joins, see [performance.md](performance.md#join-optimization). For Auto Loader API and options, see [auto-loader.md](auto-loader.md). For Kafka ingestion, see [kafka.md](kafka.md). + +--- + +## Deduplication + +Apply at the bronze → silver transition. Bronze accepts duplicates, silver is clean. + +### By key (keep first) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS +SELECT event_id, user_id, event_type, event_timestamp, _ingested_at +FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) AS rn + FROM STREAM bronze_events +) +WHERE rn = 1; +``` + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F +from pyspark.sql.window import Window + +@dp.table(name="silver_events_dedup", cluster_by=["event_date"]) +def silver_events_dedup(): + w = Window.partitionBy("event_id").orderBy("event_timestamp") + return ( + spark.readStream.table("bronze_events") + .withColumn("rn", F.row_number().over(w)) + .filter(F.col("rn") == 1) + .drop("rn") + ) +``` + +### Within a time window (tolerates late arrivals) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS +SELECT event_id, user_id, event_type, event_timestamp, + MIN(_ingested_at) AS first_seen_at +FROM STREAM bronze_events +GROUP BY event_id, user_id, event_type, event_timestamp, + window(event_timestamp, '1 hour'); +``` + +```python +@dp.table(name="silver_events_dedup") +def silver_events_dedup(): + return ( + spark.readStream.table("bronze_events") + .groupBy("event_id", "user_id", "event_type", "event_timestamp", + F.window("event_timestamp", "1 hour")) + .agg(F.min("_ingested_at").alias("first_seen_at")) + ) +``` + +### Composite key + +```sql +CREATE OR REFRESH STREAMING TABLE silver_transactions_dedup AS +SELECT transaction_id, customer_id, amount, transaction_timestamp, + MIN(_ingested_at) AS _ingested_at +FROM STREAM bronze_transactions +GROUP BY transaction_id, customer_id, amount, transaction_timestamp; +``` + +```python +@dp.table(name="silver_transactions_dedup") +def silver_transactions_dedup(): + return ( + spark.readStream.table("bronze_transactions") + .groupBy("transaction_id", "customer_id", "amount", "transaction_timestamp") + .agg(F.min("_ingested_at").alias("_ingested_at")) + ) +``` + +**Alternative for simple cases**: `SELECT DISTINCT ...` (SQL) or `.dropDuplicates(["event_id"])` (Python). These are fine for low-cardinality dedup but maintain state per unique row. + +### When to use Auto CDC instead + +For dedup with sequenced updates (most-recent-wins, deletes, late corrections), use Auto CDC with SCD Type 1 — see [auto-cdc.md](auto-cdc.md). Manual `ROW_NUMBER`/`GROUP BY` dedup is for append-only streams without semantic updates. + +--- + +## Windowed Aggregations + +### Tumbling windows (non-overlapping, fixed size) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_sensor_5min AS +SELECT sensor_id, + window(event_timestamp, '5 minutes') AS time_window, + AVG(temperature) AS avg_temperature, + MIN(temperature) AS min_temperature, + MAX(temperature) AS max_temperature, + COUNT(*) AS event_count +FROM STREAM bronze_sensor_events +GROUP BY sensor_id, window(event_timestamp, '5 minutes'); +``` + +```python +@dp.table(name="silver_sensor_5min", cluster_by=["sensor_id"]) +def silver_sensor_5min(): + return ( + spark.readStream.table("bronze_sensor_events") + .groupBy("sensor_id", F.window("event_timestamp", "5 minutes")) + .agg(F.avg("temperature").alias("avg_temperature"), + F.min("temperature").alias("min_temperature"), + F.max("temperature").alias("max_temperature"), + F.count("*").alias("event_count")) + ) +``` + +### Multiple window sizes (separate tables per granularity) + +```sql +CREATE OR REFRESH STREAMING TABLE gold_sensor_1min AS +SELECT sensor_id, + window(event_timestamp, '1 minute').start AS window_start, + window(event_timestamp, '1 minute').end AS window_end, + AVG(value) AS avg_value, + COUNT(*) AS event_count +FROM STREAM silver_sensor_data +GROUP BY sensor_id, window(event_timestamp, '1 minute'); + +CREATE OR REFRESH STREAMING TABLE gold_sensor_1hour AS +SELECT sensor_id, + window(event_timestamp, '1 hour').start AS window_start, + AVG(value) AS avg_value, + STDDEV(value) AS stddev_value +FROM STREAM silver_sensor_data +GROUP BY sensor_id, window(event_timestamp, '1 hour'); +``` + +```python +@dp.table(name="gold_sensor_1min") +def gold_sensor_1min(): + return ( + spark.readStream.table("silver_sensor_data") + .groupBy("sensor_id", F.window("event_timestamp", "1 minute")) + .agg(F.avg("value").alias("avg_value"), + F.count("*").alias("event_count")) + .select("sensor_id", + F.col("window.start").alias("window_start"), + F.col("window.end").alias("window_end"), + "avg_value", "event_count") + ) + +@dp.table(name="gold_sensor_1hour") +def gold_sensor_1hour(): + return ( + spark.readStream.table("silver_sensor_data") + .groupBy("sensor_id", F.window("event_timestamp", "1 hour")) + .agg(F.avg("value").alias("avg_value"), + F.stddev("value").alias("stddev_value")) + ) +``` + +### Session windows (inactivity-bounded) + +Group events into sessions terminated by an inactivity gap: + +```sql +CREATE OR REFRESH STREAMING TABLE silver_user_sessions AS +SELECT user_id, + session_window(event_timestamp, '30 minutes') AS session, + MIN(event_timestamp) AS session_start, + MAX(event_timestamp) AS session_end, + COUNT(*) AS event_count, + COLLECT_LIST(event_type) AS event_sequence +FROM STREAM bronze_user_events +GROUP BY user_id, session_window(event_timestamp, '30 minutes'); +``` + +```python +@dp.table(name="silver_user_sessions") +def silver_user_sessions(): + return ( + spark.readStream.table("bronze_user_events") + .groupBy("user_id", F.session_window("event_timestamp", "30 minutes")) + .agg(F.min("event_timestamp").alias("session_start"), + F.max("event_timestamp").alias("session_end"), + F.count("*").alias("event_count"), + F.collect_list("event_type").alias("event_sequence")) + ) +``` + +### Window-size guidance + +| Window | Use case | +|--------|----------| +| 1–5 minutes | Real-time monitoring, alerting | +| 15–60 minutes | Operational dashboards | +| 1–24 hours | Analytical reports | + +Larger windows = less state pressure but stale results. Pick the smallest window that meets the freshness SLO. + +--- + +## Late-Arriving Data + +### Use event time, not processing time, for business logic + +```sql +CREATE OR REFRESH STREAMING TABLE gold_daily_orders AS +SELECT CAST(order_timestamp AS DATE) AS order_date, -- event time + COUNT(*) AS order_count, + SUM(amount) AS total_amount +FROM STREAM silver_orders +GROUP BY CAST(order_timestamp AS DATE); +``` + +```python +@dp.table(name="gold_daily_orders") +def gold_daily_orders(): + return ( + spark.readStream.table("silver_orders") + .groupBy(F.to_date("order_timestamp").alias("order_date")) # event time + .agg(F.count("*").alias("order_count"), + F.sum("amount").alias("total_amount")) + ) +``` + +Keep `_ingested_at` (processing time) in the schema as a debugging field — never the aggregation key. + +--- + +## Rescue-Data Quarantine + +Pattern: route malformed records to a quarantine table so the clean stream stays clean, but no data is silently dropped. Uses Auto Loader's rescued-data column (`_rescued_data`, default name; configurable via `rescuedDataColumn`). + +```sql +-- Bronze: ingest everything, flag rows where Auto Loader rescued bad fields +CREATE OR REFRESH STREAMING TABLE bronze_events AS +SELECT *, + current_timestamp() AS _ingested_at, + _rescued_data IS NOT NULL AS _has_errors +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); + +-- Quarantine: only the rescued/malformed rows +CREATE OR REFRESH STREAMING TABLE bronze_quarantine AS +SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NOT NULL; + +-- Silver: only the clean rows +CREATE OR REFRESH STREAMING TABLE silver_clean AS +SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NULL; +``` + +```python +@dp.table(name="bronze_events", cluster_by=["ingestion_date"]) +def bronze_events(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .option("rescuedDataColumn", "_rescued_data") + .load("/Volumes/cat/sch/raw/events/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("_has_errors", F.col("_rescued_data").isNotNull()) + ) + +@dp.table(name="bronze_quarantine") +def bronze_quarantine(): + return spark.readStream.table("bronze_events").filter("_has_errors = true") + +@dp.table(name="silver_clean") +def silver_clean(): + return spark.readStream.table("bronze_events").filter("_has_errors = false") +``` + +**When to use**: Schema drift on JSON / CSV ingestion, optional fields that arrive late, downstream tables that can't tolerate nulls in known columns. Pair with an alert on `bronze_quarantine` row growth. + +**Alternative**: `@dp.expect_or_drop` / `CONSTRAINT ... ON VIOLATION DROP ROW`. Use expectations when the rule is a value check (`amount > 0`); use rescued-data quarantine when the rule is a schema/parse problem. + +--- + +## Stream-to-Stream Joins (Pattern) + +Always bound the join by event-time interval. Without bounds, state grows unbounded. + +```sql +CREATE OR REFRESH STREAMING TABLE silver_orders_with_payments AS +SELECT o.order_id, o.customer_id, o.order_timestamp, + o.amount AS order_amount, + p.payment_id, p.payment_timestamp, p.payment_method, + p.amount AS payment_amount +FROM STREAM bronze_orders o +INNER JOIN STREAM bronze_payments p + ON o.order_id = p.order_id + AND p.payment_timestamp BETWEEN o.order_timestamp + AND o.order_timestamp + INTERVAL 1 HOUR; +``` + +```python +@dp.table(name="silver_orders_with_payments") +def silver_orders_with_payments(): + orders = spark.readStream.table("bronze_orders") + payments = spark.readStream.table("bronze_payments") + return orders.join( + payments, + (orders.order_id == payments.order_id) & + (payments.payment_timestamp >= orders.order_timestamp) & + (payments.payment_timestamp <= orders.order_timestamp + F.expr("INTERVAL 1 HOUR")), + "inner", + ) +``` + +For stream-to-static (broadcast small dimensions) and perf-tuning, see [performance.md](performance.md#join-optimization). + +--- + +## Incremental Aggregations (Running Totals) + +Streaming `GROUP BY` without windows yields cumulative aggregates per group. Watch state size — see [performance.md](performance.md#state-management-for-streaming). + +```sql +CREATE OR REFRESH STREAMING TABLE silver_customer_running_totals AS +SELECT customer_id, + SUM(amount) AS total_spent, + COUNT(*) AS transaction_count, + MAX(transaction_timestamp) AS last_transaction_at +FROM STREAM bronze_transactions +GROUP BY customer_id; +``` + +```python +@dp.table(name="silver_customer_running_totals") +def silver_customer_running_totals(): + return ( + spark.readStream.table("bronze_transactions") + .groupBy("customer_id") + .agg(F.sum("amount").alias("total_spent"), + F.count("*").alias("transaction_count"), + F.max("transaction_timestamp").alias("last_transaction_at")) + ) +``` + +--- + +## Anomaly Detection + +### Rolling z-score outlier flag + +```sql +CREATE OR REFRESH STREAMING TABLE silver_sensor_with_anomalies AS +SELECT sensor_id, event_timestamp, temperature, + AVG(temperature) OVER w AS rolling_avg_100, + STDDEV(temperature) OVER w AS rolling_stddev_100, + CASE + WHEN temperature > AVG(temperature) OVER w + 3 * STDDEV(temperature) OVER w THEN 'HIGH_OUTLIER' + WHEN temperature < AVG(temperature) OVER w - 3 * STDDEV(temperature) OVER w THEN 'LOW_OUTLIER' + ELSE 'NORMAL' + END AS anomaly_flag +FROM STREAM bronze_sensor_events +WINDOW w AS (PARTITION BY sensor_id ORDER BY event_timestamp + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW); + +-- Route anomalies for alerting +CREATE OR REFRESH STREAMING TABLE silver_sensor_anomalies AS +SELECT * FROM STREAM silver_sensor_with_anomalies +WHERE anomaly_flag IN ('HIGH_OUTLIER', 'LOW_OUTLIER'); +``` + +```python +@dp.table(name="silver_sensor_with_anomalies") +def silver_sensor_with_anomalies(): + w = Window.partitionBy("sensor_id").orderBy("event_timestamp").rowsBetween(-100, 0) + return ( + spark.readStream.table("bronze_sensor_events") + .withColumn("rolling_avg", F.avg("temperature").over(w)) + .withColumn("rolling_stddev", F.stddev("temperature").over(w)) + .withColumn("anomaly_flag", + F.when(F.col("temperature") > F.col("rolling_avg") + 3 * F.col("rolling_stddev"), "HIGH_OUTLIER") + .when(F.col("temperature") < F.col("rolling_avg") - 3 * F.col("rolling_stddev"), "LOW_OUTLIER") + .otherwise("NORMAL")) + ) + +@dp.table(name="silver_sensor_anomalies") +def silver_sensor_anomalies(): + return ( + spark.readStream.table("silver_sensor_with_anomalies") + .filter(F.col("anomaly_flag").isin("HIGH_OUTLIER", "LOW_OUTLIER")) + ) +``` + +### Static threshold filtering + +```sql +CREATE OR REFRESH STREAMING TABLE silver_high_value_transactions AS +SELECT transaction_id, customer_id, amount, transaction_timestamp +FROM STREAM bronze_transactions +WHERE amount > 10000; +``` + +```python +@dp.table(name="silver_high_value_transactions") +def silver_high_value_transactions(): + return (spark.readStream.table("bronze_transactions").filter(F.col("amount") > 10000)) +``` + +--- + +## Monitoring Lag + +Track end-to-end freshness by comparing event-time max to processing time. Useful for alerting on ingestion delays from Kafka, Kinesis, or Auto Loader. + +```sql +CREATE OR REFRESH STREAMING TABLE monitoring_lag AS +SELECT 'kafka_events' AS source, + MAX(kafka_timestamp) AS max_event_timestamp, + current_timestamp() AS processing_timestamp, + unix_timestamp(current_timestamp()) - unix_timestamp(MAX(kafka_timestamp)) AS lag_seconds +FROM STREAM bronze_kafka_events +GROUP BY window(kafka_timestamp, '1 minute'); +``` + +```python +@dp.table(name="monitoring_lag") +def monitoring_lag(): + return ( + spark.readStream.table("bronze_kafka_events") + .groupBy(F.window("kafka_timestamp", "1 minute")) + .agg(F.lit("kafka_events").alias("source"), + F.max("kafka_timestamp").alias("max_event_timestamp"), + F.current_timestamp().alias("processing_timestamp")) + .withColumn("lag_seconds", + F.unix_timestamp("processing_timestamp") - F.unix_timestamp("max_event_timestamp")) + ) +``` + +--- + +## Best Practices + +1. **Use event time, not processing time**, for aggregation keys. +2. **Deduplicate at silver**, not bronze. Bronze is append-only, silver is clean. +3. **Bound state**: time windows, lower cardinality, materialize intermediates — see [performance.md](performance.md#state-management-for-streaming). +4. **Quarantine, don't drop silently** — route bad rows to a side table for observability. +5. **Use Auto CDC for sequenced updates** instead of building dedup with `ROW_NUMBER` — see [auto-cdc.md](auto-cdc.md). + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| High memory with windows | Larger windows; reduce group-by cardinality. | +| Duplicate events in output | Add explicit dedup by unique key, or switch to Auto CDC SCD Type 1. | +| Missing late-arriving events | Larger window; check that aggregation uses event time not processing time. | +| Stream-to-stream join empty | Missing or too-narrow time bound on join condition. | +| State growth over time | Add time windows; reduce cardinality; materialize daily then aggregate batch monthly. | +| `bronze_quarantine` empty unexpectedly | `rescuedDataColumn` not enabled; check Auto Loader options. |