diff --git a/experimental/databricks-execution-compute/SKILL.md b/experimental/databricks-execution-compute/SKILL.md index e7c825c..f29438a 100644 --- a/experimental/databricks-execution-compute/SKILL.md +++ b/experimental/databricks-execution-compute/SKILL.md @@ -13,9 +13,7 @@ description: >- # Databricks Execution & Compute -Run code on Databricks. Three execution modes—choose based on workload. - -> **Path convention:** `` in examples below = the directory containing this SKILL.md. Resolve it to the absolute path in your install (e.g. `~/.claude/skills/databricks-execution-compute`). Commands like `python /scripts/compute.py ...` work from any cwd. +Run code on Databricks. Three execution modes—choose based on workload. All examples below use the Databricks CLI; see the `databricks-core` skill for install and authentication. ## Execution Mode Decision Matrix @@ -95,10 +93,6 @@ Pure CLI flow: upload a local file as a workspace notebook, fire a one-time run Always use `dbutils.notebook.exit()` in the notebook — `print()` is not captured by `get-run-output`. For JSON results: `dbutils.notebook.exit(json.dumps({...}))` then parse `.notebook_output.result` client-side. -**Convenience wrapper.** `scripts/compute.py execute-code` does upload + submit + wait + cleanup in one command and returns a single tidy JSON: - -`python /scripts/compute.py execute-code --file /local/path/to/train.py --compute-type serverless --timeout 1500 --environments '[{"environment_key":"ml_env","spec":{"client":"4","dependencies":["scikit-learn==1.5.2","mlflow==2.22.0"]}}]' | jq '{success, state, output, error, run_id, run_page_url, execution_duration_ms}'` - ### Interactive Cluster → [reference](references/3-interactive-cluster.md) **Avoid by default — prefer Serverless Job.** Only use an interactive cluster when: @@ -108,14 +102,24 @@ Always use `dbutils.notebook.exit()` in the notebook — `print()` is no Interactive clusters are **slow to start (3-8 min)** and cost money while running. Don't start one implicitly. -## CLI Commands - -| Command | Purpose | -|---------|---------| -| `python /scripts/compute.py execute-code` | Run code on serverless or an existing cluster | -| `python /scripts/compute.py list-compute` | List clusters, node types, Spark versions | -| `python /scripts/compute.py manage-cluster` | Create/start/terminate/delete clusters (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | -| `databricks warehouses create/list` | Manage SQL warehouses | +## CLI Command Map + +All compute lifecycle and code-execution actions go through the Databricks CLI. Headline commands: + +| Action | Command | +|--------|---------| +| Upload local file as workspace notebook | `databricks workspace import --file --format SOURCE --language PYTHON --overwrite` | +| Run serverless code (upload + submit + wait) | `databricks jobs submit --json @submit.json` (see Serverless Job section above; with `--no-wait` for async) | +| Get run state / wait | `databricks jobs get-run ` (poll `.state.life_cycle_state`) | +| Fetch run output | `databricks jobs get-run-output ` | +| List clusters | `databricks clusters list --output json` | +| Get cluster details | `databricks clusters get ` | +| Start / restart / terminate cluster | `databricks clusters start/restart/delete ` | +| Permanently delete cluster | `databricks clusters permanent-delete ` | +| Create cluster | `databricks clusters create --json '{...}'` (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | +| List node types / Spark versions | `databricks clusters list-node-types` / `databricks clusters spark-versions` | +| Execute code on a running cluster | `databricks api post /api/1.2/contexts/create` + `databricks api post /api/1.2/commands/execute` (see [3-interactive-cluster.md](references/3-interactive-cluster.md)) | +| SQL warehouses | `databricks warehouses create/list/get/start/stop/edit/delete` (see SQL Warehouses below) | ### SQL Warehouses diff --git a/experimental/databricks-execution-compute/references/2-serverless-job.md b/experimental/databricks-execution-compute/references/2-serverless-job.md index d80c31e..3c1abe1 100644 --- a/experimental/databricks-execution-compute/references/2-serverless-job.md +++ b/experimental/databricks-execution-compute/references/2-serverless-job.md @@ -2,8 +2,6 @@ **Use when:** Running intensive Python code remotely (ML training, heavy processing) that doesn't need Spark, or when code shouldn't depend on the local machine staying connected. -> `` in examples = the directory containing the parent SKILL.md — substitute the absolute install path (e.g. `~/.claude/skills/databricks-execution-compute`). - ## When to Choose Serverless Job - ML model training (runs independently of local machine) @@ -85,22 +83,6 @@ dbutils.notebook.exit(json.dumps({"accuracy": 0.95, "model_path": "/Volumes/..." Max output size is 5 MB. Larger results should be written to a Volume/object store and referenced by path. -## Convenience wrapper - -`scripts/compute.py execute-code` does upload + submit + wait + cleanup in one command and returns a single JSON with `success`, `state`, `output` (the `dbutils.notebook.exit` payload), `error`, `run_id`, `run_page_url`, `execution_duration_ms`. - -Minimal: - -`python /scripts/compute.py execute-code --file train.py --compute-type serverless` - -With dependencies: - -`python /scripts/compute.py execute-code --file /path/to/train.py --compute-type serverless --timeout 1500 --environments '[{"environment_key":"ml_env","spec":{"client":"4","dependencies":["scikit-learn==1.5.2","mlflow==2.22.0","xgboost==2.1.3"]}}]'` - -Long dependency list from a file: - -`python /scripts/compute.py execute-code --file /path/to/train.py --compute-type serverless --environments @env.json` - ## Common Issues | Issue | Solution | @@ -109,7 +91,7 @@ Long dependency list from a file: | `ModuleNotFoundError` | Add the package to the environments spec with `"client": "4"` | | Dependencies listed but not installed | `"client": "1"` silently drops `dependencies`; use `"client": "4"` | | `get-run-output` returns empty `notebook_output` | You passed the parent run_id, not `.tasks[0].run_id` | -| Job times out | Default 1800 s on the script wrapper; raise `--timeout` or use `jobs submit --no-wait` + your own polling | +| Job times out | Use `databricks jobs submit --no-wait` and poll `get-run` yourself, or set `tasks[].timeout_seconds` in the submit JSON to extend the per-task limit | ## When NOT to Use diff --git a/experimental/databricks-execution-compute/references/3-interactive-cluster.md b/experimental/databricks-execution-compute/references/3-interactive-cluster.md index 7197334..608b81b 100644 --- a/experimental/databricks-execution-compute/references/3-interactive-cluster.md +++ b/experimental/databricks-execution-compute/references/3-interactive-cluster.md @@ -2,8 +2,6 @@ **Use when:** You have an existing running cluster and need to preserve state across multiple tool calls, or need Scala/R support. -> `` in examples = the directory containing the parent SKILL.md — substitute the absolute install path (e.g. `~/.claude/skills/databricks-execution-compute`). - ## When to Choose Interactive Cluster - Multiple sequential commands where variables must persist @@ -23,7 +21,7 @@ **Starting a cluster takes 3-8 minutes and costs money.** Always check first: ```bash -python /scripts/compute.py list-compute --resource clusters +databricks clusters list --cluster-sources UI,API --output json | jq '.[] | select(.state == "RUNNING") | {cluster_id, cluster_name, state, cluster_source}' ``` If no cluster is running, ask the user: @@ -32,135 +30,161 @@ If no cluster is running, ask the user: > 2. Use serverless (instant, no setup) > Which do you prefer?" -## Basic Usage +Filter to user-created clusters (exclude job clusters, which dominate the list on busy workspaces): + +```bash +databricks clusters list --cluster-sources UI,API --output json \ + | jq '.[] | select(.state == "RUNNING")' +``` + +## Code Execution Flow (1.2 commands API) + +The Databricks CLI doesn't ship a single "run code on a cluster" subcommand. Use the `1.2 commands` API directly via `databricks api`: + +1. **Create an execution context** (one per language per cluster; reuse across commands for state). +2. **Submit the command** — returns a `commandId`. +3. **Poll status** until `status == "Finished"` (or `Error`). +4. **(Optional) Destroy the context** when done. Contexts also expire when the cluster terminates. + +### 1. Create a context -### First Command: Creates Context +```bash +CTX=$(databricks api post /api/1.2/contexts/create --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef" +}' | jq -r '.id') +echo "$CTX" # e.g. ctx_abc123 +``` + +Languages: `python`, `scala`, `sql`, `r`. You need one context per language; running `sql` requires a separate context from `python` on the same cluster. + +### 2. Submit a command ```bash -python /scripts/compute.py execute-code \ - --code "import pandas as pd; df = pd.DataFrame({'a': [1, 2, 3]}); print(df)" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" +CMD=$(databricks api post /api/1.2/commands/execute --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'", + "command": "import pandas as pd; df = pd.DataFrame({\"a\": [1, 2, 3]}); print(df)" +}' | jq -r '.id') +echo "$CMD" ``` -Response includes `context_id` for reuse: -```json -{ - "success": true, - "output": " a\n0 1\n1 2\n2 3", - "context_id": "ctx_abc123", - "cluster_id": "1234-567890-abcdef" -} +### 3. Poll status and fetch results + +The `/api/1.2/commands/status` endpoint takes its parameters in the query string — a JSON body on a GET request gets dropped by the server. + +```bash +CID="1234-567890-abcdef" +while :; do + STATUS=$(databricks api get "/api/1.2/commands/status?clusterId=${CID}&contextId=${CTX}&commandId=${CMD}") + STATE=$(echo "$STATUS" | jq -r '.status') + [ "$STATE" = "Finished" ] && break + [ "$STATE" = "Error" ] && break + [ "$STATE" = "Cancelled" ] && break + sleep 2 +done +echo "$STATUS" | jq '{status, results: .results}' ``` -### Follow-up Commands: Reuse Context +`.results.resultType` indicates output type: +- `text` — `.results.data` is the captured stdout string. +- `error` — `.results.summary` has the error preamble; `.results.cause` has the traceback. +- `table` — `.results.schema` + `.results.data` (rows). + +### 4. Follow-up commands reuse the context + +State (variables, imports, `%pip install`-ed packages) persists across commands sharing the same `contextId`: ```bash -# Variables from first command still available -python /scripts/compute.py execute-code \ - --code "print(df.shape)" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" \ - --context-id "ctx_abc123" +CMD2=$(databricks api post /api/1.2/commands/execute --json '{ + "language": "python", + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'", + "command": "print(df.shape)" +}' | jq -r '.id') +# poll as above ``` -### Auto-Select Best Running Cluster +### 5. (Optional) Destroy the context + +Contexts auto-expire when the cluster terminates. Destroy explicitly when you're done with a session: ```bash -# Get best running cluster -python /scripts/compute.py list-compute --auto-select -# Returns: {"cluster_id": "1234-567890-abcdef"} - -# Then execute on it -python /scripts/compute.py execute-code \ - --code "spark.range(100).show()" \ - --compute-type cluster \ - --cluster-id "1234-567890-abcdef" +databricks api post /api/1.2/contexts/destroy --json '{ + "clusterId": "1234-567890-abcdef", + "contextId": "'"$CTX"'" +}' ``` ## Language Support +The `language` field on context-create + command-execute controls the runtime: + ```bash # Scala -python /scripts/compute.py execute-code --code 'println("Hello")' --compute-type cluster --language scala --cluster-id ... - +databricks api post /api/1.2/contexts/create --json '{"language":"scala","clusterId":"..."}' + # SQL -python /scripts/compute.py execute-code --code "SELECT * FROM table LIMIT 10" --compute-type cluster --language sql --cluster-id ... +databricks api post /api/1.2/contexts/create --json '{"language":"sql","clusterId":"..."}' # R -python /scripts/compute.py execute-code --code 'print("Hello")' --compute-type cluster --language r --cluster-id ... +databricks api post /api/1.2/contexts/create --json '{"language":"r","clusterId":"..."}' ``` +Each language needs its own context on the same cluster. + ## Installing Libraries Install pip packages directly in the execution context: ```bash -python /scripts/compute.py execute-code \ - --code "%pip install faker" \ - --compute-type cluster \ - --cluster-id "..." \ - --context-id "..." -``` - -If needed, restart Python to pick up new packages: -```bash -python /scripts/compute.py execute-code \ - --code "dbutils.library.restartPython()" \ - --compute-type cluster \ - --cluster-id "..." \ - --context-id "..." +databricks api post /api/1.2/commands/execute --json '{ + "language":"python","clusterId":"...","contextId":"...", + "command":"%pip install faker" +}' ``` -## Context Lifecycle +If needed, restart Python in the same context to pick up new packages: -**Keep alive (default):** Context persists until cluster terminates. - -**Destroy when done:** ```bash -python /scripts/compute.py execute-code \ - --code "print('Done!')" \ - --compute-type cluster \ - --cluster-id "..." \ - --destroy-context +databricks api post /api/1.2/commands/execute --json '{ + "language":"python","clusterId":"...","contextId":"...", + "command":"dbutils.library.restartPython()" +}' ``` ## Managing Clusters -Two equivalent paths: the standalone script (convenience wrapper) or the raw `databricks` CLI (more fields exposed). Prefer the script for the common operations listed here. +All cluster lifecycle goes through `databricks clusters`: ```bash -# List all clusters -python /scripts/compute.py list-compute --resource clusters +# List all clusters (full output) +databricks clusters list --cluster-sources UI,API --output json -# Get specific cluster status -python /scripts/compute.py list-compute --cluster-id "1234-567890-abcdef" +# Get one cluster's state +databricks clusters get | jq '{state, cluster_id, cluster_name}' -# Start a cluster (WITH USER APPROVAL ONLY - costs money, 3-8min startup) -python /scripts/compute.py manage-cluster --action start --cluster-id "1234-567890-abcdef" +# Start a cluster (WITH USER APPROVAL ONLY — costs money, 3-8 min startup) +databricks clusters start -# Terminate a cluster (reversible) -python /scripts/compute.py manage-cluster --action terminate --cluster-id "1234-567890-abcdef" +# Terminate (reversible — cluster definition kept, state lost) +databricks clusters delete -# Create a new cluster -python /scripts/compute.py manage-cluster --action create --name "my-cluster" --num-workers 2 -``` - -### Filter running interactive clusters only (raw CLI) +# Permanent delete (irreversible) +databricks clusters permanent-delete -Useful before asking the user which cluster to reuse. `--cluster-sources UI,API` excludes job clusters (which would otherwise dominate the list on busy workspaces): +# Restart +databricks clusters restart -```bash -databricks clusters list --cluster-sources UI,API --output json \ - | jq '.[] | select(.state == "RUNNING")' +# Resize +databricks clusters resize --num-workers 4 ``` -### Create with a full spec (raw CLI) - -The script's `manage-cluster --action create` is fine for quick defaults; for full control (DBR version, instance type, tags) use the raw CLI: +### Create with a full spec ```bash -# SPARK_VERSION is positional; custom_tags recommended for resource tracking +# SPARK_VERSION is positional. custom_tags recommended for resource tracking. databricks clusters create 15.4.x-scala2.12 --json '{ "cluster_name": "my-cluster", "node_type_id": "i3.xlarge", @@ -170,13 +194,21 @@ databricks clusters create 15.4.x-scala2.12 --json '{ }' ``` +Discover node types and DBR versions: + +```bash +databricks clusters list-node-types | jq '.node_types[] | {node_type_id, memory_mb, num_cores}' +databricks clusters spark-versions | jq '.versions[] | {key, name}' +``` + ## Common Issues | Issue | Solution | |-------|----------| | "No running cluster" | Ask user to start or use serverless | -| Context not found | Context expired; create new one | -| Library not found | `%pip install ` then restart Python if needed | +| `Context not found` | Context expired (cluster restarted, or destroyed); create a new one | +| Library not found mid-session | `%pip install `, then `dbutils.library.restartPython()` if needed | +| Command stuck in `Running` | Send `databricks api post /api/1.2/commands/cancel --json '{"clusterId":"...","contextId":"...","commandId":"..."}'` | ## When NOT to Use diff --git a/experimental/databricks-execution-compute/scripts/compute.py b/experimental/databricks-execution-compute/scripts/compute.py deleted file mode 100644 index e90a4ac..0000000 --- a/experimental/databricks-execution-compute/scripts/compute.py +++ /dev/null @@ -1,743 +0,0 @@ -#!/usr/bin/env python3 -"""Compute CLI - Execute code and manage compute resources on Databricks. - -Standalone script with no external dependencies beyond databricks-sdk. - -Commands: -- execute-code: Run code on serverless or cluster compute -- list-compute: List clusters, node types, or spark versions -- manage-cluster: Create, start, terminate, or delete clusters - -Requires: pip install databricks-sdk -""" - -import argparse -import base64 -import json -import uuid -from dataclasses import dataclass -from datetime import timedelta -from typing import Any, Dict, List, Optional - -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.compute import ( - ClusterSource, - CommandStatus, - ContextStatus, - Environment, - Language, - ListClustersFilterBy, - ResultType, - State, -) -from databricks.sdk.service.jobs import ( - JobEnvironment, - NotebookTask, - RunResultState, - Source, - SubmitTask, -) -from databricks.sdk.service.workspace import ImportFormat, Language as WsLang - - -# --------------------------------------------------------------------------- -# Authentication -# --------------------------------------------------------------------------- - -def get_workspace_client() -> WorkspaceClient: - """Get authenticated WorkspaceClient using standard auth chain.""" - return WorkspaceClient() - - -def get_current_username() -> str: - """Get the current user's username.""" - w = get_workspace_client() - return w.current_user.me().user_name - - -# --------------------------------------------------------------------------- -# Exceptions -# --------------------------------------------------------------------------- - -class NoRunningClusterError(Exception): - """Raised when no running cluster is available.""" - - def __init__(self, message: str, suggestions: List[str] = None, startable_clusters: List[Dict] = None): - super().__init__(message) - self.suggestions = suggestions or [] - self.startable_clusters = startable_clusters or [] - - -# --------------------------------------------------------------------------- -# Result Classes -# --------------------------------------------------------------------------- - -@dataclass -class ExecutionResult: - """Result from cluster command execution.""" - success: bool - output: str = "" - error: str = "" - cluster_id: str = "" - context_id: str = "" - status: str = "" - result_type: str = "" - - def to_dict(self) -> Dict[str, Any]: - return { - "success": self.success, - "output": self.output, - "error": self.error, - "cluster_id": self.cluster_id, - "context_id": self.context_id, - "status": self.status, - "result_type": self.result_type, - } - - -@dataclass -class ServerlessRunResult: - """Result from serverless code execution.""" - success: bool - output: str = "" - error: str = "" - run_id: int = 0 - run_page_url: str = "" - state: str = "" - execution_duration_ms: int = 0 - - def to_dict(self) -> Dict[str, Any]: - return { - "success": self.success, - "output": self.output, - "error": self.error, - "run_id": self.run_id, - "run_page_url": self.run_page_url, - "state": self.state, - "execution_duration_ms": self.execution_duration_ms, - } - - -# --------------------------------------------------------------------------- -# Cluster Execution -# --------------------------------------------------------------------------- - -def list_clusters() -> List[Dict[str, Any]]: - """List interactive clusters created by humans (UI/API, not jobs).""" - w = get_workspace_client() - clusters = [] - # Filter to only UI and API created clusters (interactive, human-created) - # Excludes JOB clusters (created by jobs) and other system clusters - filter_by = ListClustersFilterBy( - cluster_sources=[ClusterSource.UI, ClusterSource.API] - ) - for c in w.clusters.list(filter_by=filter_by, page_size=100): - clusters.append({ - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - "creator_user_name": c.creator_user_name, - "spark_version": c.spark_version, - "node_type_id": c.node_type_id, - "num_workers": c.num_workers, - }) - return clusters - - -def get_best_cluster() -> str: - """Get the best running interactive cluster ID, or raise NoRunningClusterError.""" - w = get_workspace_client() - running = [] - startable = [] - - # Filter to only interactive clusters (UI/API created) - filter_by = ListClustersFilterBy( - cluster_sources=[ClusterSource.UI, ClusterSource.API] - ) - for c in w.clusters.list(filter_by=filter_by, page_size=100): - info = { - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - } - if c.state == State.RUNNING: - running.append(info) - elif c.state in (State.TERMINATED, State.PENDING): - startable.append(info) - - if running: - return running[0]["cluster_id"] - - raise NoRunningClusterError( - "No running cluster available.", - suggestions=[ - "Start an existing cluster with: python compute.py manage-cluster --action start --cluster-id ", - "Use serverless compute: python compute.py execute-code --compute-type serverless --code '...'", - ], - startable_clusters=startable, - ) - - -def start_cluster(cluster_id: str) -> Dict[str, Any]: - """Start a cluster and wait for it to be running.""" - w = get_workspace_client() - w.clusters.start(cluster_id=cluster_id) - # Don't wait - just return immediately - return {"success": True, "cluster_id": cluster_id, "message": "Cluster start initiated"} - - -def get_cluster_status(cluster_id: str) -> Dict[str, Any]: - """Get the status of a specific cluster.""" - w = get_workspace_client() - c = w.clusters.get(cluster_id=cluster_id) - return { - "cluster_id": c.cluster_id, - "cluster_name": c.cluster_name, - "state": c.state.value if c.state else "UNKNOWN", - "state_message": c.state_message, - "creator_user_name": c.creator_user_name, - "spark_version": c.spark_version, - "node_type_id": c.node_type_id, - "num_workers": c.num_workers, - } - - -def _get_or_create_context(w: WorkspaceClient, cluster_id: str, context_id: Optional[str], language: str) -> str: - """Get existing context or create a new one.""" - lang_map = {"python": Language.PYTHON, "scala": Language.SCALA, "sql": Language.SQL, "r": Language.R} - lang = lang_map.get(language.lower(), Language.PYTHON) - - if context_id: - # Verify context exists - try: - status = w.command_execution.context_status(cluster_id=cluster_id, context_id=context_id) - if status.status == ContextStatus.RUNNING: - return context_id - except Exception: - pass # Context doesn't exist, create new one - - # Create new context - ctx = w.command_execution.create(cluster_id=cluster_id, language=lang).result() - return ctx.id - - -def execute_databricks_command( - code: str, - cluster_id: Optional[str] = None, - context_id: Optional[str] = None, - language: str = "python", - timeout: int = 120, - destroy_context_on_completion: bool = False, -) -> ExecutionResult: - """Execute code on a Databricks cluster using Command Execution API.""" - w = get_workspace_client() - - # Get cluster ID if not provided - if not cluster_id: - cluster_id = get_best_cluster() - - # Get or create context - ctx_id = _get_or_create_context(w, cluster_id, context_id, language) - - # Execute command - lang_map = {"python": Language.PYTHON, "scala": Language.SCALA, "sql": Language.SQL, "r": Language.R} - lang = lang_map.get(language.lower(), Language.PYTHON) - - try: - cmd = w.command_execution.execute( - cluster_id=cluster_id, - context_id=ctx_id, - language=lang, - command=code, - ).result(timeout=timedelta(seconds=timeout)) - - # Parse results - output = "" - error = "" - result_type = cmd.results.result_type.value if cmd.results and cmd.results.result_type else "" - - if cmd.results: - if cmd.results.result_type == ResultType.TEXT: - output = cmd.results.data or "" - elif cmd.results.result_type == ResultType.TABLE: - output = json.dumps(cmd.results.data) if cmd.results.data else "" - elif cmd.results.result_type == ResultType.ERROR: - error = cmd.results.cause or str(cmd.results.data) or "Unknown error" - - success = cmd.status == CommandStatus.FINISHED and cmd.results.result_type != ResultType.ERROR - - return ExecutionResult( - success=success, - output=output, - error=error, - cluster_id=cluster_id, - context_id=ctx_id, - status=cmd.status.value if cmd.status else "", - result_type=result_type, - ) - - finally: - if destroy_context_on_completion and ctx_id: - try: - w.command_execution.destroy(cluster_id=cluster_id, context_id=ctx_id) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Serverless Execution -# --------------------------------------------------------------------------- - -def run_code_on_serverless( - code: str, - language: str = "python", - timeout: int = 1800, - environments: Optional[List[Any]] = None, -) -> ServerlessRunResult: - """Run code on serverless compute using Jobs API runs/submit. - - Args: - code: Source to execute. - language: "python" or "sql". - timeout: Max wait time in seconds. - environments: Optional list of environments to install dependencies. - Each entry may be a dict (documented shape) or a typed - ``JobEnvironment``. Dict shape: - {"environment_key": "my_env", - "spec": {"client": "4", "dependencies": ["pandas", "mlflow"]}} - ``client`` must be ``"4"`` (or higher) for dependencies to install; - ``"1"`` is the default but does NOT install ``dependencies``. - """ - w = get_workspace_client() - - # Create temp notebook - username = get_current_username() - notebook_name = f"_tmp_serverless_{uuid.uuid4().hex[:8]}" - notebook_path = f"/Workspace/Users/{username}/.tmp/{notebook_name}" - - # Ensure directory exists - try: - w.workspace.mkdirs(f"/Workspace/Users/{username}/.tmp") - except Exception: - pass - - # Upload notebook content - if language.lower() == "sql": - notebook_content = f"-- Databricks notebook source\n{code}" - else: - notebook_content = f"# Databricks notebook source\n{code}" - - content_b64 = base64.b64encode(notebook_content.encode()).decode() - - ws_lang_map = {"python": WsLang.PYTHON, "sql": WsLang.SQL} - ws_lang = ws_lang_map.get(language.lower(), WsLang.PYTHON) - - w.workspace.import_( - path=notebook_path, - content=content_b64, - format=ImportFormat.SOURCE, - language=ws_lang, - overwrite=True, - ) - - # Normalize environments (accept dicts or typed JobEnvironment). - # The SDK serializes each list item via .as_dict(), so raw dicts fail there; - # typed objects also lack .get(), so we need to canonicalize before reading - # environment_key for the task binding. - if environments: - normalized = [] - for e in environments: - if isinstance(e, JobEnvironment): - normalized.append(e) - elif isinstance(e, dict): - spec = e.get("spec", {}) - if isinstance(spec, dict): - spec = Environment(**spec) - elif not isinstance(spec, Environment): - raise TypeError( - f"environments[].spec must be a dict or Environment, got {type(spec).__name__}" - ) - normalized.append( - JobEnvironment( - environment_key=e.get("environment_key", "default"), - spec=spec, - ) - ) - else: - raise TypeError( - f"environments[] entries must be dict or JobEnvironment, got {type(e).__name__}" - ) - job_envs = normalized - env_key = job_envs[0].environment_key or "default" - else: - job_envs = [JobEnvironment(environment_key="default", spec=Environment(client="1"))] - env_key = "default" - - try: - # Submit run - run = w.jobs.submit( - run_name=f"serverless-run-{uuid.uuid4().hex[:8]}", - tasks=[ - SubmitTask( - task_key="main", - notebook_task=NotebookTask( - notebook_path=notebook_path, - source=Source.WORKSPACE, - ), - environment_key=env_key, - ) - ], - environments=job_envs, - ).result(timeout=timedelta(seconds=timeout)) - - # Get run output - run_output = w.jobs.get_run_output(run_id=run.tasks[0].run_id) - - output = "" - error = "" - success = run.state.result_state == RunResultState.SUCCESS - - if run_output.notebook_output and run_output.notebook_output.result: - output = run_output.notebook_output.result - if run_output.error: - error = run_output.error - - return ServerlessRunResult( - success=success, - output=output, - error=error, - run_id=run.run_id, - run_page_url=run.run_page_url or "", - state=run.state.result_state.value if run.state and run.state.result_state else "", - execution_duration_ms=run.execution_duration or 0, - ) - - finally: - # Cleanup temp notebook - try: - w.workspace.delete(notebook_path) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Cluster Management -# --------------------------------------------------------------------------- - -def create_cluster( - name: str, - num_workers: int = 1, - autotermination_minutes: int = 120, - spark_version: Optional[str] = None, - node_type_id: Optional[str] = None, -) -> Dict[str, Any]: - """Create a new cluster.""" - w = get_workspace_client() - - # Get defaults if not provided - if not spark_version: - versions = list(w.clusters.spark_versions()) - # Pick latest LTS - for v in versions: - if "LTS" in v.name and "ML" not in v.name: - spark_version = v.key - break - if not spark_version and versions: - spark_version = versions[0].key - - if not node_type_id: - node_types = list(w.clusters.list_node_types().node_types) - # Pick smallest available - for nt in sorted(node_types, key=lambda x: x.memory_mb or 0): - if nt.is_deprecated is not True: - node_type_id = nt.node_type_id - break - - cluster = w.clusters.create( - cluster_name=name, - spark_version=spark_version, - node_type_id=node_type_id, - num_workers=num_workers, - autotermination_minutes=autotermination_minutes, - ).result() - - return { - "success": True, - "cluster_id": cluster.cluster_id, - "cluster_name": name, - "message": "Cluster created", - } - - -def terminate_cluster(cluster_id: str) -> Dict[str, Any]: - """Terminate a cluster (can be restarted).""" - w = get_workspace_client() - w.clusters.delete(cluster_id=cluster_id) - return {"success": True, "cluster_id": cluster_id, "message": "Cluster terminated"} - - -def delete_cluster(cluster_id: str) -> Dict[str, Any]: - """Permanently delete a cluster.""" - w = get_workspace_client() - w.clusters.permanent_delete(cluster_id=cluster_id) - return {"success": True, "cluster_id": cluster_id, "message": "Cluster permanently deleted"} - - -def list_node_types() -> List[Dict[str, Any]]: - """List available node types.""" - w = get_workspace_client() - result = [] - for nt in w.clusters.list_node_types().node_types: - result.append({ - "node_type_id": nt.node_type_id, - "memory_mb": nt.memory_mb, - "num_cores": nt.num_cores, - "description": nt.description, - "is_deprecated": nt.is_deprecated, - }) - return result - - -def list_spark_versions() -> List[Dict[str, Any]]: - """List available Spark versions.""" - w = get_workspace_client() - result = [] - response = w.clusters.spark_versions() - for v in response.versions or []: - result.append({ - "key": v.key, - "name": v.name, - }) - return result - - -# --------------------------------------------------------------------------- -# CLI Commands -# --------------------------------------------------------------------------- - -def _none_if_empty(value): - """Convert empty strings to None.""" - return None if value == "" else value - - -def _no_cluster_error_response(e: NoRunningClusterError) -> Dict[str, Any]: - """Build a structured error response when no running cluster is available.""" - return { - "success": False, - "error": str(e), - "suggestions": e.suggestions, - "startable_clusters": e.startable_clusters, - } - - -def cmd_execute_code(args): - """Execute code on Databricks via serverless or cluster compute.""" - code = _none_if_empty(args.code) - file_path = _none_if_empty(args.file) - cluster_id = _none_if_empty(args.cluster_id) - context_id = _none_if_empty(args.context_id) - language = _none_if_empty(args.language) or "python" - compute_type = args.compute_type - timeout = args.timeout - destroy_context = args.destroy_context - - # Parse --environments (JSON string or @path/to/file.json) for serverless - environments = None - env_arg = _none_if_empty(getattr(args, "environments", None)) - if env_arg: - try: - if env_arg.startswith("@"): - with open(env_arg[1:], "r", encoding="utf-8") as fh: - environments = json.load(fh) - else: - environments = json.loads(env_arg) - except (OSError, json.JSONDecodeError) as e: - return {"success": False, "error": f"Invalid --environments: {e}"} - if not isinstance(environments, list): - return {"success": False, - "error": "--environments must be a JSON array of environment objects"} - - if not code and not file_path: - return {"success": False, "error": "Either --code or --file must be provided."} - - # Read code from file if provided - if file_path and not code: - try: - with open(file_path, "r", encoding="utf-8") as f: - code = f.read() - except FileNotFoundError: - return {"success": False, "error": f"File not found: {file_path}"} - - # Resolve "auto" compute type - if compute_type == "auto": - if cluster_id or context_id: - compute_type = "cluster" - elif language.lower() in ("scala", "r"): - compute_type = "cluster" - else: - compute_type = "serverless" - - # Serverless execution - if compute_type == "serverless": - default_timeout = timeout if timeout else 1800 - try: - result = run_code_on_serverless( - code=code, - language=language, - timeout=default_timeout, - environments=environments, - ) - except TypeError as e: - return {"success": False, "error": str(e)} - return result.to_dict() - - if environments: - return {"success": False, - "error": "--environments is only supported with --compute-type serverless"} - - # Cluster execution - default_timeout = timeout if timeout else 120 - try: - result = execute_databricks_command( - code=code, - cluster_id=cluster_id, - context_id=context_id, - language=language, - timeout=default_timeout, - destroy_context_on_completion=destroy_context, - ) - return result.to_dict() - except NoRunningClusterError as e: - return _no_cluster_error_response(e) - - -def cmd_list_compute(args): - """List compute resources: clusters, node types, or spark versions.""" - resource = args.resource.lower() - cluster_id = _none_if_empty(args.cluster_id) - auto_select = args.auto_select - - if resource == "clusters": - if cluster_id: - return get_cluster_status(cluster_id) - if auto_select: - try: - best = get_best_cluster() - return {"cluster_id": best} - except NoRunningClusterError as e: - return _no_cluster_error_response(e) - return {"clusters": list_clusters()} - - elif resource == "node_types": - return {"node_types": list_node_types()} - - elif resource == "spark_versions": - return {"spark_versions": list_spark_versions()} - - else: - return {"success": False, "error": f"Unknown resource: {resource}. Use: clusters, node_types, spark_versions"} - - -def cmd_manage_cluster(args): - """Create, start, terminate, or delete a cluster.""" - action = args.action.lower() - cluster_id = _none_if_empty(args.cluster_id) - name = _none_if_empty(args.name) - - if action == "create": - if not name: - return {"success": False, "error": "name is required for create action."} - return create_cluster( - name=name, - num_workers=args.num_workers or 1, - autotermination_minutes=args.autotermination_minutes or 120, - ) - - elif action == "start": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for start action."} - return start_cluster(cluster_id) - - elif action == "terminate": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for terminate action."} - return terminate_cluster(cluster_id) - - elif action == "delete": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for delete action."} - return delete_cluster(cluster_id) - - elif action == "get": - if not cluster_id: - return {"success": False, "error": "cluster_id is required for get action."} - try: - return get_cluster_status(cluster_id) - except Exception as e: - if "does not exist" in str(e).lower(): - return {"success": True, "cluster_id": cluster_id, "state": "DELETED", "exists": False} - return {"success": False, "error": str(e)} - - else: - return {"success": False, "error": f"Unknown action: {action}. Use: create, start, terminate, delete, get"} - - -# --------------------------------------------------------------------------- -# CLI Setup -# --------------------------------------------------------------------------- - -def main(): - parser = argparse.ArgumentParser( - description="Execute code and manage compute on Databricks", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - subparsers = parser.add_subparsers(dest="command", required=True) - - # execute-code - exec_parser = subparsers.add_parser("execute-code", help="Run code on Databricks") - exec_parser.add_argument("--code", help="Code to execute") - exec_parser.add_argument("--file", help="File to execute") - exec_parser.add_argument("--compute-type", default="auto", choices=["auto", "serverless", "cluster"], - help="Compute type (default: auto)") - exec_parser.add_argument("--cluster-id", help="Cluster ID (for cluster compute)") - exec_parser.add_argument("--context-id", help="Context ID (reuse existing context)") - exec_parser.add_argument("--language", default="python", choices=["python", "scala", "sql", "r"], - help="Language (default: python)") - exec_parser.add_argument("--timeout", type=int, help="Timeout in seconds") - exec_parser.add_argument("--destroy-context", action="store_true", help="Destroy context after execution") - exec_parser.add_argument( - "--environments", - help=( - "Serverless only. JSON array of environments (or @path/to/file.json). " - 'Example: \'[{"environment_key":"ml_env","spec":{"client":"4",' - '"dependencies":["mlflow","scikit-learn"]}}]\'. ' - 'IMPORTANT: "client":"4" installs dependencies; "1" does not.' - ), - ) - exec_parser.set_defaults(func=cmd_execute_code) - - # list-compute - list_parser = subparsers.add_parser("list-compute", help="List compute resources") - list_parser.add_argument("--resource", default="clusters", choices=["clusters", "node_types", "spark_versions"], - help="Resource to list (default: clusters)") - list_parser.add_argument("--cluster-id", help="Get specific cluster status") - list_parser.add_argument("--auto-select", action="store_true", help="Return best running cluster") - list_parser.set_defaults(func=cmd_list_compute) - - # manage-cluster - manage_parser = subparsers.add_parser("manage-cluster", help="Manage clusters") - manage_parser.add_argument("--action", required=True, choices=["create", "start", "terminate", "delete", "get"], - help="Action to perform") - manage_parser.add_argument("--cluster-id", help="Cluster ID") - manage_parser.add_argument("--name", help="Cluster name (for create)") - manage_parser.add_argument("--num-workers", type=int, help="Number of workers (for create)") - manage_parser.add_argument("--autotermination-minutes", type=int, help="Auto-termination minutes (for create)") - manage_parser.set_defaults(func=cmd_manage_cluster) - - args = parser.parse_args() - result = args.func(args) - print(json.dumps(result, indent=2, default=str)) - - -if __name__ == "__main__": - main() diff --git a/manifest.json b/manifest.json index d3f7e77..e6148e5 100644 --- a/manifest.json +++ b/manifest.json @@ -273,8 +273,7 @@ "assets/databricks.svg", "references/1-databricks-connect.md", "references/2-serverless-job.md", - "references/3-interactive-cluster.md", - "scripts/compute.py" + "references/3-interactive-cluster.md" ] }, "databricks-iceberg": { diff --git a/skills/databricks-pipelines/references/workflows.md b/skills/databricks-pipelines/references/workflows.md index 671157c..6e41be4 100644 --- a/skills/databricks-pipelines/references/workflows.md +++ b/skills/databricks-pipelines/references/workflows.md @@ -349,7 +349,7 @@ If the user already has `bronze/`, `silver/`, `gold/` folders without a bundle, | Issue | Fix | |-------|-----| -| `Command not found: databricks` | `pip install databricks-cli` | +| `Command not found: databricks` | Install the Databricks CLI — see the parent `databricks-core` skill (CLI installation reference) | | `Invalid catalog name` | `databricks catalogs list` and verify; create with `databricks catalogs create --json '{"name": "..."}'` | | `Language option not recognized` | Use lowercase `"sql"` / `"python"`, not `"SQL"` / `"Python"` | | Files deploy but pipeline doesn't pick them up | Glob pattern in `libraries` doesn't match — re-check `include` path relative to the resource file |