From d94edad59d4ce76ebd33f5aeb08f4938a337098f Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 26 May 2026 08:51:44 +0000 Subject: [PATCH 1/4] skills(execution-compute): drop bundled compute.py, document the CLI surface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per Lennart's audit on #73 (item #7): "for getting this out of experimental I think we need to just rely on the Databricks CLI." Going ahead and doing this now rather than waiting for stable promotion — the CLI already covers everything compute.py wrapped, just verbosely for the cluster-code-execution path. Removed: experimental/databricks-execution-compute/scripts/compute.py (743 lines, three subcommands). Replacement mapping: - `compute.py list-compute` (clusters/node-types/spark-versions) → `databricks clusters list/list-node-types/spark-versions` - `compute.py manage-cluster --action ...` → `databricks clusters create/start/restart/delete/permanent-delete/resize` - `compute.py execute-code --compute-type serverless` (upload+submit+wait+cleanup) → existing `databricks workspace import` + `databricks jobs submit [--no-wait]` + `databricks jobs get-run` + `databricks jobs get-run-output` flow (already documented in SKILL.md / 2-serverless-job.md; the "Convenience wrapper" section that pointed at compute.py is removed). - `compute.py execute-code --compute-type cluster` (code execution on a running cluster with persisted context) → multi-step `databricks api post /api/1.2/contexts/create` → `/api/1.2/commands/execute` → poll `/api/1.2/commands/status` → optional `/api/1.2/contexts/destroy`. The full recipe (create context, submit, poll, fetch results, follow-up commands reusing contextId, %pip install, language switching) is in references/3-interactive-cluster.md. Side effects: - Removed the `` path-convention note from SKILL.md (was only there to reference scripts/compute.py paths).
- Added a "CLI Command Map" table to SKILL.md replacing the three-row compute.py table — now covers upload/submit/run-state/output, cluster list/get/start/stop/create, code-execution-on-cluster, and warehouses. Manifest regenerated. `python3 scripts/skills.py validate` passes. Co-authored-by: Isaac --- .../databricks-execution-compute/SKILL.md | 34 +- .../references/2-serverless-job.md | 20 +- .../references/3-interactive-cluster.md | 200 +++-- .../scripts/compute.py | 743 ------------------ manifest.json | 57 +- 5 files changed, 163 insertions(+), 891 deletions(-) delete mode 100644 experimental/databricks-execution-compute/scripts/compute.py diff --git a/experimental/databricks-execution-compute/SKILL.md b/experimental/databricks-execution-compute/SKILL.md index e7c825c..6f673dc 100644 --- a/experimental/databricks-execution-compute/SKILL.md +++ b/experimental/databricks-execution-compute/SKILL.md @@ -13,9 +13,7 @@ description: >- # Databricks Execution & Compute -Run code on Databricks. Three execution modes—choose based on workload. - -> **Path convention:** `` in examples below = the directory containing this SKILL.md. Resolve it to the absolute path in your install (e.g. `~/.claude/skills/databricks-execution-compute`). Commands like `python /scripts/compute.py ...` work from any cwd. +Run code on Databricks. Three execution modes—choose based on workload. All examples below use the current Databricks CLI (v0.205 or later), not the legacy `pip install databricks-cli` package, which lacks `databricks auth login` and `databricks api`; install it via the official instructions (e.g. `brew tap databricks/tap && brew install databricks`) and authenticate with `databricks auth login` (see the parent `databricks-core` skill for profile setup). ## Execution Mode Decision Matrix @@ -95,10 +93,6 @@ Pure CLI flow: upload a local file as a workspace notebook, fire a one-time run Always use `dbutils.notebook.exit()` in the notebook — `print()` is not captured by `get-run-output`. For JSON results: `dbutils.notebook.exit(json.dumps({...}))` then parse `.notebook_output.result` client-side.
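The client-side parsing step can be sketched as follows. This is a minimal illustration, not part of the skill: it assumes the output of `databricks jobs get-run-output` has already been captured as a JSON string, and the helper name `parse_notebook_result` and the sample payload are hypothetical.

```python
import json


def parse_notebook_result(run_output_json: str) -> dict:
    """Decode the dbutils.notebook.exit payload from get-run-output JSON.

    Hypothetical helper: the notebook is assumed to have exited with
    dbutils.notebook.exit(json.dumps({...})).
    """
    run_output = json.loads(run_output_json)
    # .notebook_output.result holds the string passed to dbutils.notebook.exit;
    # it is missing when the notebook never called exit (or the wrong run_id was used).
    result = (run_output.get("notebook_output") or {}).get("result")
    if result is None:
        raise ValueError("no notebook_output.result in run output")
    # The exit value is itself a JSON document, so decode it a second time.
    return json.loads(result)


# Illustrative payload shaped like the relevant part of get-run-output.
sample = json.dumps({"notebook_output": {"result": json.dumps({"accuracy": 0.95})}})
print(parse_notebook_result(sample))
```

The double decode is the point: the CLI returns one JSON document, and the notebook's exit value is a second JSON document stored inside it as a string.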
-**Convenience wrapper.** `scripts/compute.py execute-code` does upload + submit + wait + cleanup in one command and returns a single tidy JSON: - -`python /scripts/compute.py execute-code --file /local/path/to/train.py --compute-type serverless --timeout 1500 --environments '[{"environment_key":"ml_env","spec":{"client":"4","dependencies":["scikit-learn==1.5.2","mlflow==2.22.0"]}}]' | jq '{success, state, output, error, run_id, run_page_url, execution_duration_ms}'` - ### Interactive Cluster → [reference](references/3-interactive-cluster.md) **Avoid by default — prefer Serverless Job.** Only use an interactive cluster when: @@ -108,14 +102,24 @@ Always use `dbutils.notebook.exit()` in the notebook — `print()` is no Interactive clusters are **slow to start (3-8 min)** and cost money while running. Don't start one implicitly. -## CLI Commands - -| Command | Purpose | -|---------|---------| -| `python /scripts/compute.py execute-code` | Run code on serverless or an existing cluster | -| `python /scripts/compute.py list-compute` | List clusters, node types, Spark versions | -| `python /scripts/compute.py manage-cluster` | Create/start/terminate/delete clusters (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | -| `databricks warehouses create/list` | Manage SQL warehouses | +## CLI Command Map + +All compute lifecycle and code-execution actions go through the Databricks CLI. 
Headline commands: + +| Action | Command | +|--------|---------| +| Upload local file as workspace notebook | `databricks workspace import --file --format SOURCE --language PYTHON --overwrite` | +| Run serverless code (upload + submit + wait) | `databricks jobs submit --json @submit.json` (see Serverless Job section above; with `--no-wait` for async) | +| Get run state / wait | `databricks jobs get-run ` (poll `.state.life_cycle_state`) | +| Fetch run output | `databricks jobs get-run-output ` | +| List clusters | `databricks clusters list --output json` | +| Get cluster details | `databricks clusters get ` | +| Start / restart / terminate cluster | `databricks clusters start/restart/delete ` | +| Permanently delete cluster | `databricks clusters permanent-delete ` | +| Create cluster | `databricks clusters create --json '{...}'` (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | +| List node types / Spark versions | `databricks clusters list-node-types` / `databricks clusters spark-versions` | +| Execute code on a running cluster | `databricks api post /api/1.2/contexts/create` + `databricks api post /api/1.2/commands/execute` (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | +| SQL warehouses | `databricks warehouses create/list/get/start/stop/edit/delete` (see SQL Warehouses below) | ### SQL Warehouses diff --git a/experimental/databricks-execution-compute/references/2-serverless-job.md b/experimental/databricks-execution-compute/references/2-serverless-job.md index d80c31e..3c1abe1 100644 --- a/experimental/databricks-execution-compute/references/2-serverless-job.md +++ b/experimental/databricks-execution-compute/references/2-serverless-job.md @@ -2,8 +2,6 @@ **Use when:** Running intensive Python code remotely (ML training, heavy processing) that doesn't need Spark, or when code shouldn't depend on the local machine staying connected. 
-> `` in examples = the directory containing the parent SKILL.md — substitute the absolute install path (e.g. `~/.claude/skills/databricks-execution-compute`). - ## When to Choose Serverless Job - ML model training (runs independently of local machine) @@ -85,22 +83,6 @@ dbutils.notebook.exit(json.dumps({"accuracy": 0.95, "model_path": "/Volumes/..." Max output size is 5 MB. Larger results should be written to a Volume/object store and referenced by path. -## Convenience wrapper - -`scripts/compute.py execute-code` does upload + submit + wait + cleanup in one command and returns a single JSON with `success`, `state`, `output` (the `dbutils.notebook.exit` payload), `error`, `run_id`, `run_page_url`, `execution_duration_ms`. - -Minimal: - -`python /scripts/compute.py execute-code --file train.py --compute-type serverless` - -With dependencies: - -`python /scripts/compute.py execute-code --file /path/to/train.py --compute-type serverless --timeout 1500 --environments '[{"environment_key":"ml_env","spec":{"client":"4","dependencies":["scikit-learn==1.5.2","mlflow==2.22.0","xgboost==2.1.3"]}}]'` - -Long dependency list from a file: - -`python /scripts/compute.py execute-code --file /path/to/train.py --compute-type serverless --environments @env.json` - ## Common Issues | Issue | Solution | @@ -109,7 +91,7 @@ Long dependency list from a file: | `ModuleNotFoundError` | Add the package to the environments spec with `"client": "4"` | | Dependencies listed but not installed | `"client": "1"` silently drops `dependencies`; use `"client": "4"` | | `get-run-output` returns empty `notebook_output` | You passed the parent run_id, not `.tasks[0].run_id` | -| Job times out | Default 1800 s on the script wrapper; raise `--timeout` or use `jobs submit --no-wait` + your own polling | +| Job times out | Use `databricks jobs submit --no-wait` and poll `get-run` yourself, or set `tasks[].timeout_seconds` in the submit JSON to extend the per-task limit | ## When NOT to Use diff --git 
a/experimental/databricks-execution-compute/references/3-interactive-cluster.md b/experimental/databricks-execution-compute/references/3-interactive-cluster.md index 7197334..2c5150b 100644 --- a/experimental/databricks-execution-compute/references/3-interactive-cluster.md +++ b/experimental/databricks-execution-compute/references/3-interactive-cluster.md @@ -2,8 +2,6 @@ **Use when:** You have an existing running cluster and need to preserve state across multiple tool calls, or need Scala/R support. -> `` in examples = the directory containing the parent SKILL.md — substitute the absolute install path (e.g. `~/.claude/skills/databricks-execution-compute`). - ## When to Choose Interactive Cluster - Multiple sequential commands where variables must persist @@ -23,7 +21,7 @@ **Starting a cluster takes 3-8 minutes and costs money.** Always check first: ```bash -python /scripts/compute.py list-compute --resource clusters +databricks clusters list --output json | jq '.[] | select(.state == "RUNNING") | {cluster_id, cluster_name, state, cluster_source}' ``` If no cluster is running, ask the user: @@ -32,135 +30,159 @@ If no cluster is running, ask the user: > 2. Use serverless (instant, no setup) > Which do you prefer?" -## Basic Usage +Filter to user-created clusters (exclude job clusters, which dominate the list on busy workspaces): + +```bash +databricks clusters list --cluster-sources UI,API --output json \ + | jq '.[] | select(.state == "RUNNING")' +``` + +## Code Execution Flow (1.2 commands API) + +The Databricks CLI doesn't ship a single "run code on a cluster" subcommand. Use the `1.2 commands` API directly via `databricks api`: + +1. **Create an execution context** (one per language per cluster; reuse across commands for state). +2. **Submit the command** — returns a `commandId`. +3. **Poll status** until `status == "Finished"` (or `Error`). +4. **(Optional) Destroy the context** when done. Contexts also expire when the cluster terminates. + +### 1. 
Create a context -### First Command: Creates Context +```bash +CTX=$(databricks api post /api/1.2/contexts/create --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef" +}' | jq -r '.id') +echo "$CTX" # e.g. ctx_abc123 +``` + +Languages: `python`, `scala`, `sql`, `r`. You need one context per language; running `sql` requires a separate context from `python` on the same cluster. + +### 2. Submit a command ```bash -python /scripts/compute.py execute-code \ - --code "import pandas as pd; df = pd.DataFrame({'a': [1, 2, 3]}); print(df)" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" +CMD=$(databricks api post /api/1.2/commands/execute --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'", + "command": "import pandas as pd; df = pd.DataFrame({\"a\": [1, 2, 3]}); print(df)" +}' | jq -r '.id') +echo "$CMD" ``` -Response includes `context_id` for reuse: -```json -{ - "success": true, - "output": " a\n0 1\n1 2\n2 3", - "context_id": "ctx_abc123", - "cluster_id": "1234-567890-abcdef" -} +### 3. Poll status and fetch results + +```bash +while :; do + STATUS=$(databricks api get /api/1.2/commands/status \ + --json '{"clusterId":"1234-567890-abcdef","contextId":"'"$CTX"'","commandId":"'"$CMD"'"}') + STATE=$(echo "$STATUS" | jq -r '.status') + [ "$STATE" = "Finished" ] && break + [ "$STATE" = "Error" ] && break + [ "$STATE" = "Cancelled" ] && break + sleep 2 +done +echo "$STATUS" | jq '{status, results: .results}' ``` -### Follow-up Commands: Reuse Context +`.results.resultType` indicates output type: +- `text` — `.results.data` is the captured stdout string. +- `error` — `.results.summary` has the error preamble; `.results.cause` has the traceback. +- `table` — `.results.schema` + `.results.data` (rows). + +### 4. 
Follow-up commands reuse the context + +State (variables, imports, `%pip install`-ed packages) persists across commands sharing the same `contextId`: ```bash -# Variables from first command still available -python /scripts/compute.py execute-code \ - --code "print(df.shape)" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" \ - --context-id "ctx_abc123" +CMD2=$(databricks api post /api/1.2/commands/execute --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'", + "command": "print(df.shape)" +}' | jq -r '.id') +# poll as above ``` -### Auto-Select Best Running Cluster +### 5. (Optional) Destroy the context + +Contexts auto-expire when the cluster terminates. Destroy explicitly when you're done with a session: ```bash -# Get best running cluster -python /scripts/compute.py list-compute --auto-select -# Returns: {"cluster_id": "1234-567890-abcdef"} - -# Then execute on it -python /scripts/compute.py execute-code \ - --code "spark.range(100).show()" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" +databricks api post /api/1.2/contexts/destroy --json '{ + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'" +}' ``` ## Language Support +The `language` field on context-create + command-execute controls the runtime: + ```bash # Scala -python /scripts/compute.py execute-code --code 'println("Hello")' --compute-type cluster --language scala --cluster-id ... - +databricks api post /api/1.2/contexts/create --json '{"language":"scala","clusterId":"..."}' + # SQL -python /scripts/compute.py execute-code --code "SELECT * FROM table LIMIT 10" --compute-type cluster --language sql --cluster-id ... +databricks api post /api/1.2/contexts/create --json '{"language":"sql","clusterId":"..."}' # R -python /scripts/compute.py execute-code --code 'print("Hello")' --compute-type cluster --language r --cluster-id ... 
+databricks api post /api/1.2/contexts/create --json '{"language":"r","clusterId":"..."}' ``` +Each language needs its own context on the same cluster. + ## Installing Libraries Install pip packages directly in the execution context: ```bash -python /scripts/compute.py execute-code \ - --code "%pip install faker" \ - --compute-type cluster \ - --cluster-id "..." \ - --context-id "..." -``` - -If needed, restart Python to pick up new packages: -```bash -python /scripts/compute.py execute-code \ - --code "dbutils.library.restartPython()" \ - --compute-type cluster \ - --cluster-id "..." \ - --context-id "..." +databricks api post /api/1.2/commands/execute --json '{ + "language":"python","clusterId":"...","contextId":"...", + "command":"%pip install faker" +}' ``` -## Context Lifecycle - -**Keep alive (default):** Context persists until cluster terminates. +If needed, restart Python in the same context to pick up new packages: -**Destroy when done:** ```bash -python /scripts/compute.py execute-code \ - --code "print('Done!')" \ - --compute-type cluster \ - --cluster-id "..." \ - --destroy-context +databricks api post /api/1.2/commands/execute --json '{ + "language":"python","clusterId":"...","contextId":"...", + "command":"dbutils.library.restartPython()" +}' ``` ## Managing Clusters -Two equivalent paths: the standalone script (convenience wrapper) or the raw `databricks` CLI (more fields exposed). Prefer the script for the common operations listed here. 
+All cluster lifecycle goes through `databricks clusters`: ```bash -# List all clusters -python /scripts/compute.py list-compute --resource clusters +# List all clusters (full output) +databricks clusters list --output json -# Get specific cluster status -python /scripts/compute.py list-compute --cluster-id "1234-567890-abcdef" +# Get one cluster's state +databricks clusters get | jq '{state, cluster_id, cluster_name}' -# Start a cluster (WITH USER APPROVAL ONLY - costs money, 3-8min startup) -python /scripts/compute.py manage-cluster --action start --cluster-id "1234-567890-abcdef" +# Start a cluster (WITH USER APPROVAL ONLY — costs money, 3-8 min startup) +databricks clusters start -# Terminate a cluster (reversible) -python /scripts/compute.py manage-cluster --action terminate --cluster-id "1234-567890-abcdef" +# Terminate (reversible — cluster definition kept, state lost) +databricks clusters delete -# Create a new cluster -python /scripts/compute.py manage-cluster --action create --name "my-cluster" --num-workers 2 -``` +# Permanent delete (irreversible) +databricks clusters permanent-delete -### Filter running interactive clusters only (raw CLI) +# Restart +databricks clusters restart -Useful before asking the user which cluster to reuse. `--cluster-sources UI,API` excludes job clusters (which would otherwise dominate the list on busy workspaces): - -```bash -databricks clusters list --cluster-sources UI,API --output json \ - | jq '.[] | select(.state == "RUNNING")' +# Resize +databricks clusters resize --num-workers 4 ``` -### Create with a full spec (raw CLI) - -The script's `manage-cluster --action create` is fine for quick defaults; for full control (DBR version, instance type, tags) use the raw CLI: +### Create with a full spec ```bash -# SPARK_VERSION is positional; custom_tags recommended for resource tracking +# SPARK_VERSION is positional. custom_tags recommended for resource tracking. 
databricks clusters create 15.4.x-scala2.12 --json '{ "cluster_name": "my-cluster", "node_type_id": "i3.xlarge", @@ -170,13 +192,21 @@ databricks clusters create 15.4.x-scala2.12 --json '{ }' ``` +Discover node types and DBR versions: + +```bash +databricks clusters list-node-types | jq '.node_types[] | {node_type_id, memory_mb, num_cores}' +databricks clusters spark-versions | jq '.versions[] | {key, name}' +``` + ## Common Issues | Issue | Solution | |-------|----------| | "No running cluster" | Ask user to start or use serverless | -| Context not found | Context expired; create new one | -| Library not found | `%pip install ` then restart Python if needed | +| `Context not found` | Context expired (cluster restarted, or destroyed); create a new one | +| Library not found mid-session | `%pip install `, then `dbutils.library.restartPython()` if needed | +| Command stuck in `Running` | Send `databricks api post /api/1.2/commands/cancel --json '{"clusterId":"...","contextId":"...","commandId":"..."}'` | ## When NOT to Use diff --git a/experimental/databricks-execution-compute/scripts/compute.py b/experimental/databricks-execution-compute/scripts/compute.py deleted file mode 100644 index e90a4ac..0000000 --- a/experimental/databricks-execution-compute/scripts/compute.py +++ /dev/null @@ -1,743 +0,0 @@ -#!/usr/bin/env python3 -"""Compute CLI - Execute code and manage compute resources on Databricks. - -Standalone script with no external dependencies beyond databricks-sdk. 
- -Commands: -- execute-code: Run code on serverless or cluster compute -- list-compute: List clusters, node types, or spark versions -- manage-cluster: Create, start, terminate, or delete clusters - -Requires: pip install databricks-sdk -""" - -import argparse -import base64 -import json -import uuid -from dataclasses import dataclass -from datetime import timedelta -from typing import Any, Dict, List, Optional - -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.compute import ( - ClusterSource, - CommandStatus, - ContextStatus, - Environment, - Language, - ListClustersFilterBy, - ResultType, - State, -) -from databricks.sdk.service.jobs import ( - JobEnvironment, - NotebookTask, - RunResultState, - Source, - SubmitTask, -) -from databricks.sdk.service.workspace import ImportFormat, Language as WsLang - - -# --------------------------------------------------------------------------- -# Authentication -# --------------------------------------------------------------------------- - -def get_workspace_client() -> WorkspaceClient: - """Get authenticated WorkspaceClient using standard auth chain.""" - return WorkspaceClient() - - -def get_current_username() -> str: - """Get the current user's username.""" - w = get_workspace_client() - return w.current_user.me().user_name - - -# --------------------------------------------------------------------------- -# Exceptions -# --------------------------------------------------------------------------- - -class NoRunningClusterError(Exception): - """Raised when no running cluster is available.""" - - def __init__(self, message: str, suggestions: List[str] = None, startable_clusters: List[Dict] = None): - super().__init__(message) - self.suggestions = suggestions or [] - self.startable_clusters = startable_clusters or [] - - -# --------------------------------------------------------------------------- -# Result Classes -# --------------------------------------------------------------------------- - 
-@dataclass -class ExecutionResult: - """Result from cluster command execution.""" - success: bool - output: str = "" - error: str = "" - cluster_id: str = "" - context_id: str = "" - status: str = "" - result_type: str = "" - - def to_dict(self) -> Dict[str, Any]: - return { - "success": self.success, - "output": self.output, - "error": self.error, - "cluster_id": self.cluster_id, - "context_id": self.context_id, - "status": self.status, - "result_type": self.result_type, - } - - -@dataclass -class ServerlessRunResult: - """Result from serverless code execution.""" - success: bool - output: str = "" - error: str = "" - run_id: int = 0 - run_page_url: str = "" - state: str = "" - execution_duration_ms: int = 0 - - def to_dict(self) -> Dict[str, Any]: - return { - "success": self.success, - "output": self.output, - "error": self.error, - "run_id": self.run_id, - "run_page_url": self.run_page_url, - "state": self.state, - "execution_duration_ms": self.execution_duration_ms, - } - - -# --------------------------------------------------------------------------- -# Cluster Execution -# --------------------------------------------------------------------------- - -def list_clusters() -> List[Dict[str, Any]]: - """List interactive clusters created by humans (UI/API, not jobs).""" - w = get_workspace_client() - clusters = [] - # Filter to only UI and API created clusters (interactive, human-created) - # Excludes JOB clusters (created by jobs) and other system clusters - filter_by = ListClustersFilterBy( - cluster_sources=[ClusterSource.UI, ClusterSource.API] - ) - for c in w.clusters.list(filter_by=filter_by, page_size=100): - clusters.append({ - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - "creator_user_name": c.creator_user_name, - "spark_version": c.spark_version, - "node_type_id": c.node_type_id, - "num_workers": c.num_workers, - }) - return clusters - - -def get_best_cluster() -> str: - """Get the 
best running interactive cluster ID, or raise NoRunningClusterError.""" - w = get_workspace_client() - running = [] - startable = [] - - # Filter to only interactive clusters (UI/API created) - filter_by = ListClustersFilterBy( - cluster_sources=[ClusterSource.UI, ClusterSource.API] - ) - for c in w.clusters.list(filter_by=filter_by, page_size=100): - info = { - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - } - if c.state == State.RUNNING: - running.append(info) - elif c.state in (State.TERMINATED, State.PENDING): - startable.append(info) - - if running: - return running[0]["cluster_id"] - - raise NoRunningClusterError( - "No running cluster available.", - suggestions=[ - "Start an existing cluster with: python compute.py manage-cluster --action start --cluster-id ", - "Use serverless compute: python compute.py execute-code --compute-type serverless --code '...'", - ], - startable_clusters=startable, - ) - - -def start_cluster(cluster_id: str) -> Dict[str, Any]: - """Start a cluster and wait for it to be running.""" - w = get_workspace_client() - w.clusters.start(cluster_id=cluster_id) - # Don't wait - just return immediately - return {"success": True, "cluster_id": cluster_id, "message": "Cluster start initiated"} - - -def get_cluster_status(cluster_id: str) -> Dict[str, Any]: - """Get the status of a specific cluster.""" - w = get_workspace_client() - c = w.clusters.get(cluster_id=cluster_id) - return { - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - "state_message": c.state_message, - "creator_user_name": c.creator_user_name, - "spark_version": c.spark_version, - "node_type_id": c.node_type_id, - "num_workers": c.num_workers, - } - - -def _get_or_create_context(w: WorkspaceClient, cluster_id: str, context_id: Optional[str], language: str) -> str: - """Get existing context or create a new one.""" - lang_map = {"python": 
Language.PYTHON, "scala": Language.SCALA, "sql": Language.SQL, "r": Language.R} - lang = lang_map.get(language.lower(), Language.PYTHON) - - if context_id: - # Verify context exists - try: - status = w.command_execution.context_status(cluster_id=cluster_id, context_id=context_id) - if status.status == ContextStatus.RUNNING: - return context_id - except Exception: - pass # Context doesn't exist, create new one - - # Create new context - ctx = w.command_execution.create(cluster_id=cluster_id, language=lang).result() - return ctx.id - - -def execute_databricks_command( - code: str, - cluster_id: Optional[str] = None, - context_id: Optional[str] = None, - language: str = "python", - timeout: int = 120, - destroy_context_on_completion: bool = False, -) -> ExecutionResult: - """Execute code on a Databricks cluster using Command Execution API.""" - w = get_workspace_client() - - # Get cluster ID if not provided - if not cluster_id: - cluster_id = get_best_cluster() - - # Get or create context - ctx_id = _get_or_create_context(w, cluster_id, context_id, language) - - # Execute command - lang_map = {"python": Language.PYTHON, "scala": Language.SCALA, "sql": Language.SQL, "r": Language.R} - lang = lang_map.get(language.lower(), Language.PYTHON) - - try: - cmd = w.command_execution.execute( - cluster_id=cluster_id, - context_id=ctx_id, - language=lang, - command=code, - ).result(timeout=timedelta(seconds=timeout)) - - # Parse results - output = "" - error = "" - result_type = cmd.results.result_type.value if cmd.results and cmd.results.result_type else "" - - if cmd.results: - if cmd.results.result_type == ResultType.TEXT: - output = cmd.results.data or "" - elif cmd.results.result_type == ResultType.TABLE: - output = json.dumps(cmd.results.data) if cmd.results.data else "" - elif cmd.results.result_type == ResultType.ERROR: - error = cmd.results.cause or str(cmd.results.data) or "Unknown error" - - success = cmd.status == CommandStatus.FINISHED and cmd.results.result_type != 
ResultType.ERROR - - return ExecutionResult( - success=success, - output=output, - error=error, - cluster_id=cluster_id, - context_id=ctx_id, - status=cmd.status.value if cmd.status else "", - result_type=result_type, - ) - - finally: - if destroy_context_on_completion and ctx_id: - try: - w.command_execution.destroy(cluster_id=cluster_id, context_id=ctx_id) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Serverless Execution -# --------------------------------------------------------------------------- - -def run_code_on_serverless( - code: str, - language: str = "python", - timeout: int = 1800, - environments: Optional[List[Any]] = None, -) -> ServerlessRunResult: - """Run code on serverless compute using Jobs API runs/submit. - - Args: - code: Source to execute. - language: "python" or "sql". - timeout: Max wait time in seconds. - environments: Optional list of environments to install dependencies. - Each entry may be a dict (documented shape) or a typed - ``JobEnvironment``. Dict shape: - {"environment_key": "my_env", - "spec": {"client": "4", "dependencies": ["pandas", "mlflow"]}} - ``client`` must be ``"4"`` (or higher) for dependencies to install; - ``"1"`` is the default but does NOT install ``dependencies``. 
- """ - w = get_workspace_client() - - # Create temp notebook - username = get_current_username() - notebook_name = f"_tmp_serverless_{uuid.uuid4().hex[:8]}" - notebook_path = f"/Workspace/Users/{username}/.tmp/{notebook_name}" - - # Ensure directory exists - try: - w.workspace.mkdirs(f"/Workspace/Users/{username}/.tmp") - except Exception: - pass - - # Upload notebook content - if language.lower() == "sql": - notebook_content = f"-- Databricks notebook source\n{code}" - else: - notebook_content = f"# Databricks notebook source\n{code}" - - content_b64 = base64.b64encode(notebook_content.encode()).decode() - - ws_lang_map = {"python": WsLang.PYTHON, "sql": WsLang.SQL} - ws_lang = ws_lang_map.get(language.lower(), WsLang.PYTHON) - - w.workspace.import_( - path=notebook_path, - content=content_b64, - format=ImportFormat.SOURCE, - language=ws_lang, - overwrite=True, - ) - - # Normalize environments (accept dicts or typed JobEnvironment). - # The SDK serializes each list item via .as_dict(), so raw dicts fail there; - # typed objects also lack .get(), so we need to canonicalize before reading - # environment_key for the task binding. 
- if environments: - normalized = [] - for e in environments: - if isinstance(e, JobEnvironment): - normalized.append(e) - elif isinstance(e, dict): - spec = e.get("spec", {}) - if isinstance(spec, dict): - spec = Environment(**spec) - elif not isinstance(spec, Environment): - raise TypeError( - f"environments[].spec must be a dict or Environment, got {type(spec).__name__}" - ) - normalized.append( - JobEnvironment( - environment_key=e.get("environment_key", "default"), - spec=spec, - ) - ) - else: - raise TypeError( - f"environments[] entries must be dict or JobEnvironment, got {type(e).__name__}" - ) - job_envs = normalized - env_key = job_envs[0].environment_key or "default" - else: - job_envs = [JobEnvironment(environment_key="default", spec=Environment(client="1"))] - env_key = "default" - - try: - # Submit run - run = w.jobs.submit( - run_name=f"serverless-run-{uuid.uuid4().hex[:8]}", - tasks=[ - SubmitTask( - task_key="main", - notebook_task=NotebookTask( - notebook_path=notebook_path, - source=Source.WORKSPACE, - ), - environment_key=env_key, - ) - ], - environments=job_envs, - ).result(timeout=timedelta(seconds=timeout)) - - # Get run output - run_output = w.jobs.get_run_output(run_id=run.tasks[0].run_id) - - output = "" - error = "" - success = run.state.result_state == RunResultState.SUCCESS - - if run_output.notebook_output and run_output.notebook_output.result: - output = run_output.notebook_output.result - if run_output.error: - error = run_output.error - - return ServerlessRunResult( - success=success, - output=output, - error=error, - run_id=run.run_id, - run_page_url=run.run_page_url or "", - state=run.state.result_state.value if run.state and run.state.result_state else "", - execution_duration_ms=run.execution_duration or 0, - ) - - finally: - # Cleanup temp notebook - try: - w.workspace.delete(notebook_path) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Cluster Management -# 
--------------------------------------------------------------------------- - -def create_cluster( - name: str, - num_workers: int = 1, - autotermination_minutes: int = 120, - spark_version: Optional[str] = None, - node_type_id: Optional[str] = None, -) -> Dict[str, Any]: - """Create a new cluster.""" - w = get_workspace_client() - - # Get defaults if not provided - if not spark_version: - versions = list(w.clusters.spark_versions()) - # Pick latest LTS - for v in versions: - if "LTS" in v.name and "ML" not in v.name: - spark_version = v.key - break - if not spark_version and versions: - spark_version = versions[0].key - - if not node_type_id: - node_types = list(w.clusters.list_node_types().node_types) - # Pick smallest available - for nt in sorted(node_types, key=lambda x: x.memory_mb or 0): - if nt.is_deprecated is not True: - node_type_id = nt.node_type_id - break - - cluster = w.clusters.create( - cluster_name=name, - spark_version=spark_version, - node_type_id=node_type_id, - num_workers=num_workers, - autotermination_minutes=autotermination_minutes, - ).result() - - return { - "success": True, - "cluster_id": cluster.cluster_id, - "cluster_name": name, - "message": "Cluster created", - } - - -def terminate_cluster(cluster_id: str) -> Dict[str, Any]: - """Terminate a cluster (can be restarted).""" - w = get_workspace_client() - w.clusters.delete(cluster_id=cluster_id) - return {"success": True, "cluster_id": cluster_id, "message": "Cluster terminated"} - - -def delete_cluster(cluster_id: str) -> Dict[str, Any]: - """Permanently delete a cluster.""" - w = get_workspace_client() - w.clusters.permanent_delete(cluster_id=cluster_id) - return {"success": True, "cluster_id": cluster_id, "message": "Cluster permanently deleted"} - - -def list_node_types() -> List[Dict[str, Any]]: - """List available node types.""" - w = get_workspace_client() - result = [] - for nt in w.clusters.list_node_types().node_types: - result.append({ - "node_type_id": nt.node_type_id, - 
"memory_mb": nt.memory_mb, - "num_cores": nt.num_cores, - "description": nt.description, - "is_deprecated": nt.is_deprecated, - }) - return result - - -def list_spark_versions() -> List[Dict[str, Any]]: - """List available Spark versions.""" - w = get_workspace_client() - result = [] - response = w.clusters.spark_versions() - for v in response.versions or []: - result.append({ - "key": v.key, - "name": v.name, - }) - return result - - -# --------------------------------------------------------------------------- -# CLI Commands -# --------------------------------------------------------------------------- - -def _none_if_empty(value): - """Convert empty strings to None.""" - return None if value == "" else value - - -def _no_cluster_error_response(e: NoRunningClusterError) -> Dict[str, Any]: - """Build a structured error response when no running cluster is available.""" - return { - "success": False, - "error": str(e), - "suggestions": e.suggestions, - "startable_clusters": e.startable_clusters, - } - - -def cmd_execute_code(args): - """Execute code on Databricks via serverless or cluster compute.""" - code = _none_if_empty(args.code) - file_path = _none_if_empty(args.file) - cluster_id = _none_if_empty(args.cluster_id) - context_id = _none_if_empty(args.context_id) - language = _none_if_empty(args.language) or "python" - compute_type = args.compute_type - timeout = args.timeout - destroy_context = args.destroy_context - - # Parse --environments (JSON string or @path/to/file.json) for serverless - environments = None - env_arg = _none_if_empty(getattr(args, "environments", None)) - if env_arg: - try: - if env_arg.startswith("@"): - with open(env_arg[1:], "r", encoding="utf-8") as fh: - environments = json.load(fh) - else: - environments = json.loads(env_arg) - except (OSError, json.JSONDecodeError) as e: - return {"success": False, "error": f"Invalid --environments: {e}"} - if not isinstance(environments, list): - return {"success": False, - "error": 
"--environments must be a JSON array of environment objects"} - - if not code and not file_path: - return {"success": False, "error": "Either --code or --file must be provided."} - - # Read code from file if provided - if file_path and not code: - try: - with open(file_path, "r", encoding="utf-8") as f: - code = f.read() - except FileNotFoundError: - return {"success": False, "error": f"File not found: {file_path}"} - - # Resolve "auto" compute type - if compute_type == "auto": - if cluster_id or context_id: - compute_type = "cluster" - elif language.lower() in ("scala", "r"): - compute_type = "cluster" - else: - compute_type = "serverless" - - # Serverless execution - if compute_type == "serverless": - default_timeout = timeout if timeout else 1800 - try: - result = run_code_on_serverless( - code=code, - language=language, - timeout=default_timeout, - environments=environments, - ) - except TypeError as e: - return {"success": False, "error": str(e)} - return result.to_dict() - - if environments: - return {"success": False, - "error": "--environments is only supported with --compute-type serverless"} - - # Cluster execution - default_timeout = timeout if timeout else 120 - try: - result = execute_databricks_command( - code=code, - cluster_id=cluster_id, - context_id=context_id, - language=language, - timeout=default_timeout, - destroy_context_on_completion=destroy_context, - ) - return result.to_dict() - except NoRunningClusterError as e: - return _no_cluster_error_response(e) - - -def cmd_list_compute(args): - """List compute resources: clusters, node types, or spark versions.""" - resource = args.resource.lower() - cluster_id = _none_if_empty(args.cluster_id) - auto_select = args.auto_select - - if resource == "clusters": - if cluster_id: - return get_cluster_status(cluster_id) - if auto_select: - try: - best = get_best_cluster() - return {"cluster_id": best} - except NoRunningClusterError as e: - return _no_cluster_error_response(e) - return {"clusters": 
list_clusters()} - - elif resource == "node_types": - return {"node_types": list_node_types()} - - elif resource == "spark_versions": - return {"spark_versions": list_spark_versions()} - - else: - return {"success": False, "error": f"Unknown resource: {resource}. Use: clusters, node_types, spark_versions"} - - -def cmd_manage_cluster(args): - """Create, start, terminate, or delete a cluster.""" - action = args.action.lower() - cluster_id = _none_if_empty(args.cluster_id) - name = _none_if_empty(args.name) - - if action == "create": - if not name: - return {"success": False, "error": "name is required for create action."} - return create_cluster( - name=name, - num_workers=args.num_workers or 1, - autotermination_minutes=args.autotermination_minutes or 120, - ) - - elif action == "start": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for start action."} - return start_cluster(cluster_id) - - elif action == "terminate": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for terminate action."} - return terminate_cluster(cluster_id) - - elif action == "delete": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for delete action."} - return delete_cluster(cluster_id) - - elif action == "get": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for get action."} - try: - return get_cluster_status(cluster_id) - except Exception as e: - if "does not exist" in str(e).lower(): - return {"success": True, "cluster_id": cluster_id, "state": "DELETED", "exists": False} - return {"success": False, "error": str(e)} - - else: - return {"success": False, "error": f"Unknown action: {action}. 
Use: create, start, terminate, delete, get"} - - -# --------------------------------------------------------------------------- -# CLI Setup -# --------------------------------------------------------------------------- - -def main(): - parser = argparse.ArgumentParser( - description="Execute code and manage compute on Databricks", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - subparsers = parser.add_subparsers(dest="command", required=True) - - # execute-code - exec_parser = subparsers.add_parser("execute-code", help="Run code on Databricks") - exec_parser.add_argument("--code", help="Code to execute") - exec_parser.add_argument("--file", help="File to execute") - exec_parser.add_argument("--compute-type", default="auto", choices=["auto", "serverless", "cluster"], - help="Compute type (default: auto)") - exec_parser.add_argument("--cluster-id", help="Cluster ID (for cluster compute)") - exec_parser.add_argument("--context-id", help="Context ID (reuse existing context)") - exec_parser.add_argument("--language", default="python", choices=["python", "scala", "sql", "r"], - help="Language (default: python)") - exec_parser.add_argument("--timeout", type=int, help="Timeout in seconds") - exec_parser.add_argument("--destroy-context", action="store_true", help="Destroy context after execution") - exec_parser.add_argument( - "--environments", - help=( - "Serverless only. JSON array of environments (or @path/to/file.json). " - 'Example: \'[{"environment_key":"ml_env","spec":{"client":"4",' - '"dependencies":["mlflow","scikit-learn"]}}]\'. ' - 'IMPORTANT: "client":"4" installs dependencies; "1" does not.' 
- ), - ) - exec_parser.set_defaults(func=cmd_execute_code) - - # list-compute - list_parser = subparsers.add_parser("list-compute", help="List compute resources") - list_parser.add_argument("--resource", default="clusters", choices=["clusters", "node_types", "spark_versions"], - help="Resource to list (default: clusters)") - list_parser.add_argument("--cluster-id", help="Get specific cluster status") - list_parser.add_argument("--auto-select", action="store_true", help="Return best running cluster") - list_parser.set_defaults(func=cmd_list_compute) - - # manage-cluster - manage_parser = subparsers.add_parser("manage-cluster", help="Manage clusters") - manage_parser.add_argument("--action", required=True, choices=["create", "start", "terminate", "delete", "get"], - help="Action to perform") - manage_parser.add_argument("--cluster-id", help="Cluster ID") - manage_parser.add_argument("--name", help="Cluster name (for create)") - manage_parser.add_argument("--num-workers", type=int, help="Number of workers (for create)") - manage_parser.add_argument("--autotermination-minutes", type=int, help="Auto-termination minutes (for create)") - manage_parser.set_defaults(func=cmd_manage_cluster) - - args = parser.parse_args() - result = args.func(args) - print(json.dumps(result, indent=2, default=str)) - - -if __name__ == "__main__": - main() diff --git a/manifest.json b/manifest.json index f0fe8fe..0cda7df 100644 --- a/manifest.json +++ b/manifest.json @@ -1,12 +1,12 @@ { "version": "2", - "updated_at": "2026-05-22T20:18:49Z", + "updated_at": "2026-05-26T08:51:23Z", "skills": { "databricks-apps": { "version": "0.1.2", "description": "Databricks Apps development and deployment (evaluates analytics vs synced tables data access)", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -33,7 +33,7 @@ "version": "0.1.0", "description": "Core Databricks skill for CLI, auth, and data 
exploration", "repo_dir": "skills", - "updated_at": "2026-05-15T09:44:24Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -48,7 +48,7 @@ "version": "0.0.1", "description": "Declarative Automation Bundles (DABs) for deploying and managing Databricks resources", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -66,7 +66,7 @@ "version": "0.2.0", "description": "Develop and deploy Lakeflow Jobs on Databricks via DABs, Python SDK, or the CLI \u2014 covers all task types, triggers, notifications, and worked examples", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -82,7 +82,7 @@ "version": "0.1.0", "description": "Databricks Lakebase Postgres: projects, scaling, connectivity, synced tables, and Data API", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -101,7 +101,7 @@ "version": "0.1.0", "description": "Databricks Model Serving endpoint management", "repo_dir": "skills", - "updated_at": "2026-05-22T15:54:04Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -114,7 +114,7 @@ "version": "0.1.0", "description": "Databricks Spark Declarative Pipelines (SDP) for ETL and streaming", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -161,7 +161,7 @@ "version": "0.1.0", "description": "Migrate Databricks workloads from classic compute to serverless compute, including compatibility checks and concrete fixes", "repo_dir": "skills", - "updated_at": "2026-05-12T15:39:50Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -178,7 +178,7 @@ "version": "0.0.1", 
"description": "Create Agent Bricks: Knowledge Assistants (KA) for document Q&A and Supervisor Agents for multi-agent orchestration (MAS).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:18:49Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-knowledge-assistants.md", "2-supervisor-agents.md", @@ -192,7 +192,7 @@ "version": "0.0.1", "description": "Use Databricks built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_mask, ai_translate, ai_fix_grammar, ai_gen, ai_analyze_sentiment, ai_similarity, ai_parse_document, ai_query, ai_forecast) to add AI capabilities directly to SQL and PySpark pipelines without managing model endpoints. Also covers document parsing and building custom RAG pipelines (parse \u2192 chunk \u2192 index \u2192 query).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-task-functions.md", "2-ai-query.md", @@ -208,7 +208,7 @@ "version": "0.0.1", "description": "Create Databricks AI/BI dashboards. Must use when creating, updating, or deploying Lakeview dashboards as Databricks Dashboard have a unique json structure. CRITICAL: You MUST test ALL SQL queries via CLI BEFORE deploying. Follow guidelines strictly.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-widget-specifications.md", "2-advanced-widget-specifications.md", @@ -225,7 +225,7 @@ "version": "0.0.1", "description": "Builds Databricks applications. Prefers AppKit (TypeScript + React SDK) for new apps; falls back to Python frameworks (Dash, Streamlit, Gradio, Flask, FastAPI, Reflex) when Python is required. Handles OAuth authorization, app resources, SQL warehouse and Lakebase connectivity, model serving, foundation model APIs, and deployment. 
Use when building web apps, dashboards, ML demos, or REST APIs for Databricks, or when the user mentions AppKit, Streamlit, Dash, Gradio, Flask, FastAPI, Reflex, or Databricks app.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-authorization.md", "2-app-resources.md", @@ -247,7 +247,7 @@ "version": "0.0.1", "description": "Databricks SQL (DBSQL) advanced features and SQL warehouse capabilities. This skill MUST be invoked when the user mentions: \"DBSQL\", \"Databricks SQL\", \"SQL warehouse\", \"SQL scripting\", \"stored procedure\", \"CALL procedure\", \"materialized view\", \"CREATE MATERIALIZED VIEW\", \"pipe syntax\", \"|>\", \"geospatial\", \"H3\", \"ST_\", \"spatial SQL\", \"collation\", \"COLLATE\", \"ai_query\", \"ai_classify\", \"ai_extract\", \"ai_gen\", \"AI function\", \"http_request\", \"remote_query\", \"read_files\", \"Lakehouse Federation\", \"recursive CTE\", \"WITH RECURSIVE\", \"multi-statement transaction\", \"temp table\", \"temporary view\", \"pipe operator\". SHOULD also invoke when the user asks about SQL best practices, data modeling patterns, or advanced SQL features on Databricks.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -264,7 +264,7 @@ "version": "0.0.1", "description": "Databricks documentation reference via llms.txt index. Use when other skills do not cover a topic, looking up unfamiliar Databricks features, or needing authoritative docs on APIs, configurations, or platform capabilities.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -276,7 +276,7 @@ "version": "0.0.1", "description": "Execute code and manage compute on Databricks. 
Use this skill when the user mentions: \"run code\", \"execute\", \"run on databricks\", \"serverless\", \"no cluster\", \"run python\", \"run scala\", \"run sql\", \"run R\", \"run file\", \"push and run\", \"notebook run\", \"batch script\", \"model training\", \"run script on cluster\", \"create cluster\", \"new cluster\", \"resize cluster\", \"modify cluster\", \"delete cluster\", \"terminate cluster\", \"create warehouse\", \"new warehouse\", \"resize warehouse\", \"delete warehouse\", \"node types\", \"runtime versions\", \"DBR versions\", \"spin up compute\", \"provision cluster\".", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:57:09Z", + "updated_at": "2026-05-26T08:51:05Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -284,15 +284,14 @@ "assets/databricks.svg", "references/1-databricks-connect.md", "references/2-serverless-job.md", - "references/3-interactive-cluster.md", - "scripts/compute.py" + "references/3-interactive-cluster.md" ] }, "databricks-iceberg": { "version": "0.0.1", "description": "Apache Iceberg tables on Databricks \u2014 Managed Iceberg tables, External Iceberg Reads (fka Uniform), Compatibility Mode, Iceberg REST Catalog (IRC), Iceberg v3, Snowflake interop, PyIceberg, OSS Spark, external engine access and credential vending. Use when creating Iceberg tables, enabling External Iceberg Reads (uniform) on Delta tables (including Streaming Tables and Materialized Views via compatibility mode), configuring external engines to read Databricks tables via Unity Catalog IRC, integrating with Snowflake catalog to read Foreign Iceberg tables", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-managed-iceberg-tables.md", "2-uniform-and-compatibility.md", @@ -309,7 +308,7 @@ "version": "0.0.1", "description": "Unity Catalog metric views: define, create, query, and manage governed business metrics in YAML. 
Use when building standardized KPIs, revenue metrics, order analytics, or any reusable business metrics that need consistent definitions across teams and tools.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -323,7 +322,7 @@ "version": "0.0.1", "description": "MLflow 3 GenAI agent evaluation. Use when writing mlflow.genai.evaluate() code, creating @scorer functions, using built-in scorers (Guidelines, Correctness, Safety, RetrievalGroundedness), building eval datasets from traces, setting up trace ingestion and production monitoring, aligning judges with MemAlign from domain expert feedback, or running optimize_prompts() with GEPA for automated prompt improvement.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:56:43Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -346,7 +345,7 @@ "version": "0.0.1", "description": "Databricks development guidance including Python SDK, Databricks Connect, CLI, and REST API. Use when working with databricks-sdk, databricks-connect, or Databricks APIs.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -364,7 +363,7 @@ "version": "0.0.1", "description": "Comprehensive guide to Spark Structured Streaming for production workloads. 
Use when building streaming pipelines, working with Kafka ingestion, implementing Real-Time Mode (RTM), configuring triggers (processingTime, availableNow), handling stateful operations with watermarks, optimizing checkpoints, performing stream-stream or stream-static joins, writing to multiple sinks, or tuning streaming cost and performance.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -385,7 +384,7 @@ "version": "0.0.1", "description": "Generate realistic synthetic data using Spark + Faker (strongly recommended). Supports serverless execution, multiple output formats (Parquet/JSON/CSV/Delta), and scales from thousands to millions of rows. For small datasets (<10K rows), can optionally generate locally and upload to volumes. Use when user mentions 'synthetic data', 'test data', 'generate data', 'demo dataset', 'Faker', or 'sample data'.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -400,7 +399,7 @@ "version": "0.0.1", "description": "Unity Catalog system tables and volumes. Use when querying system tables (audit, lineage, billing) or working with volume file operations (upload, download, list files in /Volumes/).", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "5-system-tables.md", "6-volumes.md", @@ -415,7 +414,7 @@ "version": "0.0.1", "description": "Generate PDF documents from HTML and upload to Unity Catalog volumes. 
Use for creating test PDFs, demo documents, reports, or evaluation datasets.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:56:43Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -428,7 +427,7 @@ "version": "0.0.1", "description": "Patterns for Databricks Vector Search: create endpoints and indexes, query with filters, manage embeddings. Use when building RAG applications, semantic search, or similarity matching. Covers both storage-optimized and standard endpoints.", "repo_dir": "experimental", - "updated_at": "2026-05-22T15:54:01Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", @@ -444,7 +443,7 @@ "version": "0.0.1", "description": "Build Zerobus Ingest clients for near real-time data ingestion into Databricks Delta tables via gRPC. Use when creating producers that write directly to Unity Catalog tables without a message bus, working with the Zerobus Ingest SDK in Python/Java/Go/TypeScript/Rust, generating Protobuf schemas from UC tables, or implementing stream-based ingestion with ACK handling and retry logic.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "1-setup-and-authentication.md", "2-python-client.md", @@ -461,7 +460,7 @@ "version": "0.0.1", "description": "Build custom Python data sources for Apache Spark using the PySpark DataSource API \u2014 batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. 
Even if they just say \"read from X in Spark\" or \"write DataFrame to Y\" and there's no native connector, this skill applies.", "repo_dir": "experimental", - "updated_at": "2026-05-22T20:17:46Z", + "updated_at": "2026-05-25T13:56:43Z", "files": [ "SKILL.md", "agents/openai.yaml", From 3d4d88bd25a4b9f6dadc90eeedd42dae7f1c739f Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 26 May 2026 16:45:54 +0000 Subject: [PATCH 2/4] skills: stop recommending legacy `pip install databricks-cli` That PyPI package is the legacy CLI; the modern CLI is a binary. Per @lennartkats-db review on #90, point readers at the `databricks-core` skill (which has install + auth references) instead. Two hits fixed: - experimental/databricks-execution-compute/SKILL.md intro - skills/databricks-pipelines/references/workflows.md troubleshooting table (carried over from the a-d-k port in #85) Co-authored-by: Isaac --- experimental/databricks-execution-compute/SKILL.md | 2 +- skills/databricks-pipelines/references/workflows.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/databricks-execution-compute/SKILL.md b/experimental/databricks-execution-compute/SKILL.md index 6f673dc..f29438a 100644 --- a/experimental/databricks-execution-compute/SKILL.md +++ b/experimental/databricks-execution-compute/SKILL.md @@ -13,7 +13,7 @@ description: >- # Databricks Execution & Compute -Run code on Databricks. Three execution modes—choose based on workload. All examples below use the Databricks CLI; install with `pip install databricks-cli` (or follow the workspace-native `databricks` quickstart) and authenticate with `databricks auth login` (see the parent `databricks-core` skill for profile setup). +Run code on Databricks. Three execution modes—choose based on workload. All examples below use the Databricks CLI; see the `databricks-core` skill for install and authentication. 
## Execution Mode Decision Matrix diff --git a/skills/databricks-pipelines/references/workflows.md b/skills/databricks-pipelines/references/workflows.md index 671157c..6e41be4 100644 --- a/skills/databricks-pipelines/references/workflows.md +++ b/skills/databricks-pipelines/references/workflows.md @@ -349,7 +349,7 @@ If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, | Issue | Fix | |-------|-----| -| `Command not found: databricks` | `pip install databricks-cli` | +| `Command not found: databricks` | Install the Databricks CLI — see the parent `databricks-core` skill (CLI installation reference) | | `Invalid catalog name` | `databricks catalogs list` and verify; create with `databricks catalogs create --json '{"name": "..."}'` | | `Language option not recognized` | Use lowercase `"sql"` / `"python"`, not `"SQL"` / `"Python"` | | Files deploy but pipeline doesn't pick them up | Glob pattern in `libraries` doesn't match — re-check `include` path relative to the resource file | From 7ebcdb0e4f320b7f1b2f705378757a6c89671b0d Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 26 May 2026 23:10:33 +0000 Subject: [PATCH 3/4] skills(execution-compute): fix GET-with-body on /api/1.2/commands/status Per @lennartkats-db review: that endpoint takes clusterId/contextId/ commandId as query string parameters, and HTTP GET bodies are typically dropped by intermediaries. The legacy 1.2 execution-context API is the only surface here (no native CLI subcommand exists), so the fix is to move the params into the URL. 
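As a rough sketch of the change (the `CTX` and `CMD` values below are made-up placeholders, not real IDs), the three parameters move out of the JSON body and into the request path:

```bash
# Illustrative only: CTX and CMD are hypothetical placeholders; in practice
# they come from the contexts/create and commands/execute responses.
CID="1234-567890-abcdef"
CTX="example-context-id"
CMD="example-command-id"

# Build the status path with all three IDs in the query string, not a body.
STATUS_PATH="/api/1.2/commands/status?clusterId=${CID}&contextId=${CTX}&commandId=${CMD}"
echo "$STATUS_PATH"

# The path is then handed to the CLI unchanged:
#   databricks api get "$STATUS_PATH"
```

The path has to stay quoted, since an unquoted `&` would be interpreted by the shell as a background operator.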
Co-authored-by: Isaac --- .../references/3-interactive-cluster.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/experimental/databricks-execution-compute/references/3-interactive-cluster.md b/experimental/databricks-execution-compute/references/3-interactive-cluster.md index 2c5150b..76d0097 100644 --- a/experimental/databricks-execution-compute/references/3-interactive-cluster.md +++ b/experimental/databricks-execution-compute/references/3-interactive-cluster.md @@ -72,10 +72,12 @@ echo "$CMD" ### 3. Poll status and fetch results +The `/api/1.2/commands/status` endpoint takes `clusterId`, `contextId`, and `commandId` as query string parameters; a JSON body on a GET request is typically dropped by intermediaries before it reaches the server. + ```bash +CID="1234-567890-abcdef" while :; do - STATUS=$(databricks api get /api/1.2/commands/status \ - --json '{"clusterId":"1234-567890-abcdef","contextId":"'"$CTX"'","commandId":"'"$CMD"'"}') + STATUS=$(databricks api get "/api/1.2/commands/status?clusterId=${CID}&contextId=${CTX}&commandId=${CMD}") STATE=$(echo "$STATUS" | jq -r '.status') [ "$STATE" = "Finished" ] && break [ "$STATE" = "Error" ] && break From 377b1c1eb074a052df980a87236c11ba2a086af1 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Thu, 28 May 2026 11:12:06 +0000 Subject: [PATCH 4/4] docs(execution-compute): default cluster list to --cluster-sources UI,API Per @QuentinAmbard: on busy workspaces the cluster listing is dominated by job clusters and the unfiltered API call is slow. Apply the server-side filter to the "first action" snippet at line 24 and the secondary reference at line 163, matching the convention the file already uses at line 36. This PR was prepared by Claude. 
--- .../references/3-interactive-cluster.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/databricks-execution-compute/references/3-interactive-cluster.md b/experimental/databricks-execution-compute/references/3-interactive-cluster.md index 76d0097..608b81b 100644 --- a/experimental/databricks-execution-compute/references/3-interactive-cluster.md +++ b/experimental/databricks-execution-compute/references/3-interactive-cluster.md @@ -21,7 +21,7 @@ **Starting a cluster takes 3-8 minutes and costs money.** Always check first: ```bash -databricks clusters list --output json | jq '.[] | select(.state == "RUNNING") | {cluster_id, cluster_name, state, cluster_source}' +databricks clusters list --cluster-sources UI,API --output json | jq '.[] | select(.state == "RUNNING") | {cluster_id, cluster_name, state, cluster_source}' ``` If no cluster is running, ask the user: @@ -160,7 +160,7 @@ All cluster lifecycle goes through `databricks clusters`: ```bash # List all clusters (full output) -databricks clusters list --output json +databricks clusters list --cluster-sources UI,API --output json # Get one cluster's state databricks clusters get | jq '{state, cluster_id, cluster_name}'