Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions REVIEW-NOTES-databricks-pipelines.md

Large diffs are not rendered by default.

15 changes: 3 additions & 12 deletions manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,19 @@
"agents/openai.yaml",
"assets/databricks.png",
"assets/databricks.svg",
"references/1-project-initialization-with-dab.md",
"references/2-rapid-iteration-with-cli.md",
"references/auto-cdc-python.md",
"references/auto-cdc-sql.md",
"references/auto-cdc.md",
"references/auto-loader-python.md",
"references/auto-loader-sql.md",
"references/auto-loader.md",
"references/dlt-migration.md",
"references/expectations-python.md",
"references/expectations-sql.md",
"references/expectations.md",
"references/foreach-batch-sink-python.md",
"references/foreach-batch-sink.md",
"references/kafka.md",
"references/materialized-view-python.md",
"references/materialized-view-sql.md",
"references/materialized-view.md",
"references/options-avro.md",
"references/options-csv.md",
"references/options-json.md",
Expand All @@ -139,19 +136,13 @@
"references/python-basics.md",
"references/scd-2-querying.md",
"references/sink-python.md",
"references/sink.md",
"references/sql-basics.md",
"references/streaming-patterns.md",
"references/streaming-table-python.md",
"references/streaming-table-sql.md",
"references/streaming-table.md",
"references/temporary-view-python.md",
"references/temporary-view-sql.md",
"references/temporary-view.md",
"references/view-sql.md",
"references/view.md",
"references/workflows.md",
"references/write-spark-declarative-pipelines.md"
"references/view-sql.md"
]
},
"databricks-serverless-migration": {
Expand Down
372 changes: 151 additions & 221 deletions skills/databricks-pipelines/SKILL.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,37 +1,17 @@
# Pipeline Project Workflows
# Project Initialization with DAB

Three workflows for building Spark Declarative Pipelines, depending on what already exists in the project and how much DAB scaffolding the user wants.
Two DAB-based workflows for creating Spark Declarative Pipelines:

## Choose Your Workflow
- **Workflow A**: Standalone new project (the pipeline *is* the project).
- **Workflow B**: Adding a pipeline to an existing bundle (the pipeline is part of a larger app + jobs + dashboards).

| Situation | Workflow |
|-----------|----------|
| New, standalone pipeline project with its own bundle | **A. Standalone bundle** |
| Pipeline added to an existing DAB project | **B. Existing bundle** |
| Quick prototyping, no bundle (yet) | **C. Rapid CLI iteration** |

If the user is unsure, default to A for production-bound work and C for exploration.

---

## Language Selection (Python vs SQL)

Decide before scaffolding — the choice picks the template files (`.py` vs `.sql`) and pulls in different reference docs. Both languages can coexist in the same project, but pick one primary.

| User signal | Pick |
|-------------|------|
| "Python pipeline", "use Python", UDF, pandas, ML inference, pyspark | **Python** |
| "SQL pipeline", "SQL files", "use SQL" | **SQL** |
| "Create a simple pipeline", "create a table", "an aggregation" | **SQL** (simpler) |
| Complex parameterized logic, custom UDFs, ML, advanced processing | **Python** |

If the request is ambiguous, ask. Stick with the chosen language unless the user explicitly switches.
For prototyping without a bundle, see [2-rapid-iteration-with-cli.md](2-rapid-iteration-with-cli.md).

---

## Workflow A: Standalone Bundle (`pipelines init`)

Use when the user wants a new project where the pipeline *is* the project.
Use when the user wants a new project where the pipeline *is* the project (no existing `databricks.yml`).

### Non-interactive (recommended for agents)

Expand Down Expand Up @@ -65,6 +45,18 @@ databricks pipelines init --output-dir .

Prompts for the same fields.

### Alternative: `databricks bundle init lakeflow-pipelines`

The older template-based scaffolding also works:

```bash
databricks bundle init lakeflow-pipelines \
--config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') \
--profile <PROFILE> < /dev/null
```

Both produce DAB-shaped projects; `pipelines init` is the newer, more focused command.

### Generated structure

```
Expand All @@ -91,18 +83,6 @@ project_root/
3. Edit `resources/<name>_etl.pipeline.yml` for pipeline-level settings (serverless on by default).
4. `databricks bundle validate` → `databricks bundle deploy [-t <target>]` → `databricks bundle run <pipeline_name>`.

### Alternative: `databricks bundle init lakeflow-pipelines`

The older template-based scaffolding also works:

```bash
databricks bundle init lakeflow-pipelines \
--config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') \
--profile <PROFILE> < /dev/null
```

Both produce DAB-shaped projects; `pipelines init` is the newer, more focused command.

### `databricks.yml` essentials

```yaml
Expand Down Expand Up @@ -160,9 +140,36 @@ resources:
- --editable ${workspace.file_path}
```

### Scheduling Pipelines

To schedule a pipeline, add a job that triggers it in `resources/<name>.job.yml`:

```yaml
resources:
jobs:
my_pipeline_job:
trigger:
periodic:
interval: 1
unit: DAYS
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.my_pipeline.id}
```


### Python project dependencies

Python projects ship a `pyproject.toml`. Runtime deps go in `[project].dependencies`; dev-only in `[project.optional-dependencies].dev`. The `--editable ${workspace.file_path}` line in the pipeline resource installs the package on serverless compute at deploy time.
Python projects ship a standard `pyproject.toml`. Runtime deps in `[project].dependencies`, dev-only in `[project.optional-dependencies].dev` (e.g. `databricks-connect>=15.4,<15.5`, `pytest`, `ruff`). The `--editable ${workspace.file_path}` line in the pipeline resource installs the package on serverless compute at deploy time.

### Multi-environment workflow

```bash
databricks bundle deploy # dev (default target) — resources prefixed [dev <user>]
databricks bundle deploy --target prod # prod — no prefix, schedules active
databricks bundle run customer_pipeline_etl [--target prod]
```

---

Expand Down Expand Up @@ -208,140 +215,45 @@ The pipeline picks up the bundle's existing targets / variables / permissions.

---

## Workflow C: Rapid CLI Iteration (no bundle)

Use for prototyping when bundle scaffolding would slow the user down. Skip when the work is production-bound — workflow A or B is better long-term.

### Step 1: Write files locally

`.sql` or `.py` files in a folder. See [python-basics.md](python-basics.md) or [sql-basics.md](sql-basics.md) for syntax.

### Step 2: Upload to workspace

```bash
databricks workspace import-dir ./my_pipeline /Workspace/Users/<user>/my_pipeline
```

Re-upload with `--overwrite` after every code change.

### Step 3: Create the pipeline

```bash
databricks pipelines create --json '{
"name": "my_pipeline",
"catalog": "my_catalog",
"schema": "my_schema",
"serverless": true,
"continuous": false,
"channel": "PREVIEW",
"libraries": [{"glob": {"include": "/Workspace/Users/<user>/my_pipeline/**"}}]
}'
```

`libraries` field notes:

- `"glob"` — directory of files. Recommended.
- `"file"` — single `.sql` / `.py`. A `"file"` pointing at a folder fails with `Paths must end with .py or .sql`.
- `"notebook"` — **deprecated**, never use.

Use enumerated `"file"` entries instead of `"glob"` only when explicit ordering matters.

Capture the returned `pipeline_id`.
## Running a Pipeline (Workflow A / B)

### Step 4: Start an update and poll *that update*
**You must deploy before running.** In local development, code changes only take effect after `databricks bundle deploy`. Always deploy before any run, dry run, or selective refresh.

```bash
UPDATE_ID=$(databricks pipelines start-update <pipeline_id> | jq -r .update_id)
# Or with full refresh (destructive on streaming state — omit for incremental):
# UPDATE_ID=$(databricks pipelines start-update <pipeline_id> --full-refresh | jq -r .update_id)

