diff --git a/manifest.json b/manifest.json index bc7836a..8a1f3fc 100644 --- a/manifest.json +++ b/manifest.json @@ -1,12 +1,12 @@ { "version": "2", - "updated_at": "2026-05-11T13:22:07Z", + "updated_at": "2026-05-18T12:22:38Z", "skills": { "databricks-apps": { "version": "0.1.1", "description": "Databricks Apps development and deployment (evaluates analytics vs synced tables data access)", "experimental": false, - "updated_at": "2026-05-11T13:22:01Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -33,7 +33,7 @@ "version": "0.1.0", "description": "Core Databricks skill for CLI, auth, and data exploration", "experimental": false, - "updated_at": "2026-05-11T10:22:59Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -48,7 +48,7 @@ "version": "0.0.0", "description": "Declarative Automation Bundles (DABs) for deploying and managing Databricks resources", "experimental": false, - "updated_at": "2026-05-05T15:31:42Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -66,7 +66,7 @@ "version": "0.1.0", "description": "Databricks Jobs orchestration and scheduling", "experimental": false, - "updated_at": "2026-05-07T15:19:50Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -78,7 +78,7 @@ "version": "0.1.0", "description": "Databricks Lakebase Postgres: projects, scaling, connectivity, synced tables, and Data API", "experimental": false, - "updated_at": "2026-05-11T10:23:05Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -93,7 +93,7 @@ "version": "0.1.0", "description": "Databricks Model Serving endpoint management", "experimental": false, - "updated_at": "2026-05-07T15:19:45Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -105,7 +105,7 @@ "version": "0.1.0", "description": "Databricks Pipelines (DLT) for ETL and streaming", "experimental": false, - "updated_at": "2026-05-07T15:19:55Z", + "updated_at": "2026-05-18T12:11:44Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -152,7 +152,7 @@ "version": "0.1.0", "description": "Migrate Databricks workloads from classic compute to serverless compute, including compatibility checks and concrete fixes", "experimental": false, - "updated_at": "2026-05-07T15:19:59Z", + "updated_at": "2026-05-18T12:22:14Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -164,6 +164,22 @@ "references/networking-and-security.md", "references/streaming-migration.md" ] + }, + "databricks-serverless-storage-check": { + "version": "0.1.0", + "description": "Detect cross-task file-sharing antipatterns in serverless jobs (writes to /local_disk0, /tmp, or trustedTemp that are read by sibling or child tasks) and recommend UC Volumes or /Workspace handoff", + "experimental": false, + "updated_at": "2026-05-18T12:21:52Z", + "files": [ + "SKILL.md", + "agents/openai.yaml", + "assets/databricks.png", + "assets/databricks.svg", + "references/pattern-catalog.md", + "references/remediation-guide.md", + "scripts/preflight.py", + "scripts/test_preflight.py" + ] } } } diff --git a/scripts/skills.py b/scripts/skills.py index cdfdcf7..b1865b1 100644 --- a/scripts/skills.py +++ b/scripts/skills.py @@ -48,6 +48,10 @@ "description": "Migrate Databricks workloads from classic compute to serverless compute, including compatibility checks and concrete fixes", "experimental": False, }, + "databricks-serverless-storage-check": { + "description": "Detect cross-task file-sharing antipatterns in serverless jobs (writes to /local_disk0, /tmp, or trustedTemp that are read by sibling or child tasks) and recommend UC Volumes or /Workspace handoff", + "experimental": False, + }, } diff --git a/skills/databricks-serverless-migration/SKILL.md b/skills/databricks-serverless-migration/SKILL.md index 859d01a..97f69e4 100644 --- a/skills/databricks-serverless-migration/SKILL.md +++ b/skills/databricks-serverless-migration/SKILL.md @@ -118,7 +118,7 @@ Scan the code for patterns that are incompatible with the serverless compute arc | Pattern | Severity | Fix | |---------|----------|-----| | `dbfs:/` or `/dbfs/` paths (persistent data) | Blocker | Replace with `/Volumes//schema/volume/path` | -| `dbfs:/tmp/`, `/dbfs/tmp/`, paths with `cache`/`scratch`/`temp` | Warning | Use `/tmp/` or `/local_disk0/tmp/` (local driver disk) — do not use Volumes for temp files due to performance | +| `dbfs:/tmp/`, `/dbfs/tmp/`, paths with `cache`/`scratch`/`temp` | Warning | Use `/tmp/` or `/local_disk0/tmp/` (local driver disk) — do not use Volumes for temp files due to performance. **Per-task scratch only**: if another task (child notebook, sibling job task, or pipeline) needs to read the file, use UC Volumes or `/Workspace` — see [`databricks-serverless-storage-check`](../databricks-serverless-storage-check/SKILL.md). | | `file:///dbfs/` FUSE mount paths | Warning | Replace persistent paths with `/Volumes/...`; replace temp paths with `/local_disk0/tmp/` | | `dbutils.fs.mount(...)` | Blocker | Create UC external location + external volume | | `hive_metastore.db.table` | Warning | Migrate to UC or use HMS Federation: `CREATE FOREIGN CATALOG ... USING CONNECTION hms_connection` | diff --git a/skills/databricks-serverless-storage-check/SKILL.md b/skills/databricks-serverless-storage-check/SKILL.md new file mode 100644 index 0000000..9ecef20 --- /dev/null +++ b/skills/databricks-serverless-storage-check/SKILL.md @@ -0,0 +1,149 @@ +--- +name: databricks-serverless-storage-check +description: "Detect cross-task file-sharing antipatterns in Databricks serverless jobs (writes to /local_disk0, /tmp, or trustedTemp that are read by sibling or child tasks on potentially different compute nodes) and recommend UC Volumes or /Workspace for handoff. Use when a serverless job fails with `INTERNAL_ERROR: [Errno 13] Permission denied` on /local_disk0 paths, when parallel child notebooks fail intermittently, when reviewing a DAB job before deploying to serverless, or when the user mentions trustedTemp, fan-out, or cross-task file handoff. Complements databricks-serverless-migration (which covers single-notebook migration)." +compatibility: Requires databricks CLI (>= v0.292.0) for --job-id and --run-id modes; --notebook / --dir / --job-yaml modes have no external dependencies. +metadata: + version: "0.1.0" +parent: databricks-core +--- + +# Serverless Storage Check + +**FIRST**: Use the parent `databricks-core` skill for CLI basics, authentication, and profile selection. + +This skill detects a specific class of serverless failure: **cross-task file handoffs through local disk**. On serverless compute, each task may run on a different node, so a path written by a parent task to `/local_disk0`, `/tmp`, or a `trustedTemp` directory is not guaranteed to be visible to a child task. The typical symptom is: + +``` +INTERNAL_ERROR: [Errno 13] Permission denied: +'/local_disk0/spark-/trustedTemp-/tmp' +``` + +The fix is to move the handoff off local disk and onto durable, cross-node storage — UC Volumes (preferred) or `/Workspace` (fallback) — or replace the file handoff entirely with `dbutils.jobs.taskValues` for small payloads. + +This skill ships an executable preflight scanner (`scripts/preflight.py`) that statically detects these antipatterns and emits remediation guidance. It is intentionally narrow: it does **not** try to fix `ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT`, which is a separate, platform-side intermittent issue (see "What this skill does NOT cover" below). + +## When to use this skill + +Use this skill when any of these triggers appear: + +- A serverless job fails with `INTERNAL_ERROR: [Errno 13] Permission denied` on `/local_disk0`, `/tmp`, or a path containing `trustedTemp` +- Parallel child notebooks (`dbutils.notebook.run`) fail intermittently while the same logic succeeds when run sequentially in a single notebook +- A DAB job is about to be deployed to serverless and has multiple `notebook_task` or `pipeline_task` tasks +- The user mentions "trustedTemp", "fan-out", "cross-task file sharing", or `/local_disk0` +- A new serverless job design needs a sanity check before first run + +This skill is **complementary to**, not a replacement for, [`databricks-serverless-migration`](../databricks-serverless-migration/SKILL.md). That skill handles single-notebook migration and explicitly recommends `/local_disk0/tmp` for per-task scratch — which is correct *inside* a task. The boundary between the two skills: + +| Concern | Use skill | +|---------|-----------| +| Migrating one notebook from classic DBR to serverless | `databricks-serverless-migration` | +| Per-task scratch storage (intra-task) | `databricks-serverless-migration` (recommends `/local_disk0/tmp`) | +| **Cross-task file handoff between parent/child notebooks or sibling tasks** | **this skill** | +| Permission-denied on `/local_disk0` during a multi-task run | **this skill** | + +## Quick start + +Run the preflight scanner against any of: a single notebook, a directory, a DAB job YAML, a remote job, or a failed run. + +```bash +# Single notebook +python3 scripts/preflight.py --notebook path/to/notebook.ipynb + +# Recursive scan of a directory +python3 scripts/preflight.py --dir path/to/repo/ + +# A DAB job YAML (auto-resolves referenced notebooks) +python3 scripts/preflight.py --job-yaml resources/my_job.job.yml + +# A remote job (pulls notebook source via databricks workspace export) +python3 scripts/preflight.py --job-id 123456789 --profile DEFAULT + +# A failed run (classifies the error trace as fan-out vs env-sync) +python3 scripts/preflight.py --run-id 987654321 --profile DEFAULT + +# Machine-readable output for CI gating +python3 scripts/preflight.py --dir . --json +``` + +## Interpreting the output + +The scanner prints findings grouped by severity. Each finding includes the pattern ID, file, line, code snippet, and a recommended fix snippet. + +| Severity | Meaning | Exit code | +|----------|---------|-----------| +| **Blocker** | Will fail on serverless. Must fix before deploy. | `2` | +| **Warning** | Likely to fail under parallel execution. Should fix. | `1` | +| **Info** | Awareness-only or escalation routing (e.g. env-sync error). | `0` | + +Clean scans exit `0`. Use `--json` for CI: pipe to `jq` or fail builds when blockers are found. + +## The core rule + +The boundary between safe and unsafe local-disk use on serverless: + +> **Local disk (`/local_disk0`, `/tmp`, `trustedTemp`) is per-task only.** Anything one task writes that another task reads MUST live on `/Volumes` or `/Workspace`. + +This is verbatim from the BSI thread guidance: when the parent task writes to local disk and the child task tries to read it, the child may be on a different node and the file won't exist (or will hit `Permission denied`). See [`references/remediation-guide.md`](references/remediation-guide.md) for concrete before/after patterns. + +## Pattern catalog (summary) + +| ID | Severity | What it detects | +|----|----------|-----------------| +| `FANOUT001` | Blocker | Local-disk path written then passed to `dbutils.notebook.run`, `taskValues.set`, or job-task parameter | +| `FANOUT002` | Blocker | Child notebook reads from `/local_disk0` or `/tmp` via widget, parameter, or `taskValues.get` | +| `FANOUT003` | Warning | DAB job with multiple sibling tasks referencing the same local-disk path | +| `FANOUT004` | Warning | `pipeline_task` immediately downstream of a `notebook_task` that wrote to local temp | +| `FANOUT005` | Info | `dbutils.fs.cp` from local path to local path inside a multi-task job (heuristic) | +| `FANOUT006` | Blocker | Hardcoded path matching the BSI signature `/local_disk0/spark-*/trustedTemp/...` | +| `ENV001` | Info | Run output contains `ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT` — route to escalation | + +Full rules, sample matches, and per-pattern fixes are in [`references/pattern-catalog.md`](references/pattern-catalog.md). + +## Remediation summary + +When the scanner flags a finding, prefer fixes in this order: + +1. **UC Volumes** (preferred): `/Volumes////handoff//...` + - Durable, cross-node, UC-governed, works for any file size + - Requires `WRITE FILES` on the volume and a parent that creates the volume per run or per job + +2. **`/Workspace`** (fallback): `/Workspace/Shared//handoff/...` + - Durable and cross-node, no UC dependency + - Best for smaller files; subject to workspace storage limits + +3. **`dbutils.jobs.taskValues`** (small payloads only): no file at all + - For scalars and small JSON (well under 48 KB total per run) + - Replaces the file entirely — preferred when the handoff is just a parameter, config, or summary + +4. **Keep `/local_disk0/tmp`** for **intra-task scratch only**. Never for cross-task. + +Full before/after code is in [`references/remediation-guide.md`](references/remediation-guide.md). + +## What this skill does NOT cover + +The original BSI thread combined two distinct failures. This skill addresses only the storage one. The other failure, `ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT` / "Virtual environment changed while syncing", is a rare, platform-side issue that the Databricks team treats as an engineering escalation. The scanner detects it in `--run-id` mode and emits an `ENV001` info finding routing the user to support, but does not attempt to fix it. + +If the scanner emits `ENV001`: + +1. Open a Databricks engineering support ticket (use the `/jira-actions` skill or `/support-escalation` if available) with the run ID and error trace +2. As a temporary mitigation, reduce dependency setup during child notebook startup (move heavy `%pip install` to the parent or a job-level environment spec) +3. Add retries on the affected task — the error is usually transient + +## Related skills + +- [`databricks-serverless-migration`](../databricks-serverless-migration/SKILL.md) — single-notebook classic-to-serverless migration. **Use that skill first** if the workload hasn't been migrated yet. +- [`databricks-dabs`](../databricks-dabs/SKILL.md) — DAB structure and resource definitions. Use when authoring or fixing the `job.yml` flagged by `FANOUT003` or `FANOUT004`. +- [`databricks-jobs`](../databricks-jobs/SKILL.md) — Lakeflow Jobs orchestration. Use when restructuring task dependencies to avoid the fan-out antipattern. +- [`databricks-core`](../databricks-core/SKILL.md) — parent skill for CLI auth and profile selection. + +## Reference docs + +- [Pattern catalog](references/pattern-catalog.md) — all detection rules with examples +- [Remediation guide](references/remediation-guide.md) — before/after code for Volumes, Workspace, and taskValues handoffs + +## External documentation + +- [Serverless compute limitations](https://docs.databricks.com/en/compute/serverless/limitations) — official local-disk scoping rules +- [Unity Catalog volumes](https://docs.databricks.com/en/connect/unity-catalog/volumes.html) — the preferred handoff target +- [Workspace files](https://docs.databricks.com/en/files/workspace.html) — the fallback handoff target +- [`dbutils.jobs.taskValues`](https://docs.databricks.com/en/dev-tools/databricks-utils.html#task-values-utility-dbutilsjobstaskvalues) — for non-file handoffs diff --git a/skills/databricks-serverless-storage-check/agents/openai.yaml b/skills/databricks-serverless-storage-check/agents/openai.yaml new file mode 100644 index 0000000..082e5b9 --- /dev/null +++ b/skills/databricks-serverless-storage-check/agents/openai.yaml @@ -0,0 +1,7 @@ +interface: + display_name: "Databricks Serverless Storage Check" + short_description: "Detect cross-task local-disk handoffs in serverless jobs" + icon_small: "./assets/databricks.svg" + icon_large: "./assets/databricks.png" + brand_color: "#FF3621" + default_prompt: "Use $databricks-serverless-storage-check to scan a serverless job, notebook, or DAB for cross-task file handoffs through /local_disk0, /tmp, or trustedTemp." diff --git a/skills/databricks-serverless-storage-check/assets/databricks.png b/skills/databricks-serverless-storage-check/assets/databricks.png new file mode 100644 index 0000000..263fe98 Binary files /dev/null and b/skills/databricks-serverless-storage-check/assets/databricks.png differ diff --git a/skills/databricks-serverless-storage-check/assets/databricks.svg b/skills/databricks-serverless-storage-check/assets/databricks.svg new file mode 100644 index 0000000..9d19110 --- /dev/null +++ b/skills/databricks-serverless-storage-check/assets/databricks.svg @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/skills/databricks-serverless-storage-check/references/pattern-catalog.md b/skills/databricks-serverless-storage-check/references/pattern-catalog.md new file mode 100644 index 0000000..78a34fd --- /dev/null +++ b/skills/databricks-serverless-storage-check/references/pattern-catalog.md @@ -0,0 +1,198 @@ +# Pattern Catalog + +All detection rules used by `scripts/preflight.py`. Each pattern has a stable ID, a severity, a description of what it matches, an example that triggers it, the recommended fix, and the underlying detection rule. + +The preflight scanner is intentionally conservative — false-negatives on heavily dynamic code (paths built from many variables at runtime) are expected. False positives should be rare. If you hit one, the scanner accepts the finding being ignored at the call site; please file an issue with a minimal repro. + +## Severity scale + +| Severity | Meaning | +|----------|---------| +| **Blocker** | The job WILL fail on serverless under realistic execution. Must fix before deploying. Contributes to exit code `2`. | +| **Warning** | The job is likely to fail under parallel execution or fan-out. Should fix. Contributes to exit code `1`. | +| **Info** | Awareness-only or escalation routing (e.g. env-sync error). Does not affect exit code. | + +## Patterns + +### FANOUT001 — Local-disk path passed to a child call + +**Severity**: Blocker + +**What it matches**: A string literal (or a variable bound to a string literal) starting with `/local_disk0`, `/tmp`, `/dbfs/tmp`, or containing `trustedTemp` is passed as an argument to one of: + +- `dbutils.notebook.run(notebook, timeout, args)` +- `dbutils.jobs.taskValues.set(key=..., value=path)` +- `dbutils.task_values.set(...)` (legacy spelling) + +The path may be passed directly, inside a dict literal (`{"handoff_path": tmp}`), or inside a list/tuple/set literal. + +**Example that triggers it**: + +```python +tmp = "/local_disk0/scratch/output.parquet" +df.write.parquet(tmp) +dbutils.notebook.run("./child", 600, {"handoff_path": tmp}) +``` + +**Fix**: + +```python +import uuid +handoff = f"/Volumes/main/analytics/handoffs/{uuid.uuid4()}.parquet" +df.write.parquet(handoff) +dbutils.notebook.run("./child", 600, {"handoff_path": handoff}) +``` + +**Detection rule**: AST visitor on `ast.Call`. `_call_qualname` matches `dbutils.notebook.run`, `dbutils.jobs.taskValues.set`, or `dbutils.task_values.set`. `_string_args` resolves Name nodes via the cell's variable map and recurses into dict/list literals to find string values. Each resolved string is tested with `is_local_disk_path()`. + +### FANOUT002 — Child notebook reads from a local-disk path + +**Severity**: Blocker + +**What it matches**: A notebook that pulls parameters via `dbutils.widgets.get` or `dbutils.jobs.taskValues.get` (suggesting it's a child) also performs a read (`open(path)`, `pd.read_*`, `spark.read.*`) from a `/local_disk0`, `/tmp`, or `trustedTemp` path. + +**Example that triggers it**: + +```python +dbutils.widgets.text("handoff_path", "") +path = dbutils.widgets.get("handoff_path") +df = pd.read_parquet("/local_disk0/scratch/input.parquet") +``` + +**Fix**: The parent must write to durable storage (`/Volumes` or `/Workspace`), and the child must read from the same. Pass the durable path via the parameter. + +**Detection rule**: The scanner flags a cell as `is_likely_child` if any cell in the notebook uses `dbutils.widgets.get`, `dbutils.jobs.taskValues.get`, or `dbutils.task_values.get`. In a likely-child notebook, any read target (resolved via `_read_targets_in_cell`) matching `is_local_disk_path()` triggers this finding. + +### FANOUT003 — Sibling tasks share a local-disk path + +**Severity**: Warning + +**What it matches**: A DAB job YAML defines two or more sibling tasks (or a task and one of its descendants) whose referenced notebooks both touch the same `/local_disk0`, `/tmp`, or `trustedTemp` path. + +**Example that triggers it**: + +```yaml +resources: + jobs: + my_job: + tasks: + - task_key: producer + notebook_task: + notebook_path: ./producer.py + - task_key: consumer + depends_on: [{ task_key: producer }] + notebook_task: + notebook_path: ./consumer.py +``` + +```python +shared = "/tmp/foo.parquet" +pd.DataFrame({"x": [1]}).to_parquet(shared) +``` + +```python +shared = "/tmp/foo.parquet" +df = pd.read_parquet(shared) +``` + +**Fix**: Move the shared artifact to `/Volumes/...` or `/Workspace/...` and update both notebooks. + +**Detection rule**: `scan_job_yaml` resolves each task's `notebook_path`, runs the per-cell scanner on every referenced notebook, and collects the set of local-disk paths each notebook touches (writes, reads, child-call args, or bare string literals). When two or more task keys overlap on the same path, the finding fires. + +### FANOUT004 — `pipeline_task` downstream of a local-temp-writing notebook + +**Severity**: Warning + +**What it matches**: A DAB task with `pipeline_task: ...` whose `depends_on` includes a `notebook_task` that writes to a local-disk path. + +**Example that triggers it**: + +```yaml +tasks: + - task_key: prep + notebook_task: + notebook_path: ./prep.py # writes to /tmp/staging.parquet + - task_key: run_pipeline + depends_on: [{ task_key: prep }] + pipeline_task: + pipeline_id: 12345 +``` + +**Fix**: Have `prep` write to a UC Volume that the pipeline ingests via Auto Loader or a streaming table, or materialize the prep output as a table the pipeline reads from. + +**Detection rule**: For each task with `is_pipeline_task == True`, if any upstream `depends_on` task's notebook contains a local-disk write (recorded in `notebook_local_paths`), the finding fires. + +### FANOUT005 — `dbutils.fs.cp` from local path to local path + +**Severity**: Info + +**What it matches**: A `dbutils.fs.cp(src, dst)` or `dbutils.fs.mv(src, dst)` call where both arguments resolve to local-disk paths. + +**Example that triggers it**: + +```python +dbutils.fs.cp("/local_disk0/staging/x.parquet", "/tmp/cache/x.parquet") +``` + +**Fix**: Safe within a single task. If the notebook is invoked by a multi-task job, change one side to `/Volumes/...` or `/Workspace/...` so the destination is visible to other tasks. + +**Detection rule**: AST visitor matches `dbutils.fs.cp` and `dbutils.fs.mv` calls with two string args that both pass `is_local_disk_path()`. + +### FANOUT006 — Hardcoded BSI trustedTemp signature + +**Severity**: Blocker + +**What it matches**: Any string anywhere in the source that matches the regex `/local_disk0/spark-[A-Za-z0-9\-]+/trustedTemp[A-Za-z0-9\-]*`. This is the exact path family that produced the original BSI failure: + +``` +/local_disk0/spark-d6bae111-42bd-4f54-9136-a4e9fbdec3d6/trustedTemp-55adadbe-d9ed-4278-a751-868797c1562f/tmpc58fz4pv +``` + +**Example that triggers it**: + +```python +tmp = "/local_disk0/spark-abc/trustedTemp-def/handoff.parquet" +``` + +**Fix**: Never hardcode a `trustedTemp` path. The full path is a runtime-internal Spark scratch location; if you depend on it from another task, the path will exist on a different node from where you wrote it. Use `/Volumes/...` or `/Workspace/...` for any cross-task data. + +**Detection rule**: A tree-walk over every string `Constant` node in every cell tests `is_bsi_signature()` (which uses the `BSI_TRUSTED_TEMP_RE` regex). Triggers regardless of whether the string is in an assignment, a call arg, or a free expression. + +### ENV001 — `ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT` in run output + +**Severity**: Info + +**What it matches**: `--run-id` mode only. The run's error trace contains `ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT` (often accompanied by "Virtual environment changed while syncing"). + +**Why this is info-only**: Per the BSI thread (Philip Nord), this error is a rare, platform-side intermittent issue. There is no customer-side fix this skill can apply. The scanner emits this finding to route the user to escalation rather than mislead them into a code change. + +**Fix**: + +1. Open a Databricks engineering support ticket (use `/jira-actions` or `/support-escalation`) with the run ID and error trace. +2. As a mitigation, reduce dependency setup during child notebook startup. Move heavy `%pip install` into the parent or into a job-level environment spec where possible. +3. Add task retries — the error is usually transient and the next run typically succeeds. + +**Detection rule**: `--run-id` mode shells out to `databricks jobs get-run-output` and tests the combined `error` + `error_trace` text against `ENV_SYNC_RE`. + +## False-positive escape hatch + +If a finding is genuinely safe in your workload (rare, but possible — e.g. you have a single-task notebook where `/local_disk0` use is fine), the simplest mitigation is to wrap the path construction so the literal doesn't appear in source: + +```python +# Hidden from the static scanner; only do this when you've verified the +# context is genuinely single-task. +import os +LOCAL_SCRATCH = os.environ.get("LOCAL_SCRATCH_ROOT", "/local_disk0/tmp") +``` + +The scanner does not resolve `os.environ.get()`, so paths constructed this way are skipped. Prefer fixing the antipattern when possible; this is an explicit opt-out, not a recommendation. + +## Adding a new pattern + +To add a new detection rule: + +1. Add the rule logic to `_NotebookScanner` (cell-scoped) or `scan_job_yaml` (DAB-scoped) in `scripts/preflight.py`. +2. Append a new entry to this catalog with: ID (next `FANOUT###` or topical prefix), severity, what-it-matches, example, fix, detection rule. +3. Add a unit test in `scripts/test_preflight.py` that exercises a triggering fixture and asserts the expected finding. +4. Update the summary table in `SKILL.md`. +5. Run `python3 scripts/test_preflight.py` — must still pass cleanly. diff --git a/skills/databricks-serverless-storage-check/references/remediation-guide.md b/skills/databricks-serverless-storage-check/references/remediation-guide.md new file mode 100644 index 0000000..515d4dd --- /dev/null +++ b/skills/databricks-serverless-storage-check/references/remediation-guide.md @@ -0,0 +1,272 @@ +# Remediation Guide + +Concrete before/after patterns for fixing the antipatterns flagged by `scripts/preflight.py`. Choose the fix that matches your handoff payload size and governance requirements. + +## Decision tree + +``` +What is the handoff? + + Small scalar or JSON (< ~48 KB total per run) + → use dbutils.jobs.taskValues (no file at all) + + A file + Need UC governance / large files / Delta tables? + → /Volumes////handoff/... (PREFERRED) + Smaller files, no UC required, simpler permissions? + → /Workspace/Shared//handoff/... (FALLBACK) + + Same-task scratch only + → /local_disk0/tmp/... is FINE (and recommended) +``` + +## Fix 1 — UC Volumes handoff (preferred) + +Use a Volume for any cross-task file. Volumes are durable, cross-node, UC-governed, and work for any file size. + +### Setup (one-time, per workload) + +```sql +CREATE VOLUME IF NOT EXISTS main.analytics.job_handoffs; +GRANT WRITE VOLUME ON VOLUME main.analytics.job_handoffs TO ``; +GRANT READ VOLUME ON VOLUME main.analytics.job_handoffs TO ``; +``` + +### Before — broken (FANOUT001 + FANOUT006) + +```python +import pandas as pd + +tmp = "/local_disk0/spark-abc/trustedTemp-def/handoff.parquet" +pd.DataFrame({"x": [1, 2, 3]}).to_parquet(tmp) + +dbutils.notebook.run("./child", 600, {"handoff_path": tmp}) +``` + +### After — durable Volumes handoff + +```python +import pandas as pd + +run_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().jobId().get() +handoff = f"/Volumes/main/analytics/job_handoffs/run_{run_id}/data.parquet" + +dbutils.fs.mkdirs(f"/Volumes/main/analytics/job_handoffs/run_{run_id}") +pd.DataFrame({"x": [1, 2, 3]}).to_parquet(handoff) + +dbutils.notebook.run("./child", 600, {"handoff_path": handoff}) +``` + +### Cleanup (optional, end-of-job task) + +```python +# Remove this run's handoff directory at the end of the job +import shutil +run_dir = f"/Volumes/main/analytics/job_handoffs/run_{run_id}" +shutil.rmtree(run_dir, ignore_errors=True) +``` + +### Permission notes + +- The job's run-as identity needs `WRITE VOLUME` on the producing side and `READ VOLUME` on the consuming side. +- For ad-hoc development from a notebook, the calling user needs the same grants. +- Volume paths are accessible from Python (`open`, `pd.read_*`, `shutil.*`), Spark (`spark.read.*`), and shell commands (`cat`, `ls`). +- Lifecycle: Volumes persist until you delete them. Plan for cleanup if your job produces many handoff directories. + +## Fix 2 — `/Workspace` handoff (fallback) + +Use `/Workspace` files when UC is not available or when the file is small and ephemeral. Files written under `/Workspace` are durable and visible across nodes, but subject to workspace storage quotas and not designed for high-throughput I/O. + +### Before — broken + +```python +import json + +tmp = "/tmp/config.json" +with open(tmp, "w") as f: + json.dump({"feature_flags": ["a", "b"]}, f) + +dbutils.notebook.run("./apply_config", 600, {"config_path": tmp}) +``` + +### After — Workspace handoff + +```python +import json, os + +handoff_dir = "/Workspace/Shared/my_job/handoff" +os.makedirs(handoff_dir, exist_ok=True) +handoff = f"{handoff_dir}/config.json" + +with open(handoff, "w") as f: + json.dump({"feature_flags": ["a", "b"]}, f) + +dbutils.notebook.run("./apply_config", 600, {"config_path": handoff}) +``` + +### Permission notes + +- The run-as identity needs **CAN_EDIT** on `/Workspace/Shared/my_job/` (or whichever folder you write to). +- `/Workspace` is workspace-scoped: the same path is **not** visible from a different workspace. +- Keep files under `/Workspace` modest in size (megabytes, not gigabytes). For large data, use Volumes. + +## Fix 3 — `dbutils.jobs.taskValues` for small payloads + +If the handoff is a scalar, a small dict, or a small JSON blob, skip the file entirely. Task values are designed for this and avoid all the storage concerns. + +### Before — broken + +```python +# In parent task +import json +status = {"records_processed": 12345, "skipped": 2} +with open("/tmp/status.json", "w") as f: + json.dump(status, f) +``` + +```python +# In child task +import json +with open("/tmp/status.json") as f: # FANOUT002: child reads from /tmp + status = json.load(f) +``` + +### After — taskValues handoff + +```python +# In parent task +dbutils.jobs.taskValues.set(key="records_processed", value=12345) +dbutils.jobs.taskValues.set(key="status", value={"records_processed": 12345, "skipped": 2}) +``` + +```python +# In child task — referencing the parent task by key +records = dbutils.jobs.taskValues.get( + taskKey="parent_task", + key="records_processed", + debugValue=0, +) +status = dbutils.jobs.taskValues.get( + taskKey="parent_task", + key="status", + debugValue={}, +) +``` + +### Limits + +- Per-task-value: 48 KB serialized JSON +- Per-run total across all task values: 5 MB +- Types: any JSON-serializable Python value (str, int, float, bool, list, dict, None) +- The `debugValue` is required and is used when running the notebook interactively (outside a job) + +## Fix 4 — `pipeline_task` downstream of a notebook (FANOUT004) + +When a pipeline task depends on a notebook task, don't try to hand off via a local-disk path. The pipeline runs in its own context. + +### Before — broken + +```yaml +tasks: + - task_key: prep + notebook_task: + notebook_path: ./prep_data.py # writes /tmp/staging.parquet + - task_key: run_pipeline + depends_on: [{ task_key: prep }] + pipeline_task: + pipeline_id: 12345 # tries to read /tmp/staging.parquet +``` + +### After — Volumes-based handoff + +Update the notebook: + +```python +# prep_data.py +dest = "/Volumes/main/raw/staging/run_42/data.parquet" +df.write.format("parquet").mode("overwrite").save(dest) +``` + +Update the pipeline to read from the volume: + +```python +# In the pipeline notebook (DLT / SDP) +import dlt + +@dlt.table +def staging(): + return spark.read.format("parquet").load("/Volumes/main/raw/staging/run_42/data.parquet") +``` + +For incremental ingest, prefer Auto Loader over a single-path read: + +```python +@dlt.table +def staging(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "parquet") + .load("/Volumes/main/raw/staging/") + ) +``` + +## What NOT to do — anti-examples + +These are the exact patterns the scanner exists to catch. Do not use any of them for cross-task data. + +### Anti-pattern 1: parent writes to trustedTemp, child reads + +```python +# Parent +tmp = "/local_disk0/spark-abc/trustedTemp-def/handoff.parquet" # FANOUT006 +df.write.parquet(tmp) # writes to local node only +dbutils.notebook.run("./child", 600, {"handoff_path": tmp}) # FANOUT001 +``` + +```python +# Child +path = dbutils.widgets.get("handoff_path") +df = pd.read_parquet(path) # FANOUT002 — child likely runs on a different node +``` + +### Anti-pattern 2: sibling tasks share `/tmp` + +```yaml +tasks: + - task_key: producer + notebook_task: { notebook_path: ./producer.py } # writes /tmp/foo.parquet + - task_key: consumer + depends_on: [{ task_key: producer }] + notebook_task: { notebook_path: ./consumer.py } # reads /tmp/foo.parquet — FANOUT003 +``` + +### Anti-pattern 3: cleanup that depends on local state across tasks + +```python +# Final cleanup task +import shutil +shutil.rmtree("/local_disk0/scratch/") # only cleans this node; other nodes are untouched +``` + +The "cleanup" task may run on a node that never saw the scratch directory. Either move scratch to `/Volumes` and clean that, or skip the cleanup task entirely (local disk is reclaimed when the task ends). + +## When `/local_disk0/tmp` IS fine + +For completeness: local-disk paths are correct, and recommended, for **per-task scratch** that doesn't outlive the task. + +```python +# OK on serverless: temporary intermediate inside a single task +scratch = "/local_disk0/tmp/intermediate.parquet" +df.write.parquet(scratch) +# ... use scratch later in the SAME task ... +post = spark.read.parquet(scratch) +``` + +The boundary is: does another task — child notebook, sibling task, pipeline — need to read this file? If yes, it must live on `/Volumes` or `/Workspace`. If no, `/local_disk0/tmp` is the right answer. + +## Reference + +- [Unity Catalog volumes overview](https://docs.databricks.com/en/connect/unity-catalog/volumes.html) +- [Workspace files](https://docs.databricks.com/en/files/workspace.html) +- [`dbutils.jobs.taskValues`](https://docs.databricks.com/en/dev-tools/databricks-utils.html#task-values-utility-dbutilsjobstaskvalues) +- [Serverless compute limitations](https://docs.databricks.com/en/compute/serverless/limitations) diff --git a/skills/databricks-serverless-storage-check/scripts/preflight.py b/skills/databricks-serverless-storage-check/scripts/preflight.py new file mode 100644 index 0000000..7d6b64d --- /dev/null +++ b/skills/databricks-serverless-storage-check/scripts/preflight.py @@ -0,0 +1,1240 @@ +#!/usr/bin/env python3 +"""Serverless storage preflight: detect cross-task local-disk handoffs. + +Scans Databricks notebooks, directories, DAB job YAML, or remote jobs/runs +for the antipattern where one task writes to /local_disk0, /tmp, or a +trustedTemp directory and another task reads from it. On serverless +compute, tasks may run on different nodes, so these handoffs fail with +`INTERNAL_ERROR: [Errno 13] Permission denied`. + +Stdlib only. Optional `databricks` CLI for --job-id / --run-id modes. + +Usage: + preflight.py --notebook PATH [--json] + preflight.py --dir PATH [--json] + preflight.py --job-yaml PATH [--json] + preflight.py --job-id ID --profile NAME [--json] + preflight.py --run-id ID --profile NAME [--json] + +Exit codes: + 0 clean (or info-only findings) + 1 warnings found + 2 blockers found +""" + +from __future__ import annotations + +import argparse +import ast +import json +import re +import subprocess +import sys +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Iterable, Iterator + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Local-disk path roots that are unsafe for cross-task sharing on serverless. +LOCAL_DISK_PREFIXES = ( + "/local_disk0", + "/tmp", + "/dbfs/tmp", + "dbfs:/tmp", +) + +# Exact BSI signature: /local_disk0/spark-/trustedTemp-/... +BSI_TRUSTED_TEMP_RE = re.compile( + r"/local_disk0/spark-[A-Za-z0-9\-]+/trustedTemp[A-Za-z0-9\-]*" +) + +# Generic trustedTemp anywhere in the path. +TRUSTED_TEMP_RE = re.compile(r"trustedTemp[A-Za-z0-9\-]*") + +# Durable, cross-node storage roots. Paths starting with these are safe. +SAFE_PREFIXES = ("/Volumes/", "/Workspace/") + +# Calls that move data to a child task / sibling. If a local-disk path +# flows into one of these, that's a cross-task handoff. +CHILD_CALL_NAMES = { + "dbutils.notebook.run", + "dbutils.jobs.taskValues.set", + "dbutils.task_values.set", +} + +# Calls that pull from a parent task. If the value is then used as a path +# starting with /local_disk0 or /tmp, the parent must have written it there. +PARENT_PULL_NAMES = { + "dbutils.widgets.get", + "dbutils.jobs.taskValues.get", + "dbutils.task_values.get", +} + +# Env-sync error signature (run-id mode only). +ENV_SYNC_RE = re.compile( + r"ENVIRONMENT_SETUP_ERROR\.PYTHON_NOTEBOOK_ENVIRONMENT" +) + +# Databricks notebook cell delimiter for .py source format. +PY_CELL_DELIM_RE = re.compile(r"^# COMMAND -+\s*$", re.MULTILINE) +PY_MAGIC_RE = re.compile(r"^# MAGIC %(\w+)\s*(.*)$", re.MULTILINE) + + +# --------------------------------------------------------------------------- +# Finding model +# --------------------------------------------------------------------------- + +SEVERITY_BLOCKER = "blocker" +SEVERITY_WARNING = "warning" +SEVERITY_INFO = "info" + +SEVERITY_ORDER = { + SEVERITY_BLOCKER: 2, + SEVERITY_WARNING: 1, + SEVERITY_INFO: 0, +} + + +@dataclass +class Finding: + pattern_id: str + severity: str + file: str + line: int + snippet: str + message: str + fix: str + + def to_dict(self) -> dict: + return asdict(self) + + +# --------------------------------------------------------------------------- +# Path classification helpers +# --------------------------------------------------------------------------- + + +def is_local_disk_path(value: str) -> bool: + """True if the string looks like a local-disk path on Databricks compute.""" + if not isinstance(value, str) or not value: + return False + if TRUSTED_TEMP_RE.search(value): + return True + for prefix in LOCAL_DISK_PREFIXES: + if value == prefix or value.startswith(prefix + "/"): + return True + return False + + +def is_bsi_signature(value: str) -> bool: + """True if the string matches the exact BSI trustedTemp signature.""" + return bool(isinstance(value, str) and BSI_TRUSTED_TEMP_RE.search(value)) + + +def is_safe_path(value: str) -> bool: + """True if the string is a durable, cross-node storage path.""" + return isinstance(value, str) and any( + value.startswith(p) for p in SAFE_PREFIXES + ) + + +# --------------------------------------------------------------------------- +# Notebook source extraction +# --------------------------------------------------------------------------- + + +@dataclass +class PythonCell: + """A Python code block extracted from a notebook, with its source offset.""" + + code: str + start_line: int # 1-indexed line in the original file + + +def extract_python_cells(file_path: Path) -> list[PythonCell]: + """Return Python code cells from a .py or .ipynb notebook. + + For .py (Databricks source format), splits on `# COMMAND -----` and + keeps only cells that are Python (no leading `# MAGIC %sql/%scala/%r`). + For .ipynb, returns cells with `cell_type == "code"`. Magic-only cells + (those that start with `%sql`, `%pip`, etc.) are skipped from AST + analysis but remain visible to regex scans elsewhere. + """ + suffix = file_path.suffix.lower() + text = file_path.read_text(encoding="utf-8", errors="replace") + + if suffix == ".ipynb": + return _extract_ipynb_cells(text) + return _extract_py_cells(text) + + +def _extract_py_cells(text: str) -> list[PythonCell]: + cells: list[PythonCell] = [] + pos = 0 + line = 1 + parts = PY_CELL_DELIM_RE.split(text) + for part in parts: + stripped = part.lstrip("\n") + leading = len(part) - len(stripped) + # Skip cells whose first non-blank line is a magic that isn't %python. + first_nonblank = next( + (ln for ln in stripped.splitlines() if ln.strip()), + "", + ) + magic = PY_MAGIC_RE.match(first_nonblank) + if magic and magic.group(1) not in ("python", "py"): + line += part.count("\n") + continue + # Strip Databricks `# MAGIC ` prefixes from any python magic lines + # so the remainder is valid Python for ast.parse. + cleaned = "\n".join( + re.sub(r"^# MAGIC ?", "", ln) for ln in stripped.splitlines() + ) + if cleaned.strip(): + cells.append(PythonCell(code=cleaned, start_line=line + leading)) + line += part.count("\n") + return cells + + +def _extract_ipynb_cells(text: str) -> list[PythonCell]: + try: + nb = json.loads(text) + except json.JSONDecodeError: + return [] + cells: list[PythonCell] = [] + synthetic_line = 1 + for cell in nb.get("cells", []): + if cell.get("cell_type") != "code": + synthetic_line += 1 + continue + source = cell.get("source", "") + if isinstance(source, list): + source = "".join(source) + first_nonblank = next( + (ln for ln in source.splitlines() if ln.strip()), + "", + ) + if first_nonblank.startswith("%") and not first_nonblank.startswith( + "%python" + ): + synthetic_line += source.count("\n") + 1 + continue + cleaned = "\n".join( + ln[len("%python") :] if ln.startswith("%python") else ln + for ln in source.splitlines() + ) + if cleaned.strip(): + cells.append(PythonCell(code=cleaned, start_line=synthetic_line)) + synthetic_line += source.count("\n") + 1 + return cells + + +# --------------------------------------------------------------------------- +# AST visitor +# --------------------------------------------------------------------------- + + +def _attr_chain(node: ast.AST) -> str | None: + """Return a dotted name for an ast.Attribute chain like a.b.c, else None.""" + parts: list[str] = [] + cur = node + while isinstance(cur, ast.Attribute): + parts.append(cur.attr) + cur = cur.value + if isinstance(cur, ast.Name): + parts.append(cur.id) + return ".".join(reversed(parts)) + return None + + +def _call_qualname(node: ast.Call) -> str | None: + """Return a dotted callable name like `dbutils.notebook.run`, else None.""" + return _attr_chain(node.func) if isinstance(node.func, ast.Attribute) else ( + node.func.id if isinstance(node.func, ast.Name) else None + ) + + +def _resolve_string( + node: ast.AST, var_map: dict[str, str] +) -> str | None: + """Return a string value for a constant or a Name bound to a constant.""" + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value + if isinstance(node, ast.Name) and node.id in var_map: + return var_map[node.id] + return None + + +def _string_args( + node: ast.Call, var_map: dict[str, str] | None = None +) -> list[tuple[str, int]]: + """Yield (value, line) for every string positional/keyword arg. + + Resolves Name nodes against `var_map` (assignments earlier in the cell) + and recurses into dict/list/tuple/set literals so paths passed via + `{"k": tmp}` or `[tmp]` are still detected. + """ + vm = var_map or {} + out: list[tuple[str, int]] = [] + + def _collect(value_node: ast.AST, lineno: int) -> None: + s = _resolve_string(value_node, vm) + if s is not None: + out.append((s, lineno)) + return + if isinstance(value_node, (ast.List, ast.Tuple, ast.Set)): + for elt in value_node.elts: + _collect(elt, getattr(elt, "lineno", lineno)) + elif isinstance(value_node, ast.Dict): + for k, v in zip(value_node.keys, value_node.values): + if k is not None: + _collect(k, getattr(k, "lineno", lineno)) + if v is not None: + _collect(v, getattr(v, "lineno", lineno)) + + for arg in node.args: + _collect(arg, arg.lineno) + for kw in node.keywords: + if kw.value is not None: + _collect(kw.value, kw.value.lineno) + return out + + +def _build_var_map(tree: ast.AST) -> dict[str, str]: + """Build name -> string-literal map from top-level and nested Assigns.""" + out: dict[str, str] = {} + for node in ast.walk(tree): + if not isinstance(node, ast.Assign): + continue + if not ( + isinstance(node.value, ast.Constant) and isinstance(node.value.value, str) + ): + continue + for target in node.targets: + if isinstance(target, ast.Name): + out[target.id] = node.value.value + return out + + +def _all_string_constants(tree: ast.AST) -> Iterator[tuple[str, int]]: + """Yield (value, lineno) for every string Constant anywhere in the tree.""" + for node in ast.walk(tree): + if isinstance(node, ast.Constant) and isinstance(node.value, str): + yield node.value, node.lineno + + +class _NotebookScanner(ast.NodeVisitor): + """Collects local-disk writes, child calls, and parent pulls in a cell.""" + + def __init__(self, cell: PythonCell, file_path: str): + self.cell = cell + self.file = file_path + self.var_map: dict[str, str] = {} + self.local_writes: list[tuple[str, int, str]] = [] # (path, line, snippet) + self.child_calls: list[tuple[str, int, str, str]] = [] # (path, line, snippet, callname) + self.parent_reads: list[tuple[str, int, str]] = [] # (path, line, snippet) + self.fs_cp_local_to_local: list[tuple[str, str, int, str]] = [] + self.bsi_hits: list[tuple[str, int, str]] = [] + self.all_local_paths: set[str] = set() + + # ---- entrypoint ---- + def scan(self) -> None: + try: + tree = ast.parse(self.cell.code) + except SyntaxError: + return + self.var_map = _build_var_map(tree) + # Walk every string constant in the cell once. This catches BSI + # signatures bound to variables (e.g. `tmp = "/local_disk0/.../trustedTemp/..."`) + # and seeds the "this cell touches these local paths" set used by + # the DAB sibling-sharing analysis. + for value, lineno in _all_string_constants(tree): + if is_local_disk_path(value): + self.all_local_paths.add(value) + if is_bsi_signature(value): + self.bsi_hits.append( + (value, self._real_line(lineno), self._snippet(lineno)) + ) + # Also include resolved variable values in case the constant is + # only an attribute of a longer chain we missed. + for value in self.var_map.values(): + if is_local_disk_path(value): + self.all_local_paths.add(value) + self.visit(tree) + + # ---- helpers ---- + def _real_line(self, lineno: int) -> int: + return self.cell.start_line + lineno - 1 + + def _snippet(self, lineno: int) -> str: + lines = self.cell.code.splitlines() + if 1 <= lineno <= len(lines): + return lines[lineno - 1].strip() + return "" + + # ---- visitors ---- + def visit_Call(self, node: ast.Call) -> None: + callname = _call_qualname(node) + strings = _string_args(node, self.var_map) + + # Child calls (parent writes flowing out) + if callname in CHILD_CALL_NAMES: + for s, ln in strings: + if is_local_disk_path(s): + self.child_calls.append( + (s, self._real_line(ln), self._snippet(ln), callname) + ) + + # File writes to local-disk paths (open(..., "w"), pandas to_*, spark.write.*) + write_path = _detect_write_target(node, callname, self.var_map) + if write_path is not None: + value, lineno = write_path + if is_local_disk_path(value): + self.local_writes.append( + (value, self._real_line(lineno), self._snippet(lineno)) + ) + self.all_local_paths.add(value) + + # dbutils.fs.cp local-to-local (heuristic) + if callname in ("dbutils.fs.cp", "dbutils.fs.mv"): + cp_strings = [s for s, _ in strings if isinstance(s, str)] + if ( + len(cp_strings) >= 2 + and is_local_disk_path(cp_strings[0]) + and is_local_disk_path(cp_strings[1]) + ): + ln = strings[0][1] + self.fs_cp_local_to_local.append( + ( + cp_strings[0], + cp_strings[1], + self._real_line(ln), + self._snippet(ln), + ) + ) + + self.generic_visit(node) + + def visit_Assign(self, node: ast.Assign) -> None: + # Detect: x = dbutils.widgets.get("path"); open(x); etc. + # We approximate: if RHS is a parent-pull call and the variable is + # later used as a path argument to open() or a read_* call, that + # would be FANOUT002. Without dataflow, we surface a softer signal: + # if RHS is a parent-pull AND any local-disk string literal exists + # in the same cell as a read target, we'll catch it via direct + # string-literal reads below. + self.generic_visit(node) + + +def _detect_write_target( + node: ast.Call, callname: str | None, var_map: dict[str, str] +) -> tuple[str, int] | None: + """Return (path_string, lineno) if the call writes to a path, else None. + + Resolves Name args via `var_map` so writes through a local variable + (e.g. `tmp = "/local_disk0/..."; pd.DataFrame(...).to_parquet(tmp)`) + are still detected. + """ + if callname is None: + return None + + def _resolve(arg: ast.AST) -> str | None: + return _resolve_string(arg, var_map) + + # open(path, "w"|"wb"|"a"|...) + if callname == "open" and node.args: + mode = None + for arg in node.args[1:]: + if isinstance(arg, ast.Constant) and isinstance(arg.value, str): + mode = arg.value + break + for kw in node.keywords: + if kw.arg == "mode" and isinstance(kw.value, ast.Constant): + if isinstance(kw.value.value, str): + mode = kw.value.value + if mode and any(c in mode for c in ("w", "a", "x")): + s = _resolve(node.args[0]) + if s is not None: + return s, node.args[0].lineno + + # spark.write.* / DataFrame.write.* (heuristic: any call whose name + # ends in .save / .saveAsTable / .parquet / .csv / .json / .text / .orc / + # .delta / .insertInto with a string arg) + write_terminals = { + "save", + "saveAsTable", + "parquet", + "csv", + "json", + "text", + "orc", + } + last = callname.split(".")[-1] + if last in write_terminals and node.args: + s = _resolve(node.args[0]) + if s is not None: + return s, node.args[0].lineno + + # pandas: df.to_csv, df.to_parquet, df.to_json, df.to_pickle + if last.startswith("to_") and node.args: + s = _resolve(node.args[0]) + if s is not None: + return s, node.args[0].lineno + + # shutil.copy / copyfile / move (dest is arg 1) + if callname in ("shutil.copy", "shutil.copyfile", "shutil.move") and len(node.args) >= 2: + s = _resolve(node.args[1]) + if s is not None: + return s, node.args[1].lineno + + # dbutils.fs.put(path, contents, overwrite?) + if callname == "dbutils.fs.put" and node.args: + s = _resolve(node.args[0]) + if s is not None: + return s, node.args[0].lineno + + return None + + +# --------------------------------------------------------------------------- +# Per-file analysis +# --------------------------------------------------------------------------- + + +def _read_targets_in_cell(scanner: _NotebookScanner) -> list[tuple[str, int, str]]: + """Best-effort detection of reads from local-disk string literals. + + Catches open(path, "r"), pd.read_*, spark.read.* with string args. + Resolves Name args via the scanner's var_map. + """ + out: list[tuple[str, int, str]] = [] + try: + tree = ast.parse(scanner.cell.code) + except SyntaxError: + return out + + read_terminals = { + "parquet", + "csv", + "json", + "text", + "orc", + "table", + "load", + } + vm = scanner.var_map + + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + callname = _call_qualname(node) + strings = _string_args(node, vm) + if callname == "open": + mode = "r" + for arg in node.args[1:]: + if isinstance(arg, ast.Constant) and isinstance(arg.value, str): + mode = arg.value + break + if "r" in mode and not any(c in mode for c in ("w", "a", "x")): + for s, ln in strings[:1]: + if is_local_disk_path(s): + out.append((s, scanner._real_line(ln), scanner._snippet(ln))) + scanner.all_local_paths.add(s) + elif callname and callname.split(".")[-1] in read_terminals: + for s, ln in strings[:1]: + if is_local_disk_path(s): + out.append((s, scanner._real_line(ln), scanner._snippet(ln))) + scanner.all_local_paths.add(s) + elif callname and callname.split(".")[-1].startswith("read_"): + for s, ln in strings[:1]: + if is_local_disk_path(s): + out.append((s, scanner._real_line(ln), scanner._snippet(ln))) + scanner.all_local_paths.add(s) + return out + + +def scan_notebook(file_path: Path) -> list[Finding]: + """Scan a single notebook and emit FANOUT findings.""" + findings: list[Finding] = [] + rel = str(file_path) + cells = extract_python_cells(file_path) + + has_child_call_anywhere = False + has_local_write_anywhere = False + has_local_read_anywhere = False + + cell_scanners: list[_NotebookScanner] = [] + for cell in cells: + scanner = _NotebookScanner(cell, rel) + scanner.scan() + cell_scanners.append(scanner) + if scanner.child_calls: + has_child_call_anywhere = True + if scanner.local_writes: + has_local_write_anywhere = True + + for scanner in cell_scanners: + # FANOUT006 — BSI signature (always blocker, regardless of context) + for path, line, snippet in scanner.bsi_hits: + findings.append( + Finding( + pattern_id="FANOUT006", + severity=SEVERITY_BLOCKER, + file=rel, + line=line, + snippet=snippet, + message=( + f"Hardcoded path matches the exact BSI trustedTemp " + f"signature: {path!r}. This is a known-bad cross-node " + f"path on serverless." + ), + fix=( + "Replace with /Volumes////" + "handoff//... or /Workspace/Shared//...; " + "see references/remediation-guide.md." + ), + ) + ) + + # FANOUT001 — local-disk path passed to a child call + for path, line, snippet, callname in scanner.child_calls: + findings.append( + Finding( + pattern_id="FANOUT001", + severity=SEVERITY_BLOCKER, + file=rel, + line=line, + snippet=snippet, + message=( + f"Local-disk path {path!r} passed to {callname}. " + f"Child tasks may run on a different node and will " + f"hit Permission denied." + ), + fix=( + "Write the handoff to /Volumes///" + "/... or /Workspace/Shared/... and pass that " + "path instead. For small payloads, use " + "dbutils.jobs.taskValues with no file." + ), + ) + ) + + # FANOUT002 — local-disk read in a notebook that is also called by a parent + # We can't see the caller statically, so we surface reads of /local_disk0 + # or /tmp as warnings when they appear in a notebook that ALSO contains + # widgets/taskValues.get (suggesting it's a child notebook). + is_likely_child = any( + re.search(r"dbutils\.(widgets|jobs\.taskValues|task_values)\.get", + c.code) + for c in cells + ) + for path, line, snippet in _read_targets_in_cell(scanner): + if is_likely_child: + findings.append( + Finding( + pattern_id="FANOUT002", + severity=SEVERITY_BLOCKER, + file=rel, + line=line, + snippet=snippet, + message=( + f"Child notebook reads from local-disk path " + f"{path!r}. On serverless, the parent task that " + f"wrote this file may have run on a different node." + ), + fix=( + "Have the parent write to /Volumes/... or " + "/Workspace/... and read from there. For scalars " + "and small JSON, use dbutils.jobs.taskValues." + ), + ) + ) + + # FANOUT005 — dbutils.fs.cp local→local in a multi-task context (heuristic) + for src, dst, line, snippet in scanner.fs_cp_local_to_local: + findings.append( + Finding( + pattern_id="FANOUT005", + severity=SEVERITY_INFO, + file=rel, + line=line, + snippet=snippet, + message=( + f"dbutils.fs.cp from {src!r} to {dst!r} — both on local " + f"disk. Safe within a single task only." + ), + fix=( + "If this notebook is invoked by a multi-task job, use " + "/Volumes/... or /Workspace/... for cross-task data." + ), + ) + ) + + return findings + + +def scan_path(target: Path) -> list[Finding]: + """Scan a single notebook or a directory of notebooks.""" + findings: list[Finding] = [] + if target.is_file(): + if target.suffix.lower() in (".py", ".ipynb"): + findings.extend(scan_notebook(target)) + return findings + if target.is_dir(): + for path in sorted(target.rglob("*")): + if path.suffix.lower() in (".py", ".ipynb"): + findings.extend(scan_notebook(path)) + return findings + + +# --------------------------------------------------------------------------- +# DAB YAML analysis +# --------------------------------------------------------------------------- + + +def _try_load_yaml(text: str) -> dict | None: + try: + import yaml # type: ignore + except ImportError: + return None + try: + return yaml.safe_load(text) + except Exception: + return None + + +def _leading_spaces(line: str) -> int: + """Count leading spaces. Treats a tab as 4 spaces (good enough for DABs).""" + n = 0 + for ch in line: + if ch == " ": + n += 1 + elif ch == "\t": + n += 4 + else: + break + return n + + +def _minimal_yaml_tasks(text: str) -> list[dict]: + """Stdlib-only fallback: extract a flat task list from a DAB YAML. + + Indent-aware. The top-level task indent is the column of the first + `- task_key:` line under `tasks:`. Any subsequent `- task_key:` line + at a DEEPER indent is treated as a depends_on entry, not a new task. + """ + tasks: list[dict] = [] + in_tasks = False + tasks_indent: int | None = None # indent of `tasks:` keyword + task_item_indent: int | None = None # indent of `- task_key:` lines + cur: dict | None = None + in_depends = False + depends_indent: int | None = None + + for raw in text.splitlines(): + line = raw.rstrip() + if not line: + continue + indent = _leading_spaces(line) + + # `tasks:` declaration + if re.match(r"^\s*tasks\s*:\s*$", line): + in_tasks = True + tasks_indent = indent + task_item_indent = None + cur = None + continue + + if not in_tasks: + continue + + # Left the tasks: block (we hit something at <= tasks_indent that + # isn't a child of tasks:). + if tasks_indent is not None and indent <= tasks_indent and not re.match( + r"^\s*tasks\s*:\s*$", line + ): + if cur is not None: + tasks.append(cur) + cur = None + in_tasks = False + continue + + # New top-level task entry + m = re.match(r"^(\s*)-\s*task_key\s*:\s*(\S+)\s*$", line) + if m and (task_item_indent is None or indent == task_item_indent): + if cur is not None: + tasks.append(cur) + task_item_indent = indent + cur = {"task_key": m.group(2).strip("\"'"), "depends_on": []} + in_depends = False + continue + + if cur is None: + continue + + # Enter / leave depends_on block + if re.match(r"^\s*depends_on\s*:\s*$", line): + in_depends = True + depends_indent = indent + continue + if in_depends and depends_indent is not None and indent <= depends_indent: + in_depends = False + + # depends_on entries: `- task_key: X` deeper than depends_indent + if in_depends: + m = re.match(r"^\s*-\s*task_key\s*:\s*(\S+)\s*$", line) + if m: + cur["depends_on"].append(m.group(1).strip("\"'")) + continue + + # Task-level keys + m = re.match(r"^\s*notebook_path\s*:\s*(\S+)\s*$", line) + if m: + cur["notebook_path"] = m.group(1).strip("\"'") + continue + m = re.match(r"^\s*pipeline_id\s*:\s*(\S+)\s*$", line) + if m: + cur["pipeline_id"] = m.group(1) + continue + if re.match(r"^\s*pipeline_task\s*:\s*$", line): + cur["is_pipeline_task"] = True + continue + + if cur is not None: + tasks.append(cur) + return tasks + + +def _tasks_from_loaded(doc: dict) -> list[dict]: + """Extract task dicts from a loaded DAB YAML doc.""" + out: list[dict] = [] + if not isinstance(doc, dict): + return out + resources = doc.get("resources") or {} + jobs = (resources.get("jobs") or {}) if isinstance(resources, dict) else {} + if not isinstance(jobs, dict): + return out + for job_def in jobs.values(): + if not isinstance(job_def, dict): + continue + for task in job_def.get("tasks") or []: + if not isinstance(task, dict): + continue + entry = { + "task_key": task.get("task_key"), + "depends_on": [ + d.get("task_key") + for d in (task.get("depends_on") or []) + if isinstance(d, dict) + ], + } + notebook = task.get("notebook_task") or {} + if isinstance(notebook, dict) and "notebook_path" in notebook: + entry["notebook_path"] = notebook["notebook_path"] + if "pipeline_task" in task: + entry["is_pipeline_task"] = True + out.append(entry) + return out + + +def scan_job_yaml(yaml_path: Path) -> list[Finding]: + """Scan a DAB job YAML for sibling-task local-disk sharing patterns.""" + findings: list[Finding] = [] + text = yaml_path.read_text(encoding="utf-8", errors="replace") + + doc = _try_load_yaml(text) + tasks = _tasks_from_loaded(doc) if doc else _minimal_yaml_tasks(text) + + # Resolve referenced notebooks (relative to the YAML's parent dir or + # to the bundle root, taking the simplest interpretation). + base = yaml_path.parent + bundle_root_candidates = [base, base.parent] + referenced: list[Path] = [] + for task in tasks: + nb = task.get("notebook_path") + if not nb: + continue + # Strip the .py/.ipynb suffix if missing; try both. + for root in bundle_root_candidates: + for ext in ("", ".py", ".ipynb"): + candidate = (root / (nb.lstrip("./") + ext)).resolve() + if candidate.exists(): + referenced.append(candidate) + break + else: + continue + break + + # Scan referenced notebooks for any local-disk paths the notebook + # touches (writes, reads, child-call args, or bare string literals). + notebook_local_paths: dict[Path, set[str]] = {} + for nb_path in referenced: + paths: set[str] = set() + for cell in extract_python_cells(nb_path): + scanner = _NotebookScanner(cell, str(nb_path)) + scanner.scan() + _read_targets_in_cell(scanner) # populates scanner.all_local_paths + paths.update(scanner.all_local_paths) + notebook_local_paths[nb_path] = paths + + # Per-notebook findings still apply when scanning a job. + findings.extend(scan_notebook(nb_path)) + + # FANOUT003 — sibling tasks share a local-disk path + path_to_tasks: dict[str, list[str]] = {} + for task in tasks: + nb = task.get("notebook_path") + if not nb: + continue + for resolved, paths in notebook_local_paths.items(): + if nb.lstrip("./") in resolved.as_posix(): + for p in paths: + path_to_tasks.setdefault(p, []).append(task["task_key"]) + + for path, keys in path_to_tasks.items(): + unique_keys = sorted(set(k for k in keys if k)) + if len(unique_keys) > 1: + findings.append( + Finding( + pattern_id="FANOUT003", + severity=SEVERITY_WARNING, + file=str(yaml_path), + line=0, + snippet=f"tasks: {', '.join(unique_keys)}", + message=( + f"Multiple sibling tasks reference local-disk path " + f"{path!r}. On serverless, these tasks may run on " + f"different nodes and cannot share local files." + ), + fix=( + "Move the shared artifact to /Volumes/... or " + "/Workspace/... and update both tasks to use that path." + ), + ) + ) + + # FANOUT004 — pipeline_task downstream of notebook_task that wrote local + task_by_key = {t.get("task_key"): t for t in tasks if t.get("task_key")} + notebook_wrote_local: set[str] = set() + for task in tasks: + key = task.get("task_key") + nb = task.get("notebook_path") + if not key or not nb: + continue + for resolved, paths in notebook_local_paths.items(): + if nb.lstrip("./") in resolved.as_posix() and paths: + notebook_wrote_local.add(key) + break + for task in tasks: + if not task.get("is_pipeline_task"): + continue + upstream = task.get("depends_on") or [] + if any(u in notebook_wrote_local for u in upstream): + findings.append( + Finding( + pattern_id="FANOUT004", + severity=SEVERITY_WARNING, + file=str(yaml_path), + line=0, + snippet=f"pipeline_task {task.get('task_key')} depends_on {upstream}", + message=( + f"pipeline_task {task.get('task_key')!r} depends on a " + f"notebook_task that wrote to local disk. The pipeline " + f"will not see those files." + ), + fix=( + "Have the upstream notebook write to /Volumes/... and " + "configure the pipeline to read from that location." + ), + ) + ) + + return findings + + +# --------------------------------------------------------------------------- +# Remote modes (--job-id, --run-id) — shell out to databricks CLI +# --------------------------------------------------------------------------- + + +def _databricks_cli(args: list[str], profile: str) -> str: + """Run `databricks` CLI with the given profile, return stdout.""" + cmd = ["databricks"] + args + ["--profile", profile, "--output", "json"] + result = subprocess.run( + cmd, check=False, capture_output=True, text=True, timeout=60 + ) + if result.returncode != 0: + raise RuntimeError( + f"databricks CLI failed: {' '.join(cmd)}\n{result.stderr}" + ) + return result.stdout + + +def scan_remote_job(job_id: str, profile: str) -> list[Finding]: + """Pull notebook source for every task in a remote job and scan.""" + raw = _databricks_cli(["jobs", "get", "--job-id", job_id], profile) + job = json.loads(raw) + tasks = (job.get("settings") or {}).get("tasks") or [] + + tmp_dir = Path("/tmp") / f"preflight-job-{job_id}" + tmp_dir.mkdir(parents=True, exist_ok=True) + + findings: list[Finding] = [] + notebook_paths: dict[str, Path] = {} + for task in tasks: + nb = (task.get("notebook_task") or {}).get("notebook_path") + if not nb: + continue + local = tmp_dir / (task["task_key"] + ".py") + try: + _databricks_cli( + [ + "workspace", + "export", + nb, + "--format", + "SOURCE", + "--file", + str(local), + ], + profile, + ) + except RuntimeError as exc: + findings.append( + Finding( + pattern_id="FANOUT000", + severity=SEVERITY_INFO, + file=nb, + line=0, + snippet="", + message=f"Could not export {nb}: {exc}", + fix="Verify the notebook path and your CLI permissions.", + ) + ) + continue + notebook_paths[task["task_key"]] = local + findings.extend(scan_notebook(local)) + + return findings + + +def scan_run_output(run_id: str, profile: str) -> list[Finding]: + """Pull run output and classify the error trace as fan-out vs env-sync.""" + raw = _databricks_cli(["jobs", "get-run-output", "--run-id", run_id], profile) + payload = json.loads(raw) + error = (payload.get("error") or "") + "\n" + (payload.get("error_trace") or "") + + findings: list[Finding] = [] + if ENV_SYNC_RE.search(error): + findings.append( + Finding( + pattern_id="ENV001", + severity=SEVERITY_INFO, + file=f"run/{run_id}", + line=0, + snippet="ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT", + message=( + "The run failed with the rare, platform-side env-sync " + "error. This skill does not fix this — escalate to " + "Databricks engineering support." + ), + fix=( + "Open an ES ticket (use /jira-actions or /support-" + "escalation) with the run ID and full error trace. As a " + "mitigation, reduce dependency setup during child " + "notebook startup and add task retries." + ), + ) + ) + + bsi_hits = BSI_TRUSTED_TEMP_RE.findall(error) + for hit in bsi_hits: + findings.append( + Finding( + pattern_id="FANOUT006", + severity=SEVERITY_BLOCKER, + file=f"run/{run_id}", + line=0, + snippet=hit, + message=( + f"Run output contains the BSI trustedTemp signature " + f"{hit!r}. This is the cross-task local-disk antipattern." + ), + fix=( + "Locate the task that wrote to /local_disk0/spark-.../" + "trustedTemp-... and rewrite the handoff to use " + "/Volumes/... or /Workspace/..." + ), + ) + ) + + # Generic permission-denied on local-disk path + perm_re = re.compile( + r"Permission denied:\s*['\"]?(/local_disk0[^'\"\s]*|/tmp/[^'\"\s]*)" + ) + for m in perm_re.finditer(error): + path = m.group(1) + # Skip if already covered by FANOUT006 above. + if BSI_TRUSTED_TEMP_RE.search(path): + continue + findings.append( + Finding( + pattern_id="FANOUT001", + severity=SEVERITY_BLOCKER, + file=f"run/{run_id}", + line=0, + snippet=f"Permission denied: {path}", + message=( + f"Run failed with Permission denied on local-disk path " + f"{path!r}. Likely a cross-task handoff." + ), + fix=( + "Identify the writing task and move the handoff to " + "/Volumes/... or /Workspace/..." + ), + ) + ) + + return findings + + +# --------------------------------------------------------------------------- +# Output formatting +# --------------------------------------------------------------------------- + + +def format_human(findings: list[Finding]) -> str: + if not findings: + return "No serverless storage issues found.\n" + + by_sev: dict[str, list[Finding]] = { + SEVERITY_BLOCKER: [], + SEVERITY_WARNING: [], + SEVERITY_INFO: [], + } + for f in findings: + by_sev[f.severity].append(f) + + out: list[str] = [] + out.append( + f"Serverless storage preflight: {len(findings)} finding(s) " + f"({len(by_sev[SEVERITY_BLOCKER])} blocker, " + f"{len(by_sev[SEVERITY_WARNING])} warning, " + f"{len(by_sev[SEVERITY_INFO])} info)" + ) + out.append("=" * 72) + + label = { + SEVERITY_BLOCKER: "BLOCKER", + SEVERITY_WARNING: "WARNING", + SEVERITY_INFO: "INFO", + } + for sev in (SEVERITY_BLOCKER, SEVERITY_WARNING, SEVERITY_INFO): + items = by_sev[sev] + if not items: + continue + out.append("") + out.append(f"[{label[sev]}] {len(items)} finding(s)") + out.append("-" * 72) + for f in items: + location = ( + f"{f.file}:{f.line}" if f.line else f.file + ) + out.append(f" [{f.pattern_id}] {location}") + if f.snippet: + out.append(f" > {f.snippet}") + out.append(f" {f.message}") + out.append(f" Fix: {f.fix}") + out.append("") + return "\n".join(out) + + +def format_json(findings: list[Finding]) -> str: + payload = { + "findings": [f.to_dict() for f in findings], + "summary": { + "blocker": sum(1 for f in findings if f.severity == SEVERITY_BLOCKER), + "warning": sum(1 for f in findings if f.severity == SEVERITY_WARNING), + "info": sum(1 for f in findings if f.severity == SEVERITY_INFO), + "total": len(findings), + }, + } + return json.dumps(payload, indent=2) + + +def exit_code_for(findings: list[Finding]) -> int: + if any(f.severity == SEVERITY_BLOCKER for f in findings): + return 2 + if any(f.severity == SEVERITY_WARNING for f in findings): + return 1 + return 0 + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + prog="preflight.py", + description=( + "Detect cross-task local-disk handoffs in Databricks serverless " + "jobs and notebooks." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + mode = p.add_mutually_exclusive_group(required=True) + mode.add_argument("--notebook", type=Path, help="Scan a single .py or .ipynb") + mode.add_argument("--dir", type=Path, help="Recursively scan a directory") + mode.add_argument("--job-yaml", type=Path, help="Scan a DAB job YAML") + mode.add_argument("--job-id", type=str, help="Scan a remote job by ID") + mode.add_argument("--run-id", type=str, help="Classify a failed run's error trace") + p.add_argument( + "--profile", + type=str, + default="DEFAULT", + help="Databricks CLI profile (required for --job-id / --run-id)", + ) + p.add_argument("--json", action="store_true", help="Machine-readable output") + return p + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + findings: list[Finding] = [] + + if args.notebook: + findings = scan_path(args.notebook) + elif args.dir: + findings = scan_path(args.dir) + elif args.job_yaml: + findings = scan_job_yaml(args.job_yaml) + elif args.job_id: + findings = scan_remote_job(args.job_id, args.profile) + elif args.run_id: + findings = scan_run_output(args.run_id, args.profile) + + findings.sort( + key=lambda f: ( + -SEVERITY_ORDER[f.severity], + f.file, + f.line, + f.pattern_id, + ) + ) + + if args.json: + print(format_json(findings)) + else: + print(format_human(findings)) + + return exit_code_for(findings) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/databricks-serverless-storage-check/scripts/test_preflight.py b/skills/databricks-serverless-storage-check/scripts/test_preflight.py new file mode 100644 index 0000000..269459e --- /dev/null +++ b/skills/databricks-serverless-storage-check/scripts/test_preflight.py @@ -0,0 +1,288 @@ +#!/usr/bin/env python3 +"""Self-test fixtures and assertions for preflight.py. + +Run with: + python3 scripts/test_preflight.py + +Exits 0 if all assertions pass, 1 otherwise. Uses only the stdlib + +preflight.py itself; no test framework dependency. +""" + +from __future__ import annotations + +import json +import sys +import tempfile +from pathlib import Path + +HERE = Path(__file__).resolve().parent +sys.path.insert(0, str(HERE)) + +import preflight # noqa: E402 + + +# --------------------------------------------------------------------------- +# Fixture content +# --------------------------------------------------------------------------- + +BSI_PARENT_NOTEBOOK = """\ +# BSI-pattern parent: writes to trustedTemp and hands off via dbutils.notebook.run +import pandas as pd + +tmp = "/local_disk0/spark-d6bae111-42bd-4f54/trustedTemp-55adadbe/handoff.parquet" +pd.DataFrame({"x": [1, 2, 3]}).to_parquet(tmp) + +dbutils.notebook.run("./child", 600, {"handoff_path": tmp}) +""" + +BSI_CHILD_NOTEBOOK = """\ +# BSI-pattern child: reads from a /local_disk0 path passed in as a widget +import pandas as pd + +dbutils.widgets.text("handoff_path", "") +path = dbutils.widgets.get("handoff_path") +df = pd.read_parquet("/local_disk0/spark-d6bae111-42bd-4f54/trustedTemp-55adadbe/handoff.parquet") +print(df) +""" + +CLEAN_VOLUMES_NOTEBOOK = """\ +# Clean: uses /Volumes for cross-task handoff +import pandas as pd + +handoff = "/Volumes/main/analytics/handoffs/run_42/data.parquet" +pd.DataFrame({"x": [1, 2, 3]}).to_parquet(handoff) + +dbutils.notebook.run("./child", 600, {"handoff_path": handoff}) +""" + +DAB_YAML_SHARED_TMP = """\ +resources: + jobs: + my_job: + name: my_job + tasks: + - task_key: producer + notebook_task: + notebook_path: ./producer.py + - task_key: consumer + depends_on: + - task_key: producer + notebook_task: + notebook_path: ./consumer.py +""" + +PRODUCER_NOTEBOOK = """\ +import pandas as pd +shared = "/tmp/foo.parquet" +pd.DataFrame({"x": [1]}).to_parquet(shared) +""" + +CONSUMER_NOTEBOOK = """\ +import pandas as pd +shared = "/tmp/foo.parquet" +df = pd.read_parquet(shared) +""" + +ENV_SYNC_RUN_OUTPUT = json.dumps( + { + "error": "ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT", + "error_trace": "Virtual environment changed while syncing", + } +) + + +# --------------------------------------------------------------------------- +# Assertion helpers +# --------------------------------------------------------------------------- + + +class TestFailure(Exception): + pass + + +def expect(cond: bool, msg: str) -> None: + if not cond: + raise TestFailure(msg) + + +def has_finding(findings, pattern_id: str) -> bool: + return any(f.pattern_id == pattern_id for f in findings) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_bsi_pattern_blockers(): + """BSI repro: parent + child notebooks together trigger 001, 002, 006.""" + with tempfile.TemporaryDirectory() as td: + d = Path(td) + (d / "parent.py").write_text(BSI_PARENT_NOTEBOOK) + (d / "child.py").write_text(BSI_CHILD_NOTEBOOK) + + findings = preflight.scan_path(d) + expect( + has_finding(findings, "FANOUT001"), + f"expected FANOUT001 in BSI parent, got: {[f.pattern_id for f in findings]}", + ) + expect( + has_finding(findings, "FANOUT002"), + f"expected FANOUT002 in BSI child, got: {[f.pattern_id for f in findings]}", + ) + expect( + has_finding(findings, "FANOUT006"), + f"expected FANOUT006 for trustedTemp signature, got: " + f"{[f.pattern_id for f in findings]}", + ) + expect( + preflight.exit_code_for(findings) == 2, + f"expected exit code 2 for blockers, got {preflight.exit_code_for(findings)}", + ) + + +def test_clean_volumes_notebook(): + """Notebook using /Volumes produces zero findings, exit 0.""" + with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f: + f.write(CLEAN_VOLUMES_NOTEBOOK) + path = Path(f.name) + try: + findings = preflight.scan_path(path) + expect( + findings == [], + f"expected no findings, got: {[(f.pattern_id, f.snippet) for f in findings]}", + ) + expect( + preflight.exit_code_for(findings) == 0, + "expected exit 0 on clean notebook", + ) + finally: + path.unlink() + + +def test_dab_yaml_shared_tmp(): + """DAB YAML with sibling tasks reading/writing /tmp triggers FANOUT003.""" + with tempfile.TemporaryDirectory() as td: + d = Path(td) + (d / "producer.py").write_text(PRODUCER_NOTEBOOK) + (d / "consumer.py").write_text(CONSUMER_NOTEBOOK) + yaml = d / "my_job.job.yml" + yaml.write_text(DAB_YAML_SHARED_TMP) + + findings = preflight.scan_job_yaml(yaml) + expect( + has_finding(findings, "FANOUT003"), + f"expected FANOUT003 for sibling-shared /tmp, got: " + f"{[f.pattern_id for f in findings]}", + ) + # Must be at least warning severity, not silent. + expect( + preflight.exit_code_for(findings) >= 1, + "expected exit code >= 1 for sibling-shared /tmp", + ) + + +def test_env_sync_run_classification(): + """Run-output mode produces ENV001 for env-sync error trace.""" + # We don't shell out — we call scan_run_output's inner classification + # by invoking the regex path directly through a tiny shim. + error_text = ( + "ENVIRONMENT_SETUP_ERROR.PYTHON_NOTEBOOK_ENVIRONMENT\n" + "Virtual environment changed while syncing" + ) + expect( + preflight.ENV_SYNC_RE.search(error_text) is not None, + "ENV_SYNC_RE failed to match canonical env-sync error", + ) + + +def test_bsi_signature_regex(): + """BSI trustedTemp regex matches the exact thread signature.""" + canonical = ( + "/local_disk0/spark-d6bae111-42bd-4f54-9136-a4e9fbdec3d6/" + "trustedTemp-55adadbe-d9ed-4278-a751-868797c1562f/tmpc58fz4pv" + ) + expect( + preflight.is_bsi_signature(canonical), + f"is_bsi_signature() failed on canonical BSI path: {canonical}", + ) + expect( + not preflight.is_bsi_signature("/Volumes/main/x/y.parquet"), + "is_bsi_signature() false-positive on a Volumes path", + ) + + +def test_exit_code_resolution(): + """exit_code_for follows blocker > warning > info > clean ordering.""" + expect(preflight.exit_code_for([]) == 0, "empty findings should exit 0") + info = preflight.Finding("X", "info", "f", 1, "s", "m", "fix") + warn = preflight.Finding("X", "warning", "f", 1, "s", "m", "fix") + block = preflight.Finding("X", "blocker", "f", 1, "s", "m", "fix") + expect(preflight.exit_code_for([info]) == 0, "info-only should exit 0") + expect(preflight.exit_code_for([warn]) == 1, "warning should exit 1") + expect(preflight.exit_code_for([block]) == 2, "blocker should exit 2") + expect( + preflight.exit_code_for([info, warn, block]) == 2, + "mixed severities should exit at highest (2)", + ) + + +def test_json_output_shape(): + """--json output has findings and summary keys with correct counts.""" + findings = [ + preflight.Finding("A", "blocker", "f", 1, "s", "m", "fix"), + preflight.Finding("B", "warning", "f", 2, "s", "m", "fix"), + preflight.Finding("C", "info", "f", 3, "s", "m", "fix"), + ] + payload = json.loads(preflight.format_json(findings)) + expect("findings" in payload, "JSON output missing 'findings'") + expect("summary" in payload, "JSON output missing 'summary'") + expect(payload["summary"]["blocker"] == 1, "wrong blocker count") + expect(payload["summary"]["warning"] == 1, "wrong warning count") + expect(payload["summary"]["info"] == 1, "wrong info count") + expect(payload["summary"]["total"] == 3, "wrong total count") + + +# --------------------------------------------------------------------------- +# Test runner +# --------------------------------------------------------------------------- + + +TESTS = [ + test_bsi_pattern_blockers, + test_clean_volumes_notebook, + test_dab_yaml_shared_tmp, + test_env_sync_run_classification, + test_bsi_signature_regex, + test_exit_code_resolution, + test_json_output_shape, +] + + +def main() -> int: + passed = 0 + failed: list[tuple[str, str]] = [] + for test in TESTS: + try: + test() + except TestFailure as exc: + failed.append((test.__name__, str(exc))) + except Exception as exc: # noqa: BLE001 + failed.append((test.__name__, f"unexpected error: {exc!r}")) + else: + passed += 1 + print(f"PASS {test.__name__}") + + print() + print(f"{passed}/{len(TESTS)} passed") + if failed: + print() + for name, msg in failed: + print(f"FAIL {name}") + print(f" {msg}") + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main())