diff --git a/manifest.json b/manifest.json index b64dd62..d3f7e77 100644 --- a/manifest.json +++ b/manifest.json @@ -55,8 +55,7 @@ "references/deploy-and-run.md", "references/resource-permissions.md", "references/sdp-pipelines.md" - ], - "base_revision": "e742f36e8ab1" + ] }, "databricks-jobs": { "version": "0.2.0", @@ -104,7 +103,7 @@ ] }, "databricks-pipelines": { - "version": "0.1.1", + "version": "0.3.0", "description": "Databricks Spark Declarative Pipelines (SDP) for ETL and streaming", "repo_dir": "skills", "files": [ @@ -118,11 +117,13 @@ "references/auto-loader-python.md", "references/auto-loader-sql.md", "references/auto-loader.md", + "references/dlt-migration.md", "references/expectations-python.md", "references/expectations-sql.md", "references/expectations.md", "references/foreach-batch-sink-python.md", "references/foreach-batch-sink.md", + "references/kafka.md", "references/materialized-view-python.md", "references/materialized-view-sql.md", "references/materialized-view.md", @@ -133,10 +134,14 @@ "references/options-parquet.md", "references/options-text.md", "references/options-xml.md", + "references/performance.md", + "references/pipeline-configuration.md", "references/python-basics.md", + "references/scd-2-querying.md", "references/sink-python.md", "references/sink.md", "references/sql-basics.md", + "references/streaming-patterns.md", "references/streaming-table-python.md", "references/streaming-table-sql.md", "references/streaming-table.md", @@ -145,9 +150,9 @@ "references/temporary-view.md", "references/view-sql.md", "references/view.md", + "references/workflows.md", "references/write-spark-declarative-pipelines.md" - ], - "base_revision": "5c4b4fb0a82a" + ] }, "databricks-serverless-migration": { "version": "0.1.0", diff --git a/skills/databricks-pipelines/SKILL.md b/skills/databricks-pipelines/SKILL.md index 7f25825..78e05b3 100644 --- a/skills/databricks-pipelines/SKILL.md +++ b/skills/databricks-pipelines/SKILL.md @@ -3,7 +3,7 @@ name: databricks-pipelines description: Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation. compatibility: Requires databricks CLI (>= v1.0.0) metadata: - version: "0.1.1" + version: "0.3.0" parent: databricks-core --- @@ -168,19 +168,69 @@ Some features require reading multiple skills together: Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines. +## Migrating from DLT + +If you have an existing DLT pipeline (`import dlt`, `@dlt.table`, `dlt.read(...)`, `dlt.apply_changes(...)`) and want to move to SDP, see [references/dlt-migration.md](references/dlt-migration.md). It covers both migration paths — DLT Python → SDP Python (`from pyspark import pipelines as dp`) and DLT Python → SDP SQL — with side-by-side conversions for the table decorators, reads, expectations, CDC/SCD, and partitioning → liquid clustering. + +## Choose Your Workflow + +Three project shapes exist — pick before scaffolding: + +| Situation | Workflow | +|-----------|----------| +| New standalone pipeline project with its own bundle | **A. Standalone bundle** | +| Pipeline added to an existing DAB project | **B. Existing bundle** | +| Quick prototyping, no bundle (yet) | **C. Rapid CLI iteration** | + +Default to A for production-bound work and C for exploration. Full details, generated structures, polling patterns, and edit/re-upload flow in [references/workflows.md](references/workflows.md). + +## Language Selection (Python vs SQL) + +Decide before scaffolding — the choice picks template files (`.py` vs `.sql`) and which reference docs apply. Both can coexist, but pick a primary. + +| User signal | Pick | +|-------------|------| +| "Python pipeline", UDF, pandas, ML inference, pyspark | **Python** | +| "SQL pipeline", "SQL files" | **SQL** | +| "Simple pipeline", "create a table", "an aggregation" | **SQL** (simpler) | +| Complex parameterized logic, custom UDFs, ML | **Python** | + +If ambiguous, ask. Stick with the chosen language unless the user explicitly switches. + ## Scaffolding a New Pipeline Project -Use `databricks bundle init` with a config file to scaffold non-interactively. This creates a project in the `/` directory: +The newer `databricks pipelines init` is focused on pipeline projects: + +```bash +databricks pipelines init --output-dir . --config-file init-config.json +``` + +`init-config.json`: + +```json +{ + "project_name": "my_pipeline", + "initial_catalog": "prod_catalog", + "use_personal_schema": "no", + "initial_language": "sql" +} +``` + +The template-based `databricks bundle init lakeflow-pipelines` also works: ```bash databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile < /dev/null ``` +Field constraints: + - `project_name`: letters, numbers, underscores only -- `language`: `python` or `sql`. Ask the user which they prefer: +- `language` / `initial_language`: `python` or `sql` (lowercase) - SQL: Recommended for straightforward transformations (filters, joins, aggregations) - Python: Recommended for complex logic (custom UDFs, ML, advanced processing) +See [references/workflows.md](references/workflows.md) for the full generated structure, `databricks.yml` essentials, and per-target catalog/schema patterns. + After scaffolding, create `CLAUDE.md` and `AGENTS.md` in the project directory. These files are essential to provide agents with guidance on how to work with the project. Use this content: ``` @@ -259,13 +309,25 @@ resources: Detailed reference guides for each pipeline API. **Read the relevant guide before writing pipeline code.** +### Project & Lifecycle + +- [Workflows](references/workflows.md) — Standalone bundle / existing bundle / rapid CLI iteration; language selection; `pipelines init`; start-update + poll-the-update pattern; edit/re-upload/restart flow +- [Pipeline Configuration](references/pipeline-configuration.md) — Full JSON config reference (top-level, clusters, event_log, notifications, configuration, restart_window, environment) + variant snippets (dev mode, non-serverless, continuous, notifications, autoscaling, custom event log, serverless Python deps) + multi-schema patterns + platform constraints +- [Performance Tuning](references/performance.md) — Liquid Clustering by layer (bronze/silver/gold), key-type rules, state-management strategies for streaming, join optimization, pre-aggregation, monitoring +- [Migrating from DLT](references/dlt-migration.md) — Side-by-side conversions (decorators, reads, expectations, CDC/SCD, partitioning → liquid clustering) + +### Datasets, Flows & Quality + - [Write Spark Declarative Pipelines](references/write-spark-declarative-pipelines.md) — Core syntax and rules ([Python](references/python-basics.md), [SQL](references/sql-basics.md)) - [Streaming Tables](references/streaming-table.md) — Continuous data stream processing ([Python](references/streaming-table-python.md), [SQL](references/streaming-table-sql.md)) - [Materialized Views](references/materialized-view.md) — Physically stored query results with incremental refresh ([Python](references/materialized-view-python.md), [SQL](references/materialized-view-sql.md)) - [Views](references/view.md) — Reusable query logic published to Unity Catalog ([SQL](references/view-sql.md)) - [Temporary Views](references/temporary-view.md) — Pipeline-private views ([Python](references/temporary-view-python.md), [SQL](references/temporary-view-sql.md)) - [Auto Loader](references/auto-loader.md) — Incrementally ingest files from cloud storage ([Python](references/auto-loader-python.md), [SQL](references/auto-loader-sql.md)) +- [Kafka Ingestion](references/kafka.md) — Read from Kafka / Event Hubs with JSON parsing, Secrets-based auth - [Auto CDC](references/auto-cdc.md) — Process Change Data Capture feeds, SCD Type 1 & 2 ([Python](references/auto-cdc-python.md), [SQL](references/auto-cdc-sql.md)) +- [SCD Type 2 Querying](references/scd-2-querying.md) — Current-state views, point-in-time queries, joining facts with historical dimensions +- [Streaming Patterns](references/streaming-patterns.md) — Deduplication, windowed aggregations (tumbling/multi-size/session), late-arriving data, rescue-data quarantine, monitoring lag, anomaly detection - [Expectations](references/expectations.md) — Define and enforce data quality constraints ([Python](references/expectations-python.md), [SQL](references/expectations-sql.md)) - [Sinks](references/sink.md) — Write to Kafka, Event Hubs, external Delta tables ([Python](references/sink-python.md)) - [ForEachBatch Sinks](references/foreach-batch-sink.md) — Custom streaming sink with per-batch Python logic ([Python](references/foreach-batch-sink-python.md)) diff --git a/skills/databricks-pipelines/references/auto-cdc.md b/skills/databricks-pipelines/references/auto-cdc.md index 5fad71a..3bdad12 100644 --- a/skills/databricks-pipelines/references/auto-cdc.md +++ b/skills/databricks-pipelines/references/auto-cdc.md @@ -18,4 +18,8 @@ For detailed implementation guides: - **Python**: [auto-cdc-python.md](auto-cdc-python.md) - **SQL**: [auto-cdc-sql.md](auto-cdc-sql.md) +## Reading SCD Type 2 Tables + +For querying the history tables produced by SCD Type 2 (`__START_AT` / `__END_AT`), point-in-time queries, change analysis, and joining facts with historical dimensions, see [scd-2-querying.md](scd-2-querying.md). + **Note**: The API is also known as `applyChanges` in some contexts. diff --git a/skills/databricks-pipelines/references/auto-loader.md b/skills/databricks-pipelines/references/auto-loader.md index c686304..642031b 100644 --- a/skills/databricks-pipelines/references/auto-loader.md +++ b/skills/databricks-pipelines/references/auto-loader.md @@ -19,6 +19,12 @@ For detailed implementation guides: - **Python**: [auto-loader-python.md](auto-loader-python.md) - **SQL**: [auto-loader-sql.md](auto-loader-sql.md) +## Related Patterns + +- [Rescue-data quarantine](streaming-patterns.md#rescue-data-quarantine) — route rows where Auto Loader rescued malformed fields to a side table instead of dropping them. +- [Kafka ingestion](kafka.md) — for message-bus sources (Kafka, Event Hubs). +- [Monitoring lag](streaming-patterns.md#monitoring-lag) — track end-to-end freshness. + ## Format-Specific Options For format-specific configuration options, refer to: diff --git a/skills/databricks-pipelines/references/dlt-migration.md b/skills/databricks-pipelines/references/dlt-migration.md new file mode 100644 index 0000000..dbde0d9 --- /dev/null +++ b/skills/databricks-pipelines/references/dlt-migration.md @@ -0,0 +1,447 @@ +# Migration Guide: DLT to SDP + +Guide for migrating from Delta Live Tables (DLT) to Spark Declarative Pipelines (SDP). + +**Two migration paths:** +1. **DLT Python → SDP Python** (dlt → dp): Same language, new API +2. **DLT Python → SDP SQL**: Change language for simpler pipelines + +--- + +## Migration Path 1: DLT Python → SDP Python (dlt → dp) + +Use this when staying with Python but moving to the modern `pyspark.pipelines` API. + +### Quick Reference + +| Aspect | Legacy (`dlt`) | Modern (`dp`) | +|--------|---------------|----------------| +| **Import** | `import dlt` | `from pyspark import pipelines as dp` | +| **Table decorator** | `@dlt.table()` | `@dp.table()` | +| **Read table** | `dlt.read("table")` | `spark.read.table("table")` | +| **Read stream** | `dlt.read_stream("table")` | `spark.readStream.table("table")` | +| **CDC/SCD** | `dlt.apply_changes()` | `dp.create_auto_cdc_flow()` | +| **Clustering** | `partition_cols=["date"]` | `cluster_by=["date", "col2"]` | + +### Step-by-Step Migration + +#### Step 1: Update Imports + +```python +# Before +import dlt + +# After +from pyspark import pipelines as dp +``` + +#### Step 2: Update Decorators + +```python +# Before +@dlt.table(name="my_table") + +# After +@dp.table(name="my_table") +``` + +#### Step 3: Update Table Reads + +```python +# Before +@dlt.table(name="silver_events") +def silver_events(): + return dlt.read("bronze_events").filter(...) + +# After +@dp.table(name="silver_events") +def silver_events(): + return spark.read.table("bronze_events").filter(...) +``` + +```python +# Before (streaming) +@dlt.table(name="silver_events") +def silver_events(): + return dlt.read_stream("bronze_events").filter(...) + +# After (streaming) +@dp.table(name="silver_events") +def silver_events(): + return spark.readStream.table("bronze_events").filter(...) +``` + +#### Step 4: Update Expectations + +```python +# Before +@dlt.table(name="silver") +@dlt.expect_or_drop("valid_id", "id IS NOT NULL") + +# After (identical syntax, just change dlt → dp) +@dp.table(name="silver") +@dp.expect_or_drop("valid_id", "id IS NOT NULL") +``` + +#### Step 5: Update CDC/SCD Operations + +```python +# Before +dlt.create_streaming_table("customers_history") +dlt.apply_changes( + target="customers_history", + source="customers_cdc", + keys=["customer_id"], + sequence_by="event_timestamp", + stored_as_scd_type="2" +) + +# After +from pyspark.sql.functions import col + +dp.create_streaming_table("customers_history") +dp.create_auto_cdc_flow( + target="customers_history", + source="customers_cdc", + keys=["customer_id"], + sequence_by=col("event_timestamp"), # Note: use col() + stored_as_scd_type=2 # Note: integer, not string +) +``` + +**Key differences:** +- `apply_changes()` → `create_auto_cdc_flow()` +- `sequence_by` takes a Column object (`col("...")`) not a string +- `stored_as_scd_type` is integer `2` for Type 2, string `"1"` for Type 1 + +#### Step 6: Update Clustering (Partitioning → Liquid Clustering) + +```python +# Before (legacy partitioning) +@dlt.table( + name="bronze_events", + partition_cols=["event_date"], + table_properties={"pipelines.autoOptimize.zOrderCols": "event_type"} +) + +# After (Liquid Clustering) +@dp.table( + name="bronze_events", + cluster_by=["event_date", "event_type"] +) +``` + +### Complete Before/After Example + +**Before (DLT):** +```python +import dlt +from pyspark.sql import functions as F + +@dlt.table(name="bronze_orders", partition_cols=["order_date"]) +def bronze_orders(): + return spark.readStream.format("cloudFiles").load("/data/orders") + +@dlt.table(name="silver_orders") +@dlt.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return dlt.read_stream("bronze_orders").filter(F.col("status") == "completed") + +dlt.create_streaming_table("dim_customers") +dlt.apply_changes( + target="dim_customers", + source="customers_cdc", + keys=["customer_id"], + sequence_by="updated_at", + stored_as_scd_type="2" +) +``` + +**After (SDP):** +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table(name="bronze_orders", cluster_by=["order_date"]) +def bronze_orders(): + return spark.readStream.format("cloudFiles").load("/data/orders") + +@dp.table(name="silver_orders") +@dp.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return spark.readStream.table("bronze_orders").filter(F.col("status") == "completed") + +dp.create_streaming_table("dim_customers") +dp.create_auto_cdc_flow( + target="dim_customers", + source="customers_cdc", + keys=["customer_id"], + sequence_by=F.col("updated_at"), + stored_as_scd_type=2 +) +``` + +--- + +## Migration Path 2: DLT Python → SDP SQL + +Use this when simplifying pipelines by converting to SQL. + +### Decision Matrix + +| Feature/Pattern | DLT Python | SDP SQL | Recommendation | +|-----------------|------------|---------|----------------| +| Simple transformations | ✓ | ✓ | **Migrate to SQL** | +| Aggregations | ✓ | ✓ | **Migrate to SQL** | +| Filtering, WHERE clauses | ✓ | ✓ | **Migrate to SQL** | +| CASE expressions | ✓ | ✓ | **Migrate to SQL** | +| SCD Type 1/2 | ✓ | ✓ | **Migrate to SQL** (AUTO CDC) | +| Simple joins | ✓ | ✓ | **Migrate to SQL** | +| Auto Loader | ✓ | ✓ | **Migrate to SQL** (read_files) | +| Streaming sources (Kafka) | ✓ | ✓ | **Migrate to SQL** (read_kafka) | +| Complex Python UDFs | ✓ | ❌ | **Stay in Python** | +| External API calls | ✓ | ❌ | **Stay in Python** | +| Custom libraries | ✓ | ❌ | **Stay in Python** | +| ML model inference | ✓ | ❌ | **Stay in Python** | + +**Rule**: If 80%+ is SQL-expressible, migrate to SDP SQL. If heavy Python logic, stay with Python (use modern `dp` API). + +### Side-by-Side Conversions + +#### Basic Streaming Table + +**DLT Python:** +```python +@dlt.table(name="bronze_sales", comment="Raw sales") +def bronze_sales(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/my_catalog/my_schema/raw/sales") + .withColumn("_ingested_at", F.current_timestamp()) + ) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE bronze_sales +COMMENT 'Raw sales' +AS +SELECT *, current_timestamp() AS _ingested_at +FROM STREAM read_files('/Volumes/my_catalog/my_schema/raw/sales', format => 'json'); +``` + +#### Filtering and Transformations + +**DLT Python:** +```python +@dlt.table(name="silver_sales") +@dlt.expect_or_drop("valid_amount", "amount > 0") +@dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL") +def silver_sales(): + return ( + dlt.read_stream("bronze_sales") + .withColumn("sale_date", F.to_date("sale_date")) + .withColumn("amount", F.col("amount").cast("decimal(10,2)")) + .select("sale_id", "customer_id", "amount", "sale_date") + ) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE silver_sales AS +SELECT + sale_id, customer_id, + CAST(amount AS DECIMAL(10,2)) AS amount, + CAST(sale_date AS DATE) AS sale_date +FROM STREAM bronze_sales +WHERE amount > 0 AND sale_id IS NOT NULL; +``` + +#### SCD Type 2 + +**DLT Python:** +```python +dlt.create_streaming_table("customers_history") + +dlt.apply_changes( + target="customers_history", + source="customers_cdc_clean", + keys=["customer_id"], + sequence_by="event_timestamp", + stored_as_scd_type="2", + track_history_column_list=["*"] +) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE customers_history; + +CREATE FLOW customers_scd2_flow AS +AUTO CDC INTO customers_history +FROM stream(customers_cdc_clean) +KEYS (customer_id) +APPLY AS DELETE WHEN operation = "DELETE" +SEQUENCE BY event_timestamp +COLUMNS * EXCEPT (operation, _ingested_at, _source_file) +STORED AS SCD TYPE 2; +``` + +**Note:** In SQL, put `APPLY AS DELETE WHEN` before `SEQUENCE BY`. Only list columns in `COLUMNS * EXCEPT (...)` that exist in the source. + +#### Joins + +**DLT Python:** +```python +@dlt.table(name="silver_sales_enriched") +def silver_sales_enriched(): + sales = dlt.read_stream("silver_sales") + products = dlt.read("dim_products") + return sales.join(products, "product_id", "left") +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS +SELECT s.*, p.product_name, p.category +FROM STREAM silver_sales s +LEFT JOIN dim_products p ON s.product_id = p.product_id; +``` + +### Handling Expectations + +**DLT Python:** +```python +@dlt.expect_or_drop("valid_amount", "amount > 0") +@dlt.expect_or_fail("critical_id", "id IS NOT NULL") +``` + +**SDP SQL - Basic** (equivalent to expect_or_drop): +```sql +WHERE amount > 0 AND id IS NOT NULL +``` + +**SDP SQL - Quarantine Pattern** (for auditing dropped records): +```sql +-- Flag invalid records +CREATE OR REFRESH STREAMING TABLE bronze_data_flagged AS +SELECT *, + CASE WHEN amount <= 0 OR id IS NULL THEN TRUE ELSE FALSE END AS is_invalid +FROM STREAM bronze_data; + +-- Clean for downstream +CREATE OR REFRESH STREAMING TABLE silver_data_clean AS +SELECT * FROM STREAM bronze_data_flagged WHERE NOT is_invalid; + +-- Quarantine for investigation +CREATE OR REFRESH STREAMING TABLE silver_data_quarantine AS +SELECT * FROM STREAM bronze_data_flagged WHERE is_invalid; +``` + +### Handling UDFs + +#### Simple UDFs → SQL CASE + +**DLT Python:** +```python +@F.udf(returnType=StringType()) +def categorize_amount(amount): + if amount > 1000: return "High" + elif amount > 100: return "Medium" + else: return "Low" + +@dlt.table(name="sales_categorized") +def sales_categorized(): + return dlt.read("sales").withColumn("category", categorize_amount(F.col("amount"))) +``` + +**SDP SQL:** +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_categorized AS +SELECT *, + CASE + WHEN amount > 1000 THEN 'High' + WHEN amount > 100 THEN 'Medium' + ELSE 'Low' + END AS category +FROM sales; +``` + +#### Complex UDFs → Stay in Python + +Keep in Python if: +- Complex conditional logic +- External API calls +- Custom algorithms +- ML inference + +Use modern `dp` API instead of `dlt`. + +--- + +## Migration Process + +### Step 1: Inventory + +Document: +- Number of tables/views +- Python UDFs (simple vs complex) +- External dependencies +- Expectations and quality rules + +### Step 2: Choose Path + +- **80%+ SQL-expressible** → Migrate to SDP SQL +- **Heavy Python logic** → Migrate to SDP Python (`dp` API) +- **Mixed** → Hybrid (SQL for most, Python for complex) + +### Step 3: Migrate by Layer + +1. **Bronze** (ingestion): `cloudFiles` → `read_files()` or keep `cloudFiles` with `dp` +2. **Silver** (cleansing): `dlt.expect*` → WHERE clause or `dp.expect*` +3. **Gold** (aggregations): Usually straightforward +4. **SCD/CDC**: `apply_changes` → AUTO CDC or `create_auto_cdc_flow` + +### Step 4: Test + +- Run both pipelines in parallel +- Compare outputs for correctness +- Validate performance +- Check quality metrics + +--- + +## When NOT to Migrate + +**Stay with current approach if:** +1. Pipeline works well and team is comfortable +2. Heavy Python UDF usage (>30% of logic) +3. External API calls required +4. Custom ML model inference +5. Complex stateful operations not expressible in SQL +6. Limited time/resources for migration + +**Key**: DLT and SDP are both fully supported. Migrate for simplicity or new features, not necessity. + +--- + +## Common Issues + +| Issue | Solution | +|-------|----------| +| `sequence_by` type error | Use `col("column")` not string in `dp.create_auto_cdc_flow()` | +| UDF doesn't translate | Keep in Python or refactor with SQL built-ins | +| Expectations differ | Use quarantine pattern to audit dropped records | +| Performance degradation | Use `CLUSTER BY` for Liquid Clustering | +| Schema evolution different | Use `mode => 'PERMISSIVE'` in `read_files()` | +| AUTO CDC parse error | Put `APPLY AS DELETE WHEN` before `SEQUENCE BY` | + +--- + +## Related Documentation + +- **[python/1-syntax-basics.md](python/1-syntax-basics.md)** - Modern `dp` API reference +- **[python/4-cdc-patterns.md](python/4-cdc-patterns.md)** - Python CDC patterns +- **[sql/4-cdc-patterns.md](sql/4-cdc-patterns.md)** - SQL CDC patterns +- **[SKILL.md](../SKILL.md)** - Main skill entry point diff --git a/skills/databricks-pipelines/references/kafka.md b/skills/databricks-pipelines/references/kafka.md new file mode 100644 index 0000000..5e776b9 --- /dev/null +++ b/skills/databricks-pipelines/references/kafka.md @@ -0,0 +1,229 @@ +# Kafka Ingestion + +Ingest from Apache Kafka into streaming tables. Examples in both Python (`spark.readStream.format("kafka")`) and SQL (`read_kafka()`). Same pattern works for Azure Event Hubs via the Kafka protocol — see [Event Hubs](#event-hubs) below. + +For Kinesis, Pub/Sub, and Pulsar, use the analogous `read_kinesis`, `read_pubsub`, `read_pulsar` functions / Spark formats — same overall shape as below. + +--- + +## Basic Read + +Kafka returns rows with binary `key` and `value` columns plus `topic`, `partition`, `offset`, `timestamp`. Cast to strings (or `from_json` / `from_avro`) downstream. + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_kafka_events AS +SELECT CAST(key AS STRING) AS event_key, + CAST(value AS STRING) AS event_value, + topic, partition, offset, + timestamp AS kafka_timestamp, + current_timestamp() AS _ingested_at +FROM read_kafka( + bootstrapServers => '${kafka_brokers}', + subscribe => 'events-topic', + startingOffsets => 'latest' +); +``` + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table(name="bronze_kafka_events") +def bronze_kafka_events(): + kafka_brokers = spark.conf.get("kafka_brokers") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", kafka_brokers) + .option("subscribe", "events-topic") + .option("startingOffsets", "latest") + .load() + .selectExpr( + "CAST(key AS STRING) AS event_key", + "CAST(value AS STRING) AS event_value", + "topic", "partition", "offset", + "timestamp AS kafka_timestamp") + .withColumn("_ingested_at", F.current_timestamp()) + ) +``` + +**Documentation**: [`read_kafka` function reference](https://docs.databricks.com/aws/en/sql/language-manual/functions/read_kafka). + +### Common options + +| Option | Purpose | +|--------|---------| +| `bootstrapServers` / `kafka.bootstrap.servers` | Broker list. Use a pipeline config var, not a literal. | +| `subscribe` | Topic name or comma-separated list. | +| `subscribePattern` | Regex over topic names (alternative to `subscribe`). | +| `startingOffsets` | `"latest"`, `"earliest"`, or JSON per-partition offsets. | +| `endingOffsets` | Only for batch reads — ignored in streaming. | +| `maxOffsetsPerTrigger` | Throttle per micro-batch. | +| `failOnDataLoss` | Default `true`. Set `false` only when you accept gaps. | + +--- + +## Parse JSON Payloads + +`value` is a binary/string blob. Extract structured columns with `from_json` (SQL/Python) against an explicit schema. + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events AS +SELECT data.*, kafka_timestamp, _ingested_at +FROM ( + SELECT from_json(event_value, + 'event_id STRING, event_type STRING, timestamp TIMESTAMP') AS data, + kafka_timestamp, _ingested_at + FROM STREAM bronze_kafka_events +); +``` + +```python +from pyspark.sql.types import StructType, StructField, StringType, TimestampType + +event_schema = StructType([ + StructField("event_id", StringType()), + StructField("event_type", StringType()), + StructField("timestamp", TimestampType()), +]) + +@dp.table(name="silver_events") +def silver_events(): + return ( + spark.readStream.table("bronze_kafka_events") + .withColumn("data", F.from_json("event_value", event_schema)) + .select("data.*", "kafka_timestamp", "_ingested_at") + ) +``` + +**Schema hygiene**: keep the schema in code (Python `StructType` or SQL string), versioned alongside the pipeline. Inferring JSON schema from a streaming Kafka source is not supported — the schema must be explicit. + +For Avro / Protobuf payloads, swap `from_json` for `from_avro` / `from_protobuf` (with Schema Registry config). Same overall pattern. + +--- + +## Authentication + +### Databricks Secrets + +Don't put credentials in code or pipeline config literally. Use `{{secrets/scope/key}}` interpolation. + +```sql +-- SASL/PLAIN +FROM read_kafka( + bootstrapServers => '${kafka_brokers}', + subscribe => 'events-topic', + `kafka.security.protocol` => 'SASL_SSL', + `kafka.sasl.mechanism` => 'PLAIN', + `kafka.sasl.jaas.config` => + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' || + 'username="{{secrets/kafka/username}}" ' || + 'password="{{secrets/kafka/password}}";' +); +``` + +```python +@dp.table(name="bronze_kafka_authenticated") +def bronze_kafka_authenticated(): + username = dbutils.secrets.get(scope="kafka", key="username") + password = dbutils.secrets.get(scope="kafka", key="password") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", spark.conf.get("kafka_brokers")) + .option("subscribe", "events-topic") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.sasl.jaas.config", + f'org.apache.kafka.common.security.plain.PlainLoginModule required ' + f'username="{username}" password="{password}";') + .load() + ) +``` + +### TLS / mTLS + +For mTLS, additional `kafka.ssl.truststore.*` and `kafka.ssl.keystore.*` options are required. Truststore/keystore files typically come from Unity Catalog volumes; pass file paths via pipeline config. + +--- + +## Event Hubs (via Kafka protocol) + +Azure Event Hubs speaks the Kafka protocol on port 9093. Use the same Kafka source — only the connection string changes. + +```sql +FROM read_kafka( + bootstrapServers => '.servicebus.windows.net:9093', + subscribe => '', + `kafka.security.protocol` => 'SASL_SSL', + `kafka.sasl.mechanism` => 'PLAIN', + `kafka.sasl.jaas.config` => + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' || + 'username="$ConnectionString" ' || + 'password="{{secrets/eventhub/connection-string}}";' +); +``` + +```python +@dp.table(name="bronze_eventhub_events") +def bronze_eventhub_events(): + conn_str = dbutils.secrets.get(scope="eventhub", key="connection-string") + return ( + spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", ".servicebus.windows.net:9093") + .option("subscribe", "") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.sasl.jaas.config", + 'org.apache.kafka.common.security.plain.PlainLoginModule required ' + f'username="$ConnectionString" password="{conn_str}";') + .load() + ) +``` + +The username is the literal string `$ConnectionString` and the password is the namespace-level or entity-level connection string (with `SharedAccessKey=…`). + +--- + +## Pipeline Configuration + +Pass Kafka brokers, topics, and consumer-group identity through pipeline configuration so dev/prod can differ without code changes. + +```yaml +# In resources/.pipeline.yml +resources: + pipelines: + my_pipeline: + ... + configuration: + kafka_brokers: "broker-1:9092,broker-2:9092,broker-3:9092" + kafka_topic: "events-topic" +``` + +Read in code with `spark.conf.get("kafka_brokers")` (Python) or `${kafka_brokers}` (SQL). + +--- + +## Writing to Kafka (Sinks) + +Sinks are Python-only. Write a payload to Kafka by creating a sink with `format="kafka"` and appending via `@dp.append_flow`. The `value` column is mandatory — use `to_json(struct(*))` to serialize the row. See [sink.md](sink.md) and [sink-python.md](sink-python.md). + +--- + +## Best Practices + +1. **Always cast `value` to a usable type** (`STRING`, `BINARY`) and parse with `from_json` / `from_avro` against an explicit schema. Don't carry `value` as bytes downstream. +2. **Add `_ingested_at`** for lag monitoring — see [streaming-patterns.md](streaming-patterns.md#monitoring-lag). +3. **Tune `maxOffsetsPerTrigger`** if downstream operations are bottlenecking. +4. **Don't set `failOnDataLoss = false`** unless you genuinely accept gaps. The default protects against retention-window data loss. +5. **Use the parent `databricks-core` skill** for secret-scope management. + +--- + +## Common Issues + +| Issue | Fix | +|-------|-----| +| `Unable to find Kafka source` | Confirm `format("kafka")` (Python) / `read_kafka` (SQL) and that the cluster has Kafka client libraries (default on serverless / DBR ML / standard runtimes). | +| `Connection refused` / SSL handshake | Verify `bootstrapServers` reachability and `kafka.security.protocol`. | +| Schema for `value` doesn't match | `from_json` returns `NULL` on parse failure — add a quarantine fanout on `data IS NULL` similar to [rescue-data quarantine](streaming-patterns.md#rescue-data-quarantine). | +| Increasing consumer lag | Bottleneck downstream — see [streaming-patterns.md](streaming-patterns.md#monitoring-lag) for lag table; tune cluster size / `maxOffsetsPerTrigger`. | +| `failOnDataLoss` error after a long pause | Kafka topic retention expired the offset checkpoint. Reset the pipeline (full refresh) or start from `earliest`. | diff --git a/skills/databricks-pipelines/references/performance.md b/skills/databricks-pipelines/references/performance.md new file mode 100644 index 0000000..cf82997 --- /dev/null +++ b/skills/databricks-pipelines/references/performance.md @@ -0,0 +1,490 @@ +# Performance Tuning + +Performance patterns for Spark Declarative Pipelines: Liquid Clustering, state management for streaming, join strategy, query optimization, and pre-aggregation. Examples are shown in both SQL and Python. + +--- + +## Liquid Clustering + +**Recommended** for data layout. Replaces `PARTITION BY` + `ZORDER`. Adaptive, multi-dimensional, self-optimizing — no more manual `OPTIMIZE`. + +### Basic syntax + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_events +CLUSTER BY (event_type, event_date) +AS +SELECT *, current_timestamp() AS _ingested_at +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); +``` + +```python +@dp.table(cluster_by=["event_type", "event_date"]) +def bronze_events(): + return spark.readStream.format("cloudFiles").load("/Volumes/cat/sch/raw/events/") +``` + +### Automatic key selection + +```sql +CLUSTER BY (AUTO) +``` + +```python +cluster_by=["AUTO"] +``` + +Use `AUTO` while learning the workload, prototyping, or when access patterns are unclear. Pick keys manually for production once query patterns are stable. + +### Cluster key data types + +**Cluster keys must be numeric, string, date, or timestamp.** `BOOLEAN`, `ARRAY`, `MAP`, `STRUCT`, `BINARY` are rejected at runtime with `DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED`. Low-cardinality flags also don't benefit from clustering — leave them out. + +### Cluster key selection by layer + +| Layer | Good keys | Rationale | +|-------|-----------|-----------| +| **Bronze** | `event_type`, `ingestion_date` | Filter by type for processing, by date for incremental loads. | +| **Silver** | `primary_key`, `business_date` | Entity lookups + time-range queries. | +| **Gold** | aggregation dimensions | Dashboard filters. | + +**Rules of thumb**: +- First key: most-selective filter (e.g. `customer_id`). +- Second key: next-most-common filter (e.g. date). +- Order matters. Most-selective first. +- Limit to **4 keys** — diminishing returns beyond that. +- Use `AUTO` if unsure. + +### Bronze example + +```sql +CREATE OR REFRESH STREAMING TABLE bronze_events +CLUSTER BY (event_type, ingestion_date) +TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true') +AS +SELECT *, + current_timestamp() AS _ingested_at, + CAST(current_date() AS DATE) AS ingestion_date +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); +``` + +```python +@dp.table( + name="bronze_events", + cluster_by=["event_type", "ingestion_date"], + table_properties={"delta.autoOptimize.optimizeWrite": "true"}, +) +def bronze_events(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/cat/sch/raw/events/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("ingestion_date", F.current_date()) + ) +``` + +### Silver example (clustering for joins + time filters) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_orders +CLUSTER BY (customer_id, order_date) +AS +SELECT order_id, customer_id, product_id, + CAST(amount AS DECIMAL(10,2)) AS amount, -- DECIMAL for monetary + CAST(order_timestamp AS DATE) AS order_date, + order_timestamp +FROM STREAM bronze_orders; +``` + +```python +@dp.table(name="silver_orders", cluster_by=["customer_id", "order_date"]) +def silver_orders(): + return ( + spark.readStream.table("bronze_orders") + .withColumn("order_date", F.to_date("order_timestamp")) + .select("order_id", "customer_id", "product_id", "amount", "order_date") + ) +``` + +### Gold example (clustering on aggregation dimensions) + +```sql +CREATE OR REFRESH MATERIALIZED VIEW gold_sales_summary +CLUSTER BY (product_category, year_month) +AS +SELECT product_category, + DATE_FORMAT(order_date, 'yyyy-MM') AS year_month, + SUM(amount) AS total_sales, + COUNT(*) AS transaction_count, + AVG(amount) AS avg_order_value +FROM silver_orders +GROUP BY product_category, DATE_FORMAT(order_date, 'yyyy-MM'); +``` + +```python +@dp.materialized_view(name="gold_sales_summary", cluster_by=["product_category", "year_month"]) +def gold_sales_summary(): + return ( + spark.read.table("silver_orders") + .withColumn("year_month", F.date_format("order_date", "yyyy-MM")) + .groupBy("product_category", "year_month") + .agg(F.sum("amount").alias("total_sales"), + F.count("*").alias("transaction_count"), + F.avg("amount").alias("avg_order_value")) + ) +``` + +### Migrating from `PARTITION BY` + `ZORDER` + +Before (legacy): + +```sql +CREATE OR REFRESH STREAMING TABLE events +PARTITIONED BY (date DATE) +TBLPROPERTIES ('pipelines.autoOptimize.zOrderCols' = 'user_id,event_type') +AS SELECT ...; +``` + +After: + +```sql +CREATE OR REFRESH STREAMING TABLE events +CLUSTER BY (date, user_id, event_type) +AS SELECT ...; +``` + +Typical wins: 20–50% query improvement, no small-file problem, automatic optimization, no manual `OPTIMIZE` job. + +**Keep `PARTITION BY` only for**: regulatory requirements (physical separation), data-lifecycle (need to `DROP` partitions for retention), DBR < 13.3 compatibility, or existing huge tables where migration cost > benefit. + +--- + +## Table Properties + +### Auto-optimize + +```sql +TBLPROPERTIES ( + 'delta.autoOptimize.optimizeWrite' = 'true', + 'delta.autoOptimize.autoCompact' = 'true' +) +``` + +```python +table_properties={ + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true", +} +``` + +### Change Data Feed + +```sql +TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true') +``` + +Enable when downstream systems need efficient change tracking. + +### Retention (high-volume tables) + +```sql +TBLPROPERTIES ( + 'delta.logRetentionDuration' = '7 days', + 'delta.deletedFileRetentionDuration' = '7 days' +) +``` + +Use for high-volume tables to reduce storage cost. Be careful: short retention windows break time-travel queries beyond the window. + +--- + +## Materialized View Refresh + +```sql +-- Near-real-time +CREATE OR REFRESH MATERIALIZED VIEW gold_live_metrics +REFRESH EVERY 5 MINUTES +AS SELECT metric_name, AVG(metric_value) AS avg_value, MAX(last_updated) AS freshness + FROM silver_metrics GROUP BY metric_name; + +-- Daily +CREATE OR REFRESH MATERIALIZED VIEW gold_daily_summary +REFRESH EVERY 1 DAY +AS SELECT report_date, SUM(amount) AS total_amount + FROM silver_sales GROUP BY report_date; +``` + +### Incremental refresh + +MVs use incremental refresh automatically when possible. Requirements: +- Source has Delta row tracking enabled. +- No row-level filters. +- Aggregation/expression pattern is supported. +- **Serverless pipeline.** Incremental refresh for aggregations is a serverless feature. + +--- + +## State Management for Streaming + +Higher cardinality → more state. Watch the combinations in `GROUP BY`. + +```sql +-- High state: every unique combination creates state +SELECT user_id, product_id, session_id, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, product_id, session_id; -- 1M × 10K × 100M — massive +``` + +### Strategy 1: reduce cardinality + +```sql +-- 100 categories instead of 10K products +SELECT user_id, product_category, DATE(event_time) AS event_date, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, product_category, DATE(event_time); +``` + +```python +@dp.table(name="user_category_stats") +def user_category_stats(): + return ( + spark.readStream.table("bronze_events") + .groupBy("user_id", "product_category", + F.to_date("event_time").alias("event_date")) + .agg(F.count("*").alias("events")) + ) +``` + +### Strategy 2: use time windows + +```sql +SELECT user_id, window(event_time, '1 hour') AS time_window, COUNT(*) AS events +FROM STREAM bronze_events +GROUP BY user_id, window(event_time, '1 hour'); +``` + +```python +@dp.table(name="user_hourly_stats") +def user_hourly_stats(): + return ( + spark.readStream.table("bronze_events") + .groupBy("user_id", F.window("event_time", "1 hour")) + .agg(F.count("*").alias("events")) + ) +``` + +### Strategy 3: materialize intermediates (move state to batch) + +```sql +-- Streaming aggregation (maintains state) +CREATE OR REFRESH STREAMING TABLE user_daily_stats AS +SELECT user_id, DATE(event_time) AS event_date, COUNT(*) AS event_count +FROM STREAM bronze_events +GROUP BY user_id, DATE(event_time); + +-- Batch aggregation on top (no streaming state) +CREATE OR REFRESH MATERIALIZED VIEW user_monthly_stats AS +SELECT user_id, DATE_TRUNC('month', event_date) AS month, SUM(event_count) AS total_events +FROM user_daily_stats +GROUP BY user_id, DATE_TRUNC('month', event_date); +``` + +--- + +## Join Optimization + +### Stream-to-static (efficient) + +```sql +-- Small static dimension joined to large streaming fact +CREATE OR REFRESH STREAMING TABLE sales_enriched AS +SELECT s.sale_id, s.product_id, s.amount, p.product_name, p.category +FROM STREAM bronze_sales s +LEFT JOIN dim_products p ON s.product_id = p.product_id; +``` + +```python +@dp.table(name="sales_enriched") +def sales_enriched(): + sales = spark.readStream.table("bronze_sales") + products = spark.read.table("dim_products") # static, broadcastable + return sales.join(products, "product_id", "left") \ + .select("sale_id", "product_id", "amount", "product_name", "category") +``` + +**Rule**: keep static dimensions small (< 10K rows) so they broadcast. + +### Stream-to-stream (stateful, time-bounded) + +```sql +-- Time bounds limit state retention +CREATE OR REFRESH STREAMING TABLE orders_with_payments AS +SELECT o.order_id, o.amount AS order_amount, p.payment_id, p.amount AS payment_amount +FROM STREAM bronze_orders o +INNER JOIN STREAM bronze_payments p + ON o.order_id = p.order_id + AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL 1 HOUR; +``` + +```python +@dp.table(name="orders_with_payments") +def orders_with_payments(): + orders = spark.readStream.table("bronze_orders") + payments = spark.readStream.table("bronze_payments") + return orders.join( + payments, + (orders.order_id == payments.order_id) & + (payments.payment_time >= orders.order_time) & + (payments.payment_time <= orders.order_time + F.expr("INTERVAL 1 HOUR")), + "inner", + ) +``` + +Without time bounds, stream-to-stream state grows unbounded. + +--- + +## Query Optimization + +### Filter early + +```sql +-- Filter at source +CREATE OR REFRESH STREAMING TABLE silver_recent AS +SELECT * +FROM STREAM bronze_events +WHERE event_date >= CURRENT_DATE() - INTERVAL 7 DAYS; +``` + +```python +@dp.table(name="silver_recent") +def silver_recent(): + return (spark.readStream.table("bronze_events") + .filter(F.col("event_date") >= F.current_date() - 7)) +``` + +Pushing filters into the streaming read keeps downstream MV inputs small. The anti-pattern is wide-open silver tables filtered later in gold MVs — every row is processed twice. + +### Select specific columns + +Skip `SELECT *` once schema is stable. Narrowed projections enable column pruning in Delta and shrink wire/state size for stateful operations. + +--- + +## Pre-Aggregation + +When the same coarse aggregation is queried frequently, materialize it. Querying the MV is far cheaper than re-aggregating the underlying table. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW orders_monthly AS +SELECT customer_id, YEAR(order_date) AS year, MONTH(order_date) AS month, + SUM(amount) AS total +FROM large_orders_table +GROUP BY customer_id, YEAR(order_date), MONTH(order_date); + +-- Query the MV directly +SELECT * FROM orders_monthly WHERE year = 2024; +``` + +```python +@dp.materialized_view(name="orders_monthly") +def orders_monthly(): + return (spark.read.table("large_orders_table") + .groupBy("customer_id", + F.year("order_date").alias("year"), + F.month("order_date").alias("month")) + .agg(F.sum("amount").alias("total"))) +``` + +--- + +## Compute Configuration + +| Aspect | Serverless | Classic | +|--------|-----------|---------| +| Startup | Seconds | Minutes | +| Scaling | Automatic, instant | Manual / autoscale | +| Cost | Pay-per-use | Pay for cluster time | +| Best for | Variable / dev / test / most prod | Steady, very long-running workloads with special requirements | + +**Default to serverless.** Switch to classic only when R, Spark RDD APIs, JAR/Maven libraries, or other serverless-incompatible features are required — see [pipeline-configuration.md](pipeline-configuration.md#serverless-limitations-force-classic-clusters). + +--- + +## Monitoring Freshness + +```sql +SELECT table_name, + MAX(event_timestamp) AS latest_event, + CURRENT_TIMESTAMP() AS now, + TIMESTAMPDIFF(MINUTE, MAX(event_timestamp), CURRENT_TIMESTAMP()) AS lag_minutes +FROM pipeline_monitoring.table_metrics +GROUP BY table_name; +``` + +Watch for: + +1. Slow streaming tables (high processing lag). +2. Large state operations (high memory). +3. Expensive joins (long batch durations). +4. Small-file accumulation (raise auto-optimize, check write patterns). + +--- + +## Complete Example (Python) + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F + +@dp.table( + name="bronze_orders", + cluster_by=["order_date"], + table_properties={ + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true", + }, +) +def bronze_orders(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load("/Volumes/cat/sch/raw/orders/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("order_date", F.to_date("order_timestamp")) + ) + +@dp.table(name="silver_orders", cluster_by=["customer_id", "order_date"]) +@dp.expect_or_drop("valid_amount", "amount > 0") +def silver_orders(): + return ( + spark.readStream.table("bronze_orders") + .filter(F.col("order_date") >= F.current_date() - 90) # filter early + .withColumn("amount", F.col("amount").cast("decimal(10,2)")) + .select("order_id", "customer_id", "amount", "order_date") + ) + +@dp.materialized_view(name="gold_daily_revenue", cluster_by=["order_date"]) +def gold_daily_revenue(): + return ( + spark.read.table("silver_orders") + .groupBy("order_date") + .agg(F.sum("amount").alias("total_revenue"), + F.count("order_id").alias("order_count"), + F.countDistinct("customer_id").alias("unique_customers")) + ) +``` + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| Pipeline running slowly | Check clustering keys, state size, join patterns. | +| High memory usage | Unbounded state — add time windows, reduce cardinality. | +| Many small files | Enable auto-optimize table properties. | +| Expensive queries on large tables | Add clustering on filter columns, build pre-aggregated MVs. | +| MV refresh slow | Enable row tracking on source, verify the refresh is actually incremental. | +| `DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED` | A cluster key has an unsupported type (BOOLEAN / complex). Replace with a numeric / string / date / timestamp column. | diff --git a/skills/databricks-pipelines/references/pipeline-configuration.md b/skills/databricks-pipelines/references/pipeline-configuration.md new file mode 100644 index 0000000..f102e57 --- /dev/null +++ b/skills/databricks-pipelines/references/pipeline-configuration.md @@ -0,0 +1,351 @@ +# Pipeline Configuration + +JSON field reference for `databricks pipelines create --json '{...}'` and `databricks pipelines update --json '{...}'`, plus variant snippets for common configurations. + +Defaults to **serverless + Unity Catalog**. Don't set `serverless: false` unless the user explicitly needs R, Spark RDD APIs, or JAR / Maven libraries. + +## Canonical Create + +```bash +databricks pipelines create --json '{ + "name": "my_pipeline", + "catalog": "my_catalog", + "schema": "my_schema", + "serverless": true, + "continuous": false, + "channel": "PREVIEW", + "libraries": [{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}] +}' +``` + +**Always pass `"continuous": false` explicitly.** A continuous pipeline auto-restarts failed updates forever (`cause: RETRY_ON_FAILURE`), burning serverless cost and trapping polling loops. Only set `true` when the user explicitly asks for an always-on streaming pipeline. + +The variant blocks below show only the **deltas** to add or change — don't re-paste the whole JSON. + +--- + +## Top-Level Fields + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `serverless` | bool | `true` | Serverless compute. `false` requires `clusters`. | +| `continuous` | bool | `false` | `true` = always running. `false` = triggered runs. | +| `development` | bool | `false` | Dev mode: faster startup, relaxed validation, no retries. | +| `photon` | bool | `false` | Photon vectorized engine. | +| `edition` | str | `"CORE"` | `"CORE"`, `"PRO"`, `"ADVANCED"`. CDC requires `"ADVANCED"`. | +| `channel` | str | `"CURRENT"` | `"CURRENT"` (stable) or `"PREVIEW"` (latest features). | +| `clusters` | list | `[]` | Cluster configs. Required if `serverless: false`. | +| `configuration` | dict | `{}` | Spark/pipeline config key-value (all values strings). | +| `tags` | dict | `{}` | Metadata tags (max 25). | +| `event_log` | dict | auto | Custom event log table location. | +| `notifications` | list | `[]` | Email/webhook alerts. | +| `allow_duplicate_names` | bool | `false` | Allow multiple pipelines with the same name. | +| `budget_policy_id` | str | — | Budget policy for cost tracking. | +| `storage` | str | — | DBFS root (legacy — use Unity Catalog). | +| `target` | str | — | **Deprecated** — use `schema`. | +| `dry_run` | bool | `false` | Validate without creating (create only). | +| `run_as` | dict | — | Run as specific user / service principal. | +| `restart_window` | dict | — | Maintenance window for continuous-pipeline restarts. | +| `filters` | dict | — | Include/exclude specific paths. | +| `trigger` | dict | — | **Deprecated** — use `continuous`. | +| `deployment` | dict | — | `BUNDLE` (DABs) vs `DEFAULT`. | +| `environment` | dict | — | Python pip deps for serverless. | +| `gateway_definition` | dict | — | CDC gateway pipeline config. | +| `ingestion_definition` | dict | — | Managed ingestion (Salesforce, Workday, etc.). | +| `usage_policy_id` | str | — | Usage policy. | + +### Edition Comparison + +| Feature | CORE | PRO | ADVANCED | +|---------|------|-----|----------| +| Streaming tables | ✓ | ✓ | ✓ | +| Materialized views | ✓ | ✓ | ✓ | +| Expectations | ✓ | ✓ | ✓ | +| CDC | — | — | ✓ | +| SCD Type 1/2 | — | — | ✓ | + +--- + +## `clusters[]` — Classic Cluster Config + +Required when `serverless: false`. Each cluster object: + +| Field | Type | Description | +|-------|------|-------------| +| `label` | str | **Required**. `"default"` (main) or `"maintenance"`. | +| `num_workers` | int | Fixed workers (mutually exclusive with `autoscale`). | +| `autoscale` | dict | `{"min_workers": N, "max_workers": N, "mode": "ENHANCED"}` — `"ENHANCED"` recommended. | +| `node_type_id` | str | Instance type (e.g. `"i3.xlarge"`). | +| `driver_node_type_id` | str | Defaults to `node_type_id`. | +| `instance_pool_id` | str | Faster startup via pool. | +| `driver_instance_pool_id` | str | Pool for driver. | +| `spark_conf` | dict | Per-cluster Spark config. | +| `spark_env_vars` | dict | Env vars. | +| `custom_tags` | dict | Cloud resource tags. | +| `init_scripts` | list | Init scripts. | +| `aws_attributes` | dict | e.g. `{"availability": "SPOT", "zone_id": "us-west-2a"}`. | +| `azure_attributes` | dict | e.g. `{"availability": "SPOT_AZURE"}`. | +| `gcp_attributes` | dict | GCP-specific. | + +--- + +## `event_log` — Custom Event Log Table + +| Field | Description | +|-------|-------------| +| `catalog` | UC catalog for the event log table. | +| `schema` | Schema for the event log table. | +| `name` | Table name. | + +--- + +## `notifications[]` — Alerts + +| Field | Description | +|-------|-------------| +| `email_recipients` | List of email addresses. | +| `alerts` | `"on-update-success"`, `"on-update-failure"`, `"on-update-fatal-failure"`, `"on-flow-failure"`. | + +--- + +## `configuration` — Spark / Pipeline Config + +All values must be strings. + +| Key | Description | +|-----|-------------| +| `spark.sql.shuffle.partitions` | Number of shuffle partitions. `"auto"` recommended. | +| `pipelines.numRetries` | Retries on transient failures. | +| `pipelines.trigger.interval` | Trigger interval for continuous pipelines (e.g. `"1 hour"`). | +| `spark.databricks.delta.preview.enabled` | Enable Delta preview features (`"true"`). | + +Any key here is also accessible from pipeline code via `spark.conf.get("key")` — use this to parameterize transformations. + +--- + +## `run_as` — Execution Identity + +Only one of these: + +| Field | Description | +|-------|-------------| +| `user_name` | Email of workspace user (can only set to your own). | +| `service_principal_name` | Application ID (requires `servicePrincipal/user` role). | + +--- + +## `restart_window` — Continuous-Pipeline Restart Window + +For continuous pipelines, the 5-hour window when daily restarts may occur: + +| Field | Description | +|-------|-------------| +| `start_hour` | **Required**. Hour 0–23 when window begins. | +| `days_of_week` | `"MONDAY"`, `"TUESDAY"`, … (default: all). | +| `time_zone_id` | e.g. `"America/Los_Angeles"` (default UTC). | + +--- + +## `filters` — Path Filtering + +| Field | Description | +|-------|-------------| +| `include` | Paths to include. | +| `exclude` | Paths to exclude. | + +--- + +## `environment` — Serverless Python Deps + +| Field | Description | +|-------|-------------| +| `dependencies` | List of pip requirements, e.g. `["pandas==2.0.0", "requests"]`. | + +--- + +## `deployment` — Deployment Method + +| Field | Description | +|-------|-------------| +| `kind` | `"BUNDLE"` (DABs) or `"DEFAULT"`. | +| `metadata_file_path` | Path to deployment metadata. | + +--- + +## Variant Snippets + +Each block shows what to add to (or replace in) the canonical create JSON. + +### Development mode + +```json +"development": true, +"tags": {"environment": "development", "owner": "data-team"} +``` + +### Non-serverless / dedicated cluster + +```json +"serverless": false, +"photon": true, +"edition": "ADVANCED", +"clusters": [{ + "label": "default", + "num_workers": 4, + "node_type_id": "i3.xlarge", + "custom_tags": {"cost_center": "analytics"} +}] +``` + +### Continuous streaming + +```json +"continuous": true, +"configuration": {"spark.sql.shuffle.partitions": "auto"} +``` + +### Production autoscaling cluster + +```json +"clusters": [{ + "label": "default", + "autoscale": {"min_workers": 2, "max_workers": 8, "mode": "ENHANCED"}, + "node_type_id": "i3.xlarge", + "spark_conf": {"spark.sql.adaptive.enabled": "true"}, + "custom_tags": {"environment": "production"} +}] +``` + +### Email notifications + +```json +"notifications": [{ + "email_recipients": ["team@example.com", "oncall@example.com"], + "alerts": ["on-update-failure", "on-update-fatal-failure", "on-flow-failure"] +}] +``` + +### Serverless Python dependencies + +```json +"environment": { + "dependencies": ["scikit-learn==1.3.0", "pandas>=2.0.0", "requests"] +} +``` + +### Continuous with restart window + +Combine `"continuous": true` with: + +```json +"restart_window": { + "start_hour": 2, + "days_of_week": ["SATURDAY", "SUNDAY"], + "time_zone_id": "America/Los_Angeles" +} +``` + +### Custom event-log location + +```json +"event_log": { + "catalog": "audit_catalog", + "schema": "pipeline_logs", + "name": "my_pipeline_events" +} +``` + +--- + +## Updating an Existing Pipeline + +`update` takes the same JSON shape as `create`: + +```bash +databricks pipelines update --json '{ + "name": "updated_name", + "development": false, + "notifications": [{"email_recipients": ["team@example.com"], "alerts": ["on-update-failure"]}] +}' +``` + +Then trigger a new run with `databricks pipelines start-update [--full-refresh]`. See [workflows.md](workflows.md#step-4-start-an-update-and-poll-that-update) for the polling pattern — never poll top-level `pipelines get` state for run completion. + +--- + +## Multi-Schema Patterns + +**Preferred: one pipeline, multiple schemas** via fully-qualified table names. Simpler than running multiple pipelines. + +For trivial cases where all tables go to the same schema, use name prefixes (`bronze_*`, `silver_*`, `gold_*`). + +### Same catalog, separate schemas (parameterized) + +Set pipeline defaults to bronze; pull silver/gold schemas from configuration: + +```python +from pyspark import pipelines as dp +from pyspark.sql.functions import col + +silver_schema = spark.conf.get("silver_schema") +gold_schema = spark.conf.get("gold_schema") +landing_schema = spark.conf.get("landing_schema") + +@dp.table(name="orders_bronze") +def orders_bronze(): + return spark.readStream.table(f"{landing_schema}.orders_raw") + +@dp.table(name=f"{silver_schema}.orders_clean") +def orders_clean(): + return spark.read.table("orders_bronze").filter(col("order_id").isNotNull()) + +@dp.materialized_view(name=f"{gold_schema}.orders_by_date") +def orders_by_date(): + return (spark.read.table(f"{silver_schema}.orders_clean") + .groupBy("order_date").count()) +``` + +Pass `silver_schema` / `gold_schema` / `landing_schema` via the pipeline's `configuration` block. + +### Custom catalog AND schema per layer + +For cross-catalog scenarios, use fully-qualified names directly: + +```python +@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean") +def orders_clean(): + return spark.read.table("orders_bronze").filter(col("order_id").isNotNull()) +``` + +Same approach in SQL via fully-qualified `catalog.schema.table` in `CREATE OR REFRESH ...`. + +--- + +## Platform Constraints + +### Serverless requirements + +| Requirement | Notes | +|-------------|-------| +| Unity Catalog | Required — serverless always uses UC. | +| Region | Must be serverless-enabled. | +| Terms | Workspace must accept serverless terms of use. | +| CDC | Requires serverless (or Pro/Advanced with classic). | + +### Serverless limitations (force classic clusters) + +| Limitation | Reason to use classic | +|------------|----------------------| +| R language | Not supported on serverless. | +| Spark RDD APIs | Not supported. | +| JAR libraries / Maven coordinates | Not supported. | +| DBFS root access | Limited — use UC external locations. | +| Global temp views | Not supported. | + +### General constraints + +| Constraint | Notes | +|------------|-------| +| Schema evolution | Streaming tables need full refresh for incompatible changes. | +| `PIVOT` clause | Unsupported. | +| Sinks | Python only; streaming only; append-only flows. | diff --git a/skills/databricks-pipelines/references/scd-2-querying.md b/skills/databricks-pipelines/references/scd-2-querying.md new file mode 100644 index 0000000..9a626a1 --- /dev/null +++ b/skills/databricks-pipelines/references/scd-2-querying.md @@ -0,0 +1,277 @@ +# Querying SCD Type 2 Tables + +How to read SCD Type 2 history tables produced by Auto CDC: current-state views, point-in-time queries, change analysis, and joining facts with historical dimensions. Examples in both SQL and Python. + +For the CDC flow that *writes* these tables, see [auto-cdc.md](auto-cdc.md) and the per-language references. + +--- + +## Temporal Columns + +SCD Type 2 tables (from `stored_as_scd_type=2` / `STORED AS SCD TYPE 2`) include two system columns: + +| Column | Meaning | +|--------|---------| +| `__START_AT` | When this version became effective (typically `sequence_by` value). | +| `__END_AT` | When this version expired. `NULL` for the current version. | + +Both have the same type as the `SEQUENCE BY` / `sequence_by` column (usually `TIMESTAMP`). + +**Rule of thumb**: `WHERE __END_AT IS NULL` selects only current rows. That's the most common filter — bake it into a materialized view if you query it often. + +--- + +## Current State + +```sql +-- All current records (materialize for repeated use) +CREATE OR REFRESH MATERIALIZED VIEW dim_customers_current AS +SELECT customer_id, customer_name, email, phone, address, + __START_AT AS valid_from +FROM dim_customers +WHERE __END_AT IS NULL; + +-- Single customer current row +SELECT * +FROM dim_customers +WHERE customer_id = '12345' AND __END_AT IS NULL; +``` + +```python +@dp.materialized_view(name="dim_customers_current") +def dim_customers_current(): + return ( + spark.read.table("dim_customers") + .filter(F.col("__END_AT").isNull()) + .select("customer_id", "customer_name", "email", "phone", "address", + F.col("__START_AT").alias("valid_from")) + ) +``` + +--- + +## Point-in-Time Queries + +State as it existed on a specific date. The inclusive-lower / exclusive-upper boundary matters — get it right or you'll double-count at the seam between versions. + +```sql +-- Products as of 2024-01-01 +CREATE OR REFRESH MATERIALIZED VIEW products_as_of_2024_01_01 AS +SELECT product_id, product_name, price, category, + __START_AT, __END_AT +FROM products_history +WHERE __START_AT <= '2024-01-01' + AND (__END_AT > '2024-01-01' OR __END_AT IS NULL); +``` + +```python +@dp.materialized_view(name="products_as_of_2024_01_01") +def products_as_of_2024_01_01(): + as_of = "2024-01-01" + return ( + spark.read.table("products_history") + .filter(F.col("__START_AT") <= as_of) + .filter((F.col("__END_AT") > as_of) | F.col("__END_AT").isNull()) + ) +``` + +**Boundary convention**: `[__START_AT, __END_AT)` — start is inclusive, end is exclusive. A version with `__END_AT = '2024-01-01'` is *not* the active version on 2024-01-01. + +--- + +## Change Analysis + +### All versions of one entity (history) + +```sql +SELECT customer_id, customer_name, email, phone, + __START_AT, __END_AT, + COALESCE(DATEDIFF(DAY, __START_AT, __END_AT), + DATEDIFF(DAY, __START_AT, CURRENT_TIMESTAMP())) AS days_active +FROM dim_customers +WHERE customer_id = '12345' +ORDER BY __START_AT DESC; +``` + +```python +def customer_history(customer_id: str): + return ( + spark.read.table("dim_customers") + .filter(F.col("customer_id") == customer_id) + .withColumn("days_active", + F.coalesce(F.datediff("__END_AT", "__START_AT"), + F.datediff(F.current_timestamp(), "__START_AT"))) + .orderBy(F.col("__START_AT").desc()) + ) +``` + +### Changes within a time period + +```sql +-- Customers who changed during Q1 2024 (excluding the original version) +SELECT customer_id, customer_name, + __START_AT AS change_timestamp, + 'UPDATE' AS change_type +FROM dim_customers c +WHERE __START_AT BETWEEN '2024-01-01' AND '2024-03-31' + AND __START_AT != ( + SELECT MIN(__START_AT) FROM dim_customers c2 + WHERE c2.customer_id = c.customer_id + ) +ORDER BY __START_AT; +``` + +```python +@dp.materialized_view(name="customer_changes_q1_2024") +def customer_changes_q1_2024(): + history = spark.read.table("dim_customers") + first_seen = (history.groupBy("customer_id") + .agg(F.min("__START_AT").alias("first_start"))) + return ( + history.join(first_seen, "customer_id") + .filter(F.col("__START_AT").between("2024-01-01", "2024-03-31")) + .filter(F.col("__START_AT") != F.col("first_start")) + .select("customer_id", "customer_name", + F.col("__START_AT").alias("change_timestamp"), + F.lit("UPDATE").alias("change_type")) + ) +``` + +--- + +## Joining Facts with Historical Dimensions + +### As-of-transaction-time (canonical) + +For each fact row, pick the dimension version that was active at the transaction's event time. This is the common case for revenue-correct gold tables. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_with_historical_prices AS +SELECT s.sale_id, s.product_id, s.sale_date, s.quantity, + p.product_name, + p.price AS unit_price_at_sale_time, + s.quantity * p.price AS calculated_amount, + p.category +FROM sales_fact s +INNER JOIN products_history p + ON s.product_id = p.product_id + AND s.sale_date >= p.__START_AT + AND (s.sale_date < p.__END_AT OR p.__END_AT IS NULL); +``` + +```python +@dp.materialized_view(name="sales_with_historical_prices") +def sales_with_historical_prices(): + sales = spark.read.table("sales_fact") + products = spark.read.table("products_history") + return ( + sales.join( + products, + (sales.product_id == products.product_id) & + (sales.sale_date >= products.__START_AT) & + ((sales.sale_date < products.__END_AT) | products.__END_AT.isNull()), + "inner", + ) + .select(sales.sale_id, sales.product_id, sales.sale_date, sales.quantity, + products.product_name, + products.price.alias("unit_price_at_sale_time"), + (sales.quantity * products.price).alias("calculated_amount"), + products.category) + ) +``` + +### With the current dimension (ignore history) + +For reports that should always reflect today's attribute values (regardless of when the sale happened), join against the current row only. + +```sql +CREATE OR REFRESH MATERIALIZED VIEW sales_with_current_prices AS +SELECT s.sale_id, s.product_id, s.sale_date, s.quantity, + s.amount AS amount_at_sale, + p.product_name AS current_product_name, + p.price AS current_price +FROM sales_fact s +INNER JOIN products_history p + ON s.product_id = p.product_id + AND p.__END_AT IS NULL; +``` + +```python +@dp.materialized_view(name="sales_with_current_prices") +def sales_with_current_prices(): + sales = spark.read.table("sales_fact") + products_current = spark.read.table("products_history").filter(F.col("__END_AT").isNull()) + return ( + sales.join(products_current, "product_id", "inner") + .select("sale_id", "product_id", "sale_date", "quantity", + sales.amount.alias("amount_at_sale"), + products_current.product_name.alias("current_product_name"), + products_current.price.alias("current_price")) + ) +``` + +**Choosing between the two**: as-of-time for revenue, billing, and audit; current-dim for operational dashboards where attributes are *labels*, not values that drive the math. + +--- + +## Optimization Patterns + +### Pre-filter materialized views + +Querying the full history table for "current" repeatedly is wasteful. Bake the `__END_AT IS NULL` filter into an MV: + +```sql +CREATE OR REFRESH MATERIALIZED VIEW dim_products_current AS +SELECT * FROM products_history WHERE __END_AT IS NULL; + +CREATE OR REFRESH MATERIALIZED VIEW dim_recent_changes AS +SELECT * FROM products_history +WHERE __START_AT >= CURRENT_DATE() - INTERVAL 90 DAYS; + +CREATE OR REFRESH MATERIALIZED VIEW product_change_stats AS +SELECT product_id, + COUNT(*) AS version_count, + MIN(__START_AT) AS first_seen, + MAX(__START_AT) AS last_updated +FROM products_history +GROUP BY product_id; +``` + +```python +@dp.materialized_view(name="dim_products_current") +def dim_products_current(): + return spark.read.table("products_history").filter(F.col("__END_AT").isNull()) +``` + +### Cluster on lookup keys + time + +```sql +CREATE OR REFRESH STREAMING TABLE products_history +CLUSTER BY (product_id, __START_AT) +... +``` + +Clustering on `product_id` accelerates entity lookups; adding `__START_AT` helps point-in-time scans. See [performance.md](performance.md#cluster-key-selection-by-layer) for the full layer-by-layer key guide. + +--- + +## Best Practices + +1. **Filter `__END_AT IS NULL` for "current"** — never compare `__START_AT` against `MAX(__START_AT)` per entity. It's slower and breaks under concurrent updates. +2. **Use inclusive-lower / exclusive-upper** for point-in-time joins. Mismatched boundaries either drop the seam row or double-count it. +3. **Materialize repeated filters.** A `dim_*_current` MV is cheaper than re-filtering the history table on every downstream read. +4. **Make `SEQUENCE BY` high-precision.** Sub-second collisions (multiple changes at the same `updated_at`) cause non-deterministic ordering; prefer microsecond timestamps or compose with a tiebreaker via `STRUCT(timestamp, id)`. +5. **For wide history tables, `TRACK HISTORY ON` only the columns that need versions.** Other columns get Type-1 in-place updates and don't create new history rows. See [auto-cdc-python.md](auto-cdc-python.md) / [auto-cdc-sql.md](auto-cdc-sql.md). + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| Multiple rows for the same key | Missing `__END_AT IS NULL` filter. | +| Point-in-time query returns no rows at the boundary | Wrong inclusive/exclusive — use `__START_AT <= D AND (__END_AT > D OR __END_AT IS NULL)`. | +| Point-in-time query double-counts at the boundary | Used `__END_AT >= D` instead of `__END_AT > D`. | +| Slow temporal join | Materialize current-state MV; cluster history on `(entity_key, __START_AT)`. | +| Unexpected duplicates per business key per moment | Multiple changes at the same `sequence_by` value — use a higher-precision sequence column or `STRUCT(ts, tiebreaker)`. | +| `__START_AT` / `__END_AT` columns missing | Source table isn't SCD Type 2 (Type 1 doesn't have temporals). | diff --git a/skills/databricks-pipelines/references/streaming-patterns.md b/skills/databricks-pipelines/references/streaming-patterns.md new file mode 100644 index 0000000..d7275a5 --- /dev/null +++ b/skills/databricks-pipelines/references/streaming-patterns.md @@ -0,0 +1,457 @@ +# Streaming Patterns + +Patterns for streaming pipelines: deduplication, windowed aggregations, late-arriving data, rescue-data quarantine, monitoring lag, and anomaly detection. Examples are shown in both SQL and Python. + +For perf-framed treatment of stream-to-stream joins, see [performance.md](performance.md#join-optimization). For Auto Loader API and options, see [auto-loader.md](auto-loader.md). For Kafka ingestion, see [kafka.md](kafka.md). + +--- + +## Deduplication + +Apply at the bronze → silver transition. Bronze accepts duplicates, silver is clean. + +### By key (keep first) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS +SELECT event_id, user_id, event_type, event_timestamp, _ingested_at +FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) AS rn + FROM STREAM bronze_events +) +WHERE rn = 1; +``` + +```python +from pyspark import pipelines as dp +from pyspark.sql import functions as F +from pyspark.sql.window import Window + +@dp.table(name="silver_events_dedup", cluster_by=["event_date"]) +def silver_events_dedup(): + w = Window.partitionBy("event_id").orderBy("event_timestamp") + return ( + spark.readStream.table("bronze_events") + .withColumn("rn", F.row_number().over(w)) + .filter(F.col("rn") == 1) + .drop("rn") + ) +``` + +### Within a time window (tolerates late arrivals) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS +SELECT event_id, user_id, event_type, event_timestamp, + MIN(_ingested_at) AS first_seen_at +FROM STREAM bronze_events +GROUP BY event_id, user_id, event_type, event_timestamp, + window(event_timestamp, '1 hour'); +``` + +```python +@dp.table(name="silver_events_dedup") +def silver_events_dedup(): + return ( + spark.readStream.table("bronze_events") + .groupBy("event_id", "user_id", "event_type", "event_timestamp", + F.window("event_timestamp", "1 hour")) + .agg(F.min("_ingested_at").alias("first_seen_at")) + ) +``` + +### Composite key + +```sql +CREATE OR REFRESH STREAMING TABLE silver_transactions_dedup AS +SELECT transaction_id, customer_id, amount, transaction_timestamp, + MIN(_ingested_at) AS _ingested_at +FROM STREAM bronze_transactions +GROUP BY transaction_id, customer_id, amount, transaction_timestamp; +``` + +```python +@dp.table(name="silver_transactions_dedup") +def silver_transactions_dedup(): + return ( + spark.readStream.table("bronze_transactions") + .groupBy("transaction_id", "customer_id", "amount", "transaction_timestamp") + .agg(F.min("_ingested_at").alias("_ingested_at")) + ) +``` + +**Alternative for simple cases**: `SELECT DISTINCT ...` (SQL) or `.dropDuplicates(["event_id"])` (Python). These are fine for low-cardinality dedup but maintain state per unique row. + +### When to use Auto CDC instead + +For dedup with sequenced updates (most-recent-wins, deletes, late corrections), use Auto CDC with SCD Type 1 — see [auto-cdc.md](auto-cdc.md). Manual `ROW_NUMBER`/`GROUP BY` dedup is for append-only streams without semantic updates. + +--- + +## Windowed Aggregations + +### Tumbling windows (non-overlapping, fixed size) + +```sql +CREATE OR REFRESH STREAMING TABLE silver_sensor_5min AS +SELECT sensor_id, + window(event_timestamp, '5 minutes') AS time_window, + AVG(temperature) AS avg_temperature, + MIN(temperature) AS min_temperature, + MAX(temperature) AS max_temperature, + COUNT(*) AS event_count +FROM STREAM bronze_sensor_events +GROUP BY sensor_id, window(event_timestamp, '5 minutes'); +``` + +```python +@dp.table(name="silver_sensor_5min", cluster_by=["sensor_id"]) +def silver_sensor_5min(): + return ( + spark.readStream.table("bronze_sensor_events") + .groupBy("sensor_id", F.window("event_timestamp", "5 minutes")) + .agg(F.avg("temperature").alias("avg_temperature"), + F.min("temperature").alias("min_temperature"), + F.max("temperature").alias("max_temperature"), + F.count("*").alias("event_count")) + ) +``` + +### Multiple window sizes (separate tables per granularity) + +```sql +CREATE OR REFRESH STREAMING TABLE gold_sensor_1min AS +SELECT sensor_id, + window(event_timestamp, '1 minute').start AS window_start, + window(event_timestamp, '1 minute').end AS window_end, + AVG(value) AS avg_value, + COUNT(*) AS event_count +FROM STREAM silver_sensor_data +GROUP BY sensor_id, window(event_timestamp, '1 minute'); + +CREATE OR REFRESH STREAMING TABLE gold_sensor_1hour AS +SELECT sensor_id, + window(event_timestamp, '1 hour').start AS window_start, + AVG(value) AS avg_value, + STDDEV(value) AS stddev_value +FROM STREAM silver_sensor_data +GROUP BY sensor_id, window(event_timestamp, '1 hour'); +``` + +```python +@dp.table(name="gold_sensor_1min") +def gold_sensor_1min(): + return ( + spark.readStream.table("silver_sensor_data") + .groupBy("sensor_id", F.window("event_timestamp", "1 minute")) + .agg(F.avg("value").alias("avg_value"), + F.count("*").alias("event_count")) + .select("sensor_id", + F.col("window.start").alias("window_start"), + F.col("window.end").alias("window_end"), + "avg_value", "event_count") + ) + +@dp.table(name="gold_sensor_1hour") +def gold_sensor_1hour(): + return ( + spark.readStream.table("silver_sensor_data") + .groupBy("sensor_id", F.window("event_timestamp", "1 hour")) + .agg(F.avg("value").alias("avg_value"), + F.stddev("value").alias("stddev_value")) + ) +``` + +### Session windows (inactivity-bounded) + +Group events into sessions terminated by an inactivity gap: + +```sql +CREATE OR REFRESH STREAMING TABLE silver_user_sessions AS +SELECT user_id, + session_window(event_timestamp, '30 minutes') AS session, + MIN(event_timestamp) AS session_start, + MAX(event_timestamp) AS session_end, + COUNT(*) AS event_count, + COLLECT_LIST(event_type) AS event_sequence +FROM STREAM bronze_user_events +GROUP BY user_id, session_window(event_timestamp, '30 minutes'); +``` + +```python +@dp.table(name="silver_user_sessions") +def silver_user_sessions(): + return ( + spark.readStream.table("bronze_user_events") + .groupBy("user_id", F.session_window("event_timestamp", "30 minutes")) + .agg(F.min("event_timestamp").alias("session_start"), + F.max("event_timestamp").alias("session_end"), + F.count("*").alias("event_count"), + F.collect_list("event_type").alias("event_sequence")) + ) +``` + +### Window-size guidance + +| Window | Use case | +|--------|----------| +| 1–5 minutes | Real-time monitoring, alerting | +| 15–60 minutes | Operational dashboards | +| 1–24 hours | Analytical reports | + +Larger windows = less state pressure but stale results. Pick the smallest window that meets the freshness SLO. + +--- + +## Late-Arriving Data + +### Use event time, not processing time, for business logic + +```sql +CREATE OR REFRESH STREAMING TABLE gold_daily_orders AS +SELECT CAST(order_timestamp AS DATE) AS order_date, -- event time + COUNT(*) AS order_count, + SUM(amount) AS total_amount +FROM STREAM silver_orders +GROUP BY CAST(order_timestamp AS DATE); +``` + +```python +@dp.table(name="gold_daily_orders") +def gold_daily_orders(): + return ( + spark.readStream.table("silver_orders") + .groupBy(F.to_date("order_timestamp").alias("order_date")) # event time + .agg(F.count("*").alias("order_count"), + F.sum("amount").alias("total_amount")) + ) +``` + +Keep `_ingested_at` (processing time) in the schema as a debugging field — never the aggregation key. + +--- + +## Rescue-Data Quarantine + +Pattern: route malformed records to a quarantine table so the clean stream stays clean, but no data is silently dropped. Uses Auto Loader's rescued-data column (`_rescued_data`, default name; configurable via `rescuedDataColumn`). + +```sql +-- Bronze: ingest everything, flag rows where Auto Loader rescued bad fields +CREATE OR REFRESH STREAMING TABLE bronze_events AS +SELECT *, + current_timestamp() AS _ingested_at, + _rescued_data IS NOT NULL AS _has_errors +FROM STREAM read_files('/Volumes/cat/sch/raw/events/', format => 'json'); + +-- Quarantine: only the rescued/malformed rows +CREATE OR REFRESH STREAMING TABLE bronze_quarantine AS +SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NOT NULL; + +-- Silver: only the clean rows +CREATE OR REFRESH STREAMING TABLE silver_clean AS +SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NULL; +``` + +```python +@dp.table(name="bronze_events", cluster_by=["ingestion_date"]) +def bronze_events(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .option("rescuedDataColumn", "_rescued_data") + .load("/Volumes/cat/sch/raw/events/") + .withColumn("_ingested_at", F.current_timestamp()) + .withColumn("_has_errors", F.col("_rescued_data").isNotNull()) + ) + +@dp.table(name="bronze_quarantine") +def bronze_quarantine(): + return spark.readStream.table("bronze_events").filter("_has_errors = true") + +@dp.table(name="silver_clean") +def silver_clean(): + return spark.readStream.table("bronze_events").filter("_has_errors = false") +``` + +**When to use**: Schema drift on JSON / CSV ingestion, optional fields that arrive late, downstream tables that can't tolerate nulls in known columns. Pair with an alert on `bronze_quarantine` row growth. + +**Alternative**: `@dp.expect_or_drop` / `CONSTRAINT ... ON VIOLATION DROP ROW`. Use expectations when the rule is a value check (`amount > 0`); use rescued-data quarantine when the rule is a schema/parse problem. + +--- + +## Stream-to-Stream Joins (Pattern) + +Always bound the join by event-time interval. Without bounds, state grows unbounded. + +```sql +CREATE OR REFRESH STREAMING TABLE silver_orders_with_payments AS +SELECT o.order_id, o.customer_id, o.order_timestamp, + o.amount AS order_amount, + p.payment_id, p.payment_timestamp, p.payment_method, + p.amount AS payment_amount +FROM STREAM bronze_orders o +INNER JOIN STREAM bronze_payments p + ON o.order_id = p.order_id + AND p.payment_timestamp BETWEEN o.order_timestamp + AND o.order_timestamp + INTERVAL 1 HOUR; +``` + +```python +@dp.table(name="silver_orders_with_payments") +def silver_orders_with_payments(): + orders = spark.readStream.table("bronze_orders") + payments = spark.readStream.table("bronze_payments") + return orders.join( + payments, + (orders.order_id == payments.order_id) & + (payments.payment_timestamp >= orders.order_timestamp) & + (payments.payment_timestamp <= orders.order_timestamp + F.expr("INTERVAL 1 HOUR")), + "inner", + ) +``` + +For stream-to-static (broadcast small dimensions) and perf-tuning, see [performance.md](performance.md#join-optimization). + +--- + +## Incremental Aggregations (Running Totals) + +Streaming `GROUP BY` without windows yields cumulative aggregates per group. Watch state size — see [performance.md](performance.md#state-management-for-streaming). + +```sql +CREATE OR REFRESH STREAMING TABLE silver_customer_running_totals AS +SELECT customer_id, + SUM(amount) AS total_spent, + COUNT(*) AS transaction_count, + MAX(transaction_timestamp) AS last_transaction_at +FROM STREAM bronze_transactions +GROUP BY customer_id; +``` + +```python +@dp.table(name="silver_customer_running_totals") +def silver_customer_running_totals(): + return ( + spark.readStream.table("bronze_transactions") + .groupBy("customer_id") + .agg(F.sum("amount").alias("total_spent"), + F.count("*").alias("transaction_count"), + F.max("transaction_timestamp").alias("last_transaction_at")) + ) +``` + +--- + +## Anomaly Detection + +### Rolling z-score outlier flag + +```sql +CREATE OR REFRESH STREAMING TABLE silver_sensor_with_anomalies AS +SELECT sensor_id, event_timestamp, temperature, + AVG(temperature) OVER w AS rolling_avg_100, + STDDEV(temperature) OVER w AS rolling_stddev_100, + CASE + WHEN temperature > AVG(temperature) OVER w + 3 * STDDEV(temperature) OVER w THEN 'HIGH_OUTLIER' + WHEN temperature < AVG(temperature) OVER w - 3 * STDDEV(temperature) OVER w THEN 'LOW_OUTLIER' + ELSE 'NORMAL' + END AS anomaly_flag +FROM STREAM bronze_sensor_events +WINDOW w AS (PARTITION BY sensor_id ORDER BY event_timestamp + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW); + +-- Route anomalies for alerting +CREATE OR REFRESH STREAMING TABLE silver_sensor_anomalies AS +SELECT * FROM STREAM silver_sensor_with_anomalies +WHERE anomaly_flag IN ('HIGH_OUTLIER', 'LOW_OUTLIER'); +``` + +```python +@dp.table(name="silver_sensor_with_anomalies") +def silver_sensor_with_anomalies(): + w = Window.partitionBy("sensor_id").orderBy("event_timestamp").rowsBetween(-100, 0) + return ( + spark.readStream.table("bronze_sensor_events") + .withColumn("rolling_avg", F.avg("temperature").over(w)) + .withColumn("rolling_stddev", F.stddev("temperature").over(w)) + .withColumn("anomaly_flag", + F.when(F.col("temperature") > F.col("rolling_avg") + 3 * F.col("rolling_stddev"), "HIGH_OUTLIER") + .when(F.col("temperature") < F.col("rolling_avg") - 3 * F.col("rolling_stddev"), "LOW_OUTLIER") + .otherwise("NORMAL")) + ) + +@dp.table(name="silver_sensor_anomalies") +def silver_sensor_anomalies(): + return ( + spark.readStream.table("silver_sensor_with_anomalies") + .filter(F.col("anomaly_flag").isin("HIGH_OUTLIER", "LOW_OUTLIER")) + ) +``` + +### Static threshold filtering + +```sql +CREATE OR REFRESH STREAMING TABLE silver_high_value_transactions AS +SELECT transaction_id, customer_id, amount, transaction_timestamp +FROM STREAM bronze_transactions +WHERE amount > 10000; +``` + +```python +@dp.table(name="silver_high_value_transactions") +def silver_high_value_transactions(): + return (spark.readStream.table("bronze_transactions").filter(F.col("amount") > 10000)) +``` + +--- + +## Monitoring Lag + +Track end-to-end freshness by comparing event-time max to processing time. Useful for alerting on ingestion delays from Kafka, Kinesis, or Auto Loader. + +```sql +CREATE OR REFRESH STREAMING TABLE monitoring_lag AS +SELECT 'kafka_events' AS source, + MAX(kafka_timestamp) AS max_event_timestamp, + current_timestamp() AS processing_timestamp, + unix_timestamp(current_timestamp()) - unix_timestamp(MAX(kafka_timestamp)) AS lag_seconds +FROM STREAM bronze_kafka_events +GROUP BY window(kafka_timestamp, '1 minute'); +``` + +```python +@dp.table(name="monitoring_lag") +def monitoring_lag(): + return ( + spark.readStream.table("bronze_kafka_events") + .groupBy(F.window("kafka_timestamp", "1 minute")) + .agg(F.lit("kafka_events").alias("source"), + F.max("kafka_timestamp").alias("max_event_timestamp"), + F.current_timestamp().alias("processing_timestamp")) + .withColumn("lag_seconds", + F.unix_timestamp("processing_timestamp") - F.unix_timestamp("max_event_timestamp")) + ) +``` + +--- + +## Best Practices + +1. **Use event time, not processing time**, for aggregation keys. +2. **Deduplicate at silver**, not bronze. Bronze is append-only, silver is clean. +3. **Bound state**: time windows, lower cardinality, materialize intermediates — see [performance.md](performance.md#state-management-for-streaming). +4. **Quarantine, don't drop silently** — route bad rows to a side table for observability. +5. **Use Auto CDC for sequenced updates** instead of building dedup with `ROW_NUMBER` — see [auto-cdc.md](auto-cdc.md). + +--- + +## Common Issues + +| Issue | Cause / Fix | +|-------|-------------| +| High memory with windows | Larger windows; reduce group-by cardinality. | +| Duplicate events in output | Add explicit dedup by unique key, or switch to Auto CDC SCD Type 1. | +| Missing late-arriving events | Larger window; check that aggregation uses event time not processing time. | +| Stream-to-stream join empty | Missing or too-narrow time bound on join condition. | +| State growth over time | Add time windows; reduce cardinality; materialize daily then aggregate batch monthly. | +| `bronze_quarantine` empty unexpectedly | `rescuedDataColumn` not enabled; check Auto Loader options. | diff --git a/skills/databricks-pipelines/references/workflows.md b/skills/databricks-pipelines/references/workflows.md new file mode 100644 index 0000000..671157c --- /dev/null +++ b/skills/databricks-pipelines/references/workflows.md @@ -0,0 +1,357 @@ +# Pipeline Project Workflows + +Three workflows for building Spark Declarative Pipelines, depending on what already exists in the project and how much DAB scaffolding the user wants. + +## Choose Your Workflow + +| Situation | Workflow | +|-----------|----------| +| New, standalone pipeline project with its own bundle | **A. Standalone bundle** | +| Pipeline added to an existing DAB project | **B. Existing bundle** | +| Quick prototyping, no bundle (yet) | **C. Rapid CLI iteration** | + +If the user is unsure, default to A for production-bound work and C for exploration. + +--- + +## Language Selection (Python vs SQL) + +Decide before scaffolding — the choice picks the template files (`.py` vs `.sql`) and pulls in different reference docs. Both languages can coexist in the same project, but pick one primary. + +| User signal | Pick | +|-------------|------| +| "Python pipeline", "use Python", UDF, pandas, ML inference, pyspark | **Python** | +| "SQL pipeline", "SQL files", "use SQL" | **SQL** | +| "Create a simple pipeline", "create a table", "an aggregation" | **SQL** (simpler) | +| Complex parameterized logic, custom UDFs, ML, advanced processing | **Python** | + +If the request is ambiguous, ask. Stick with the chosen language unless the user explicitly switches. + +--- + +## Workflow A: Standalone Bundle (`pipelines init`) + +Use when the user wants a new project where the pipeline *is* the project. + +### Non-interactive (recommended for agents) + +```bash +databricks pipelines init --output-dir . --config-file init-config.json +``` + +`init-config.json`: + +```json +{ + "project_name": "customer_pipeline", + "initial_catalog": "prod_catalog", + "use_personal_schema": "no", + "initial_language": "sql" +} +``` + +| Field | Notes | +|-------|-------| +| `project_name` | Letters, numbers, underscores only. Used for bundle name + folder. | +| `initial_catalog` | Must exist in Unity Catalog. | +| `use_personal_schema` | `"yes"` → `${workspace.current_user.short_name}` (dev). `"no"` → fixed value (prod). | +| `initial_language` | `"sql"` or `"python"` (lowercase). | + +### Interactive + +```bash +databricks pipelines init --output-dir . +``` + +Prompts for the same fields. + +### Generated structure + +``` +project_root/ +├── databricks.yml # Bundle config +├── pyproject.toml # Python only +├── resources/ +│ ├── _etl.pipeline.yml # Pipeline resource +│ └── sample_job.job.yml # Optional scheduled job +└── src/ + └── _etl/ + ├── explorations/ # Ad-hoc notebooks (NOT pipeline code) + └── transformations/ # Pipeline transformations + ├── sample_*.sql # or .py + └── ... +``` + +**Key rule**: Pipeline transformations are raw `.sql` / `.py` files. Notebooks go in `explorations/` for ad-hoc work only. + +### Customize and deploy + +1. Replace `sample_*` files in `transformations/` with real datasets (1 dataset per file). +2. Edit `databricks.yml` to set per-target catalog/schema variables and workspace host. +3. Edit `resources/_etl.pipeline.yml` for pipeline-level settings (serverless on by default). +4. `databricks bundle validate` → `databricks bundle deploy [-t ]` → `databricks bundle run `. + +### Alternative: `databricks bundle init lakeflow-pipelines` + +The older template-based scaffolding also works: + +```bash +databricks bundle init lakeflow-pipelines \ + --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') \ + --profile < /dev/null +``` + +Both produce DAB-shaped projects; `pipelines init` is the newer, more focused command. + +### `databricks.yml` essentials + +```yaml +bundle: + name: customer_pipeline + +include: + - resources/*.yml + - resources/*/*.yml + +variables: + catalog: { description: The catalog to use } + schema: { description: The schema to use } + +targets: + dev: + mode: development # prefixes resources with [dev ], pauses schedules + default: true + workspace: + host: https://.cloud.databricks.com + variables: + catalog: dev_catalog + schema: ${workspace.current_user.short_name} + + prod: + mode: production # no prefix, schedules active + workspace: + host: https://.cloud.databricks.com + root_path: /Workspace/Users//.bundle/${bundle.name}/${bundle.target} + variables: + catalog: prod_catalog + schema: production + permissions: + - user_name: + level: CAN_MANAGE +``` + +### Pipeline resource (`resources/.pipeline.yml`) + +```yaml +resources: + pipelines: + customer_pipeline_etl: + name: customer_pipeline_etl + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + continuous: false # explicit — true auto-retries failed updates forever + root_path: "../src/customer_pipeline_etl" + libraries: + - glob: + include: ../src/customer_pipeline_etl/transformations/** + environment: # serverless Python deps (optional) + dependencies: + - --editable ${workspace.file_path} +``` + +### Python project dependencies + +Python projects ship a `pyproject.toml`. Runtime deps go in `[project].dependencies`; dev-only in `[project.optional-dependencies].dev`. The `--editable ${workspace.file_path}` line in the pipeline resource installs the package on serverless compute at deploy time. + +--- + +## Workflow B: Pipeline in Existing Bundle + +Use when `databricks.yml` already exists for a larger project (app + jobs + dashboards) and a pipeline is being added to it. + +### Step 1: Add a pipeline resource file + +`resources/my_pipeline.pipeline.yml`: + +```yaml +resources: + pipelines: + my_pipeline: + name: my_pipeline + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + continuous: false + libraries: + - glob: + include: ../src/pipelines/my_pipeline/** +``` + +### Step 2: Add source files + +``` +src/pipelines/my_pipeline/ +├── bronze_ingest.sql +├── silver_clean.sql +└── gold_summary.sql +``` + +### Step 3: Deploy + +```bash +databricks bundle deploy +databricks bundle run my_pipeline +``` + +The pipeline picks up the bundle's existing targets / variables / permissions. + +--- + +## Workflow C: Rapid CLI Iteration (no bundle) + +Use for prototyping when bundle scaffolding would slow the user down. Skip when the work is production-bound — workflow A or B is better long-term. + +### Step 1: Write files locally + +`.sql` or `.py` files in a folder. See [python-basics.md](python-basics.md) or [sql-basics.md](sql-basics.md) for syntax. + +### Step 2: Upload to workspace + +```bash +databricks workspace import-dir ./my_pipeline /Workspace/Users//my_pipeline +``` + +Re-upload with `--overwrite` after every code change. + +### Step 3: Create the pipeline + +```bash +databricks pipelines create --json '{ + "name": "my_pipeline", + "catalog": "my_catalog", + "schema": "my_schema", + "serverless": true, + "continuous": false, + "channel": "PREVIEW", + "libraries": [{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}] +}' +``` + +`libraries` field notes: + +- `"glob"` — directory of files. Recommended. +- `"file"` — single `.sql` / `.py`. A `"file"` pointing at a folder fails with `Paths must end with .py or .sql`. +- `"notebook"` — **deprecated**, never use. + +Use enumerated `"file"` entries instead of `"glob"` only when explicit ordering matters. + +Capture the returned `pipeline_id`. + +### Step 4: Start an update and poll *that update* + +```bash +UPDATE_ID=$(databricks pipelines start-update | jq -r .update_id) +# Or with full refresh (destructive on streaming state — omit for incremental): +# UPDATE_ID=$(databricks pipelines start-update --full-refresh | jq -r .update_id) + +while :; do + STATE=$(databricks pipelines get-update "$UPDATE_ID" | jq -r '.update.state') + echo "$(date +%H:%M:%S) update=$UPDATE_ID state=$STATE" + case "$STATE" in COMPLETED|FAILED|CANCELED) break;; esac + sleep 30 +done +``` + +**Why poll the update, not the pipeline.** Top-level pipeline `state` flips back to `RUNNING` on `RETRY_ON_FAILURE`, so a loop watching the pipeline (or `latest_updates[0]`) can spin past a real `FAILED` update forever. Poll the captured `update_id` and stop on the first terminal state — including `FAILED`. + +**On `FAILED`**: read the events log, don't re-run. + +```bash +databricks pipelines list-pipeline-events \ + | jq '[.[] | select(.level=="ERROR") | {event_type, message: (.message // "")[0:300]}] | .[0:5]' +``` + +If the pipeline is already `RUNNING`, `start-update` queues the new update. Force-stop with `databricks pipelines stop ` first if needed. + +### Step 5: Edit → re-upload → restart + +```bash +# Re-upload (whole dir) +databricks workspace import-dir ./my_pipeline /Workspace/Users//my_pipeline --overwrite + +# Or a single file +databricks workspace import /Workspace/Users//my_pipeline/gold.sql \ + --file ./my_pipeline/gold.sql --format RAW --overwrite + +# Restart +databricks pipelines start-update +``` + +**Use `--format RAW`** for raw `.sql` / `.py` FILE entries. `--format SOURCE --language SQL|PYTHON` uploads a workspace *notebook* — and **notebooks are deprecated for pipelines**. Mixing the two on the same path fails with `Cannot overwrite the asset ... due to type mismatch (asked: NOTEBOOK, actual: FILE)`. + +### Step 6: Validate output data + +Even on `COMPLETED`, verify the data: + +```bash +databricks experimental aitools tools discover-schema \ + my_catalog.my_schema.bronze_orders \ + my_catalog.my_schema.silver_orders \ + my_catalog.my_schema.gold_summary +``` + +Returns columns/types, 5 sample rows, total row count, and null counts per column per table. + +Check for: empty tables (ingestion or filter problems), unexpected row counts (broken joins), missing columns (schema mismatch), nulls in key columns (data quality). + +### Python SDK alternative + +```python +from databricks.sdk import WorkspaceClient +import time + +w = WorkspaceClient() + +pipeline = w.pipelines.create( + name="my_pipeline", + catalog="my_catalog", + schema="my_schema", + serverless=True, + continuous=False, + libraries=[{"glob": {"include": "/Workspace/Users//my_pipeline/**"}}], + development=True, +) + +update = w.pipelines.start_update( + pipeline_id=pipeline.pipeline_id, + full_refresh=True, +) + +while True: + u = w.pipelines.get_update(pipeline_id=pipeline.pipeline_id, + update_id=update.update_id).update + if str(u.state) in ("COMPLETED", "FAILED", "CANCELED"): + print(f"Update {u.update_id}: {u.state}") + break + time.sleep(10) +``` + +--- + +## Migrating from a Manual Folder Structure + +If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, migrate to workflow A by wrapping them in a `databricks.yml` and a pipeline resource pointing at the existing folders via a `glob`. No file moves required — the medallion folders work as-is under `transformations/**`. + +--- + +## Common Initialization Issues + +| Issue | Fix | +|-------|-----| +| `Command not found: databricks` | `pip install databricks-cli` | +| `Invalid catalog name` | `databricks catalogs list` and verify; create with `databricks catalogs create --json '{"name": "..."}'` | +| `Language option not recognized` | Use lowercase `"sql"` / `"python"`, not `"SQL"` / `"Python"` | +| Files deploy but pipeline doesn't pick them up | Glob pattern in `libraries` doesn't match — re-check `include` path relative to the resource file | +| `Bundle validation failed: Invalid schema` | `databricks bundle validate`, check YAML indentation (spaces, not tabs) | +| Files deploy but pipeline config stale | `databricks bundle deploy --force` |