while :; do
STATE=$(databricks pipelines get-update <pipeline_id> "$UPDATE_ID" | jq -r '.update.state')
echo "$(date +%H:%M:%S) update=$UPDATE_ID state=$STATE"
case "$STATE" in COMPLETED|FAILED|CANCELED) break;; esac
sleep 30
done
```

**Why poll the update, not the pipeline.** Top-level pipeline `state` flips back to `RUNNING` on `RETRY_ON_FAILURE`, so a loop watching the pipeline (or `latest_updates[0]`) can spin past a real `FAILED` update forever. Poll the captured `update_id` and stop on the first terminal state — including `FAILED`.

**On `FAILED`**: read the events log, don't re-run.
### Development workflow

```bash
databricks pipelines list-pipeline-events <pipeline_id> \
| jq '[.[] | select(.level=="ERROR") | {event_type, message: (.message // "")[0:300]}] | .[0:5]'
```
# 1. Validate the bundle config
databricks bundle validate --profile <profile>

If the pipeline is already `RUNNING`, `start-update` queues the new update. Force-stop with `databricks pipelines stop <pipeline_id>` first if needed.
# 2. Deploy to a target (dev is default)
databricks bundle deploy -t dev --profile <profile>

### Step 5: Edit → re-upload → restart
# 3. Trigger the pipeline
databricks bundle run <pipeline_name> -t dev --profile <profile>

```bash
# Re-upload (whole dir)
databricks workspace import-dir ./my_pipeline /Workspace/Users/<user>/my_pipeline --overwrite

# Or a single file
databricks workspace import /Workspace/Users/<user>/my_pipeline/gold.sql \
--file ./my_pipeline/gold.sql --format RAW --overwrite

# Restart
databricks pipelines start-update <pipeline_id>
# 4. Check status (capture the update_id from step 3 and poll it — not top-level state)
databricks pipelines get <pipeline_id> --profile <profile>
databricks pipelines get-update <pipeline_id> <update_id> --profile <profile>
```

**Use `--format RAW`** for raw `.sql` / `.py` FILE entries. `--format SOURCE --language SQL|PYTHON` uploads a workspace *notebook* — and **notebooks are deprecated for pipelines**. Mixing the two on the same path fails with `Cannot overwrite the asset ... due to type mismatch (asked: NOTEBOOK, actual: FILE)`.
For the rationale on polling the update (not the pipeline) and the FAILED-extraction `jq` pattern, see [2-rapid-iteration-with-cli.md#step-4-start-an-update-and-poll-that-update](2-rapid-iteration-with-cli.md#step-4-start-an-update-and-poll-that-update). It applies to bundle runs too.

### Step 6: Validate output data
### Refresh modes

Even on `COMPLETED`, verify the data:
- **Selective refresh** is preferred when only one table needs to run. Dependencies must already be materialized.
- **Full refresh** is the most expensive option and **can lead to data loss** — it reprocesses streaming sources from scratch and destroys streaming state. Use only when necessary, and always surface it as a follow-up the user must explicitly approve. CLI: `databricks bundle run <pipeline_name> --full-refresh-all` or `--refresh <table>` for selective.

```bash
databricks experimental aitools tools discover-schema \
my_catalog.my_schema.bronze_orders \
my_catalog.my_schema.silver_orders \
my_catalog.my_schema.gold_summary
```
### Editing pipeline code

Returns columns/types, 5 sample rows, total row count, and null counts per column per table.

Check for: empty tables (ingestion or filter problems), unexpected row counts (broken joins), missing columns (schema mismatch), nulls in key columns (data quality).

### Python SDK alternative

```python
from databricks.sdk import WorkspaceClient
import time

w = WorkspaceClient()

pipeline = w.pipelines.create(
name="my_pipeline",
catalog="my_catalog",
schema="my_schema",
serverless=True,
continuous=False,
libraries=[{"glob": {"include": "/Workspace/Users/<user>/my_pipeline/**"}}],
development=True,
)

update = w.pipelines.start_update(
pipeline_id=pipeline.pipeline_id,
full_refresh=True,
)

while True:
u = w.pipelines.get_update(pipeline_id=pipeline.pipeline_id,
update_id=update.update_id).update
if str(u.state) in ("COMPLETED", "FAILED", "CANCELED"):
print(f"Update {u.update_id}: {u.state}")
break
time.sleep(10)
```
Edit `.sql` / `.py` files under `src/`, then re-run `databricks bundle deploy` + `databricks bundle run`. Bundle deploy uploads changed files as raw `FILE` entries. Don't mix `databricks workspace import --format SOURCE` into a bundle-managed pipeline — that creates a NOTEBOOK entry and subsequent bundle deploys fail with `type mismatch (asked: FILE, actual: NOTEBOOK)`.

---

## Migrating from a Manual Folder Structure

If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, migrate to workflow A by wrapping them in a `databricks.yml` and a pipeline resource pointing at the existing folders via a `glob`. No file moves required — the medallion folders work as-is under `transformations/**`.
If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, migrate to Workflow A by wrapping them in a `databricks.yml` and a pipeline resource pointing at the existing folders via a `glob`. No file moves required — the medallion folders work as-is under `transformations/**`.

For detailed pipeline configuration options (development mode, continuous, custom event log, notifications, Python deps, classic clusters), see [pipeline-configuration.md](pipeline-configuration.md).

---

Expand All @@ -355,3 +267,4 @@ If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle,
| Files deploy but pipeline doesn't pick them up | Glob pattern in `libraries` doesn't match — re-check `include` path relative to the resource file |
| `Bundle validation failed: Invalid schema` | `databricks bundle validate`, check YAML indentation (spaces, not tabs) |
| Files deploy but pipeline config stale | `databricks bundle deploy --force` |
| `Authentication error` on deploy | `databricks configure --host https://<workspace>.cloud.databricks.com` or set `DATABRICKS_HOST` / `DATABRICKS_TOKEN` |
Loading