diff --git a/manifest.json b/manifest.json index 16820d5..b708242 100644 --- a/manifest.json +++ b/manifest.json @@ -249,16 +249,19 @@ "version": "0.0.1" }, "databricks-model-serving": { - "description": "Manage Databricks Model Serving endpoints via CLI.", + "description": "Databricks Model Serving (ops) plus MLflow model development (dev): manage serving endpoints, train and register models to Unity Catalog with @prod aliases, batch-score via spark_udf, build custom...", "files": [ "SKILL.md", "agents/openai.yaml", "assets/databricks.png", "assets/databricks.svg", - "references/off-platform-streaming.md" + "references/custom-pyfunc.md", + "references/genai-agents.md", + "references/off-platform-streaming.md", + "references/training-and-serving.md" ], "repo_dir": "skills", - "version": "0.1.0" + "version": "0.3.0" }, "databricks-pipelines": { "description": "Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks.", diff --git a/skills/databricks-model-serving/SKILL.md b/skills/databricks-model-serving/SKILL.md index da751ed..634b8c6 100644 --- a/skills/databricks-model-serving/SKILL.md +++ b/skills/databricks-model-serving/SKILL.md @@ -1,9 +1,9 @@ --- name: databricks-model-serving -description: "Manage Databricks Model Serving endpoints via CLI. Use when asked to create, configure, query, or manage model serving endpoints for LLM inference, custom models, or external models." +description: "Databricks Model Serving (ops) plus MLflow model development (dev): manage serving endpoints, train and register models to Unity Catalog with @prod aliases, batch-score via spark_udf, build custom PyFunc / ResponsesAgent models, and discover Foundation Model API endpoints." compatibility: Requires databricks CLI (>= v0.294.0) metadata: - version: "0.1.0" + version: "0.3.0" parent: databricks-core --- @@ -17,7 +17,7 @@ Model Serving provides managed endpoints for serving LLMs, custom ML models, and | Type | When to Use | Key Detail | |------|-------------|------------| -| Pay-per-token | Foundation Model APIs (Llama, DBRX, etc.) | Uses `system.ai.*` catalog models, simplest setup | +| Pay-per-token | Foundation Model APIs (Llama, GPT-5, Claude, Gemini, etc.) | Uses `system.ai.*` catalog models, simplest setup. Discover endpoints at runtime — see [references/training-and-serving.md § Foundation Model API endpoints](references/training-and-serving.md#foundation-model-api-endpoints). | | Provisioned throughput | Dedicated GPU capacity | Guaranteed throughput, higher cost | | Custom model | Your own MLflow models or containers | Deploy any model with an MLflow signature | @@ -74,7 +74,7 @@ databricks serving-endpoints create \ }' --profile ``` -- Discover available Foundation Models: check the `system.ai` catalog in Unity Catalog, or use `databricks serving-endpoints list --profile ` to see available endpoints. Use `databricks serving-endpoints get-open-api --profile ` to inspect the endpoint's API schema. +- Discover available Foundation Models: see [references/training-and-serving.md § Foundation Model API endpoints](references/training-and-serving.md#foundation-model-api-endpoints) for the runtime-list snippet and default-picking rules. You can also check the `system.ai` catalog in Unity Catalog, or run `databricks serving-endpoints list --profile ` to see what's deployed in the workspace. Use `databricks serving-endpoints get-open-api --profile ` to inspect a specific endpoint's API schema. - Long-running operation; the CLI waits for completion by default. Use `--no-wait` to return immediately, then poll: ```bash databricks serving-endpoints get --profile @@ -177,6 +177,16 @@ env: Then add a tRPC route to call it from your app. For the full app integration pattern, use the **`databricks-apps`** skill and read the [Model Serving Guide](../databricks-apps/references/appkit/model-serving.md). +### Develop & deploy new models + +This skill is ops-focused (manage existing endpoints). For the dev-side flow — train a model, register to Unity Catalog, log a PyFunc or `ResponsesAgent`, deploy — see the references below. + +| Reference | When to read | +|---|---| +| [references/training-and-serving.md](references/training-and-serving.md) | Train + register classical ML with `mlflow.autolog`, alias-based promotion (`@prod`), batch scoring via `spark_udf`, real-time endpoint create + zero-downtime version swap, async deploy via `jobs submit --no-wait`. Includes the Foundation Model API endpoints runtime-list and the gotchas table. | +| [references/custom-pyfunc.md](references/custom-pyfunc.md) | When `autolog` isn't enough — file-based `PythonModel` ("Models from Code"), `infer_signature`, `code_paths`, pre-deploy validation with `mlflow.models.predict(env_manager="uv")`. | +| [references/genai-agents.md](references/genai-agents.md) | Hand-rolled `ResponsesAgent` with LangGraph + `UCFunctionToolkit` + `VectorSearchRetrieverTool`. Includes the `create_text_output_item` helper-method gotcha and the `resources=[...]` passthrough-auth list. | + ## Troubleshooting | Error | Solution | diff --git a/skills/databricks-model-serving/references/custom-pyfunc.md b/skills/databricks-model-serving/references/custom-pyfunc.md new file mode 100644 index 0000000..8d85c3e --- /dev/null +++ b/skills/databricks-model-serving/references/custom-pyfunc.md @@ -0,0 +1,106 @@ +# Custom pyfunc model + +When sklearn / XGBoost autolog isn't enough: custom preprocessing not captured by a sklearn pipeline, multiple sub-models behind one endpoint, external API calls during inference, business-logic-heavy post-processing. + +Same UC registry + serving story as classical ML — only the *logging* step changes. + +## End-to-end example: file-based pyfunc with preprocessing + sub-model + +Project layout: + +``` +my_model/ +├── model.py # PythonModel + mlflow.models.set_model(...) +├── log_model.py # Logs + registers to UC +└── artifacts/ + ├── preprocessor.pkl + └── booster.json +``` + +```python +# model.py — logged verbatim via python_model="model.py" (Models from Code). +# DO NOT pickle a class instance; use this file-path pattern instead. +import json, pickle, pandas as pd +import mlflow +from mlflow.pyfunc import PythonModel + +class TurbineRiskModel(PythonModel): + def load_context(self, context): + with open(context.artifacts["preprocessor"], "rb") as f: + self.pre = pickle.load(f) + from xgboost import Booster + self.booster = Booster() + self.booster.load_model(context.artifacts["booster"]) + + def predict(self, context, model_input: pd.DataFrame, params=None) -> pd.DataFrame: + X = self.pre.transform(model_input) + proba = self.booster.predict(X) + return pd.DataFrame({ + "risk_score": proba, + "risk_level": ["HIGH" if p > 0.7 else "MEDIUM" if p > 0.4 else "LOW" for p in proba], + }) + +mlflow.models.set_model(TurbineRiskModel()) +``` + +```python +# log_model.py +import mlflow +from mlflow.models import infer_signature +from mlflow.tracking import MlflowClient + +mlflow.set_registry_uri("databricks-uc") +mlflow.set_experiment("/Users/me@example.com/turbine_risk") + +CATALOG, SCHEMA, NAME = "ai_demo_gen", "wind_farm", "turbine_risk" +FULL_NAME = f"{CATALOG}.{SCHEMA}.{NAME}" + +sample_input = pd.DataFrame({"vib_rms": [0.4], "rpm_mean": [18.2], "bearing_temp_max": [71.3]}) +sample_output = pd.DataFrame({"risk_score": [0.0], "risk_level": ["LOW"]}) + +with mlflow.start_run(): + info = mlflow.pyfunc.log_model( + name="model", + python_model="model.py", # file path, not an instance + artifacts={ + "preprocessor": "artifacts/preprocessor.pkl", + "booster": "artifacts/booster.json", + }, + signature=infer_signature(sample_input, sample_output), + input_example=sample_input, + # Pin exact versions — endpoint rebuilds the env from these: + pip_requirements=["mlflow==2.22.0", "xgboost==2.1.3", "scikit-learn==1.5.2", "pandas"], + # Extra modules to ship with the model (e.g. shared util libs): + # code_paths=["src/utils.py"], + registered_model_name=FULL_NAME, + ) + +# Pre-deploy validation — rebuilds the env locally and runs predict(). +# Catches missing deps / signature drift BEFORE the endpoint does. +mlflow.models.predict( + model_uri=info.model_uri, + input_data=sample_input, + env_manager="uv", # MLflow ≥ 2.22; falls back to "virtualenv" otherwise +) + +# Promote to @prod +client = MlflowClient(registry_uri="databricks-uc") +v = max(client.search_model_versions(f"name='{FULL_NAME}'"), key=lambda x: int(x.version)).version +client.set_registered_model_alias(FULL_NAME, "prod", v) +``` + +**Why `python_model="model.py"`**: file logged verbatim, no class pickling — avoids Python-version unpickle crashes between training and serving runtimes. Pair with `code_paths=[...]` to ship companion modules; `mlflow.models.set_model(instance)` at end of file is the contract (exactly one call). + +## Consume + +Same two paths as autologged classical ML — see [training-and-serving.md](training-and-serving.md#consume-batch-scoring-over-delta). + +- **Batch**: `mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{FULL_NAME}@prod", env_manager="local")` over a Delta table. +- **Real-time**: `client.create_endpoint(...)` (see training-and-serving.md). Query returns a DataFrame-shaped JSON since `predict` returns a DataFrame. + +```bash +databricks serving-endpoints query turbine-risk-endpoint --json '{ + "dataframe_records": [{"vib_rms": 0.6, "rpm_mean": 19.0, "bearing_temp_max": 78.0}] +}' +# → {"predictions": [{"risk_score": 0.82, "risk_level": "HIGH"}]} +``` diff --git a/skills/databricks-model-serving/references/genai-agents.md b/skills/databricks-model-serving/references/genai-agents.md new file mode 100644 index 0000000..5302e7b --- /dev/null +++ b/skills/databricks-model-serving/references/genai-agents.md @@ -0,0 +1,251 @@ +# Custom GenAI agents with MLflow ResponsesAgent + +Edge case. **For most demos, use the `databricks-agent-bricks` skill** — pre-built Knowledge Assistants and Supervisor Agents wire up Genie + KAs + tools without any agent code. Hand-roll a `ResponsesAgent` only when you need a custom orchestration the supervisor can't express (custom routing logic, multi-step plans, agent that calls another agent over HTTP). + +## What ResponsesAgent is + +MLflow 3's standardized agent interface. OpenAI-compatible request/response (`{input: [{role, content}]}` → `{output: [...]}`). Supports streaming. Logs with `python_model="agent.py"` (file-based) and deploys via `databricks.agents.deploy()` to a serving endpoint with built-in tracing and eval hooks. + +## Full example: LangGraph agent with UC Function + Vector Search tools + +Project layout: + +``` +my_agent/ +├── agent.py # LangGraphAgent + tools + mlflow.models.set_model(...) +├── log_model.py # Logs with resources= for auto-auth, registers to UC +└── deploy_agent.py # Submitted as a job because deploy takes ~15 min +``` + +```python +# agent.py +import mlflow +from mlflow.pyfunc import ResponsesAgent +from mlflow.types.responses import ( + ResponsesAgentRequest, ResponsesAgentResponse, ResponsesAgentStreamEvent, + output_to_responses_items_stream, to_chat_completions_input, +) +from databricks_langchain import ( + ChatDatabricks, UCFunctionToolkit, VectorSearchRetrieverTool, +) +from langchain_core.messages import AIMessage +from langchain_core.runnables import RunnableLambda +from langgraph.graph import END, StateGraph +from langgraph.graph.message import add_messages +from langgraph.prebuilt.tool_node import ToolNode +from typing import Annotated, Generator, Sequence, TypedDict + +LLM_ENDPOINT = "databricks-claude-sonnet-4-6" # resolve at runtime — see training-and-serving.md +VS_INDEX = "ai_demo_gen.wind_farm.docs_index" +UC_FUNCTIONS = ["ai_demo_gen.wind_farm.lookup_turbine_history"] +SYSTEM_PROMPT = ( + "You are a turbine ops assistant. Use lookup_turbine_history for hardware " + "history queries, the docs retriever for procedure questions." +) + +class State(TypedDict): + messages: Annotated[Sequence, add_messages] + +class TurbineAgent(ResponsesAgent): + def __init__(self): + self.llm = ChatDatabricks(endpoint=LLM_ENDPOINT, temperature=0.1) + # Tools — UC functions and Vector Search both come from databricks_langchain. + self.tools = list(UCFunctionToolkit(function_names=UC_FUNCTIONS).tools) + self.vs_tool = VectorSearchRetrieverTool( + index_name=VS_INDEX, num_results=5, + columns=["content", "doc_uri", "title"], + ) + self.tools.append(self.vs_tool) + self.llm_with_tools = self.llm.bind_tools(self.tools) + + def _graph(self): + def call_model(state): + msgs = [{"role": "system", "content": SYSTEM_PROMPT}] + state["messages"] + return {"messages": [self.llm_with_tools.invoke(msgs)]} + def should_continue(state): + last = state["messages"][-1] + return "tools" if isinstance(last, AIMessage) and last.tool_calls else "end" + + g = StateGraph(State) + g.add_node("agent", RunnableLambda(call_model)) + g.add_node("tools", ToolNode(self.tools)) + g.set_entry_point("agent") + g.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END}) + g.add_edge("tools", "agent") + return g.compile() + + def predict_stream(self, req: ResponsesAgentRequest) -> Generator[ResponsesAgentStreamEvent, None, None]: + msgs = to_chat_completions_input([m.model_dump() for m in req.input]) + for kind, payload in self._graph().stream({"messages": msgs}, stream_mode=["updates"]): + if kind != "updates": continue + for node in payload.values(): + if node.get("messages"): + yield from output_to_responses_items_stream(node["messages"]) + + def predict(self, req: ResponsesAgentRequest) -> ResponsesAgentResponse: + items = [ev.item for ev in self.predict_stream(req) + if ev.type == "response.output_item.done"] + return ResponsesAgentResponse(output=items) + +mlflow.langchain.autolog() +mlflow.models.set_model(TurbineAgent()) +``` + +### CRITICAL: output items must use helper methods + +The supervisor will silently drop your output if you return raw dicts: + +```python +# WRONG — raw dicts silently fail +return ResponsesAgentResponse(output=[{"role": "assistant", "content": "..."}]) + +# CORRECT +return ResponsesAgentResponse(output=[ + self.create_text_output_item(text="...", id="msg_1"), +]) +``` + +Three helpers on `ResponsesAgent`: +- `self.create_text_output_item(text, id)` — text response. +- `self.create_function_call_item(id, call_id, name, arguments)` — tool call. +- `self.create_function_call_output_item(call_id, output)` — tool result. + +LangGraph's `output_to_responses_items_stream` (used above) emits these correctly, so the helpers are mainly relevant when hand-building events. + +## Log + register + +The non-obvious bit: `resources=[...]` is mandatory for auto-passthrough auth. Without it the deployed endpoint has no creds for the LLM, the UC functions, or the Vector Search index — every query returns `PERMISSION_DENIED` and the error doesn't explain why. + +```python +# log_model.py +import mlflow +from mlflow.models.resources import ( + DatabricksServingEndpoint, DatabricksFunction, DatabricksVectorSearchIndex, +) +from mlflow.tracking import MlflowClient +from agent import LLM_ENDPOINT, VS_INDEX, UC_FUNCTIONS + +mlflow.set_registry_uri("databricks-uc") +mlflow.set_experiment("/Users/me@example.com/turbine_agent") + +FULL_NAME = "ai_demo_gen.wind_farm.turbine_agent" + +resources = [ + DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT), + DatabricksVectorSearchIndex(index_name=VS_INDEX), + *[DatabricksFunction(function_name=f) for f in UC_FUNCTIONS], +] + +with mlflow.start_run(): + info = mlflow.pyfunc.log_model( + name="agent", + python_model="agent.py", # file path; agent.py calls set_model() + resources=resources, # auto-auth — DO NOT skip + input_example={"input": [{"role": "user", "content": "What's the maintenance history for turbine WTG-12?"}]}, + pip_requirements=[ + "mlflow==2.22.0", + "databricks-langchain", + "langgraph==0.3.4", + "databricks-agents", + "pydantic>=2", + ], + registered_model_name=FULL_NAME, + ) + +# Pre-deploy validation — rebuild the env, run a request, surface failures early. +mlflow.models.predict( + model_uri=info.model_uri, + input_data={"input": [{"role": "user", "content": "ping"}]}, + env_manager="uv", +) + +client = MlflowClient(registry_uri="databricks-uc") +v = max(client.search_model_versions(f"name='{FULL_NAME}'"), key=lambda x: int(x.version)).version +client.set_registered_model_alias(FULL_NAME, "prod", v) +``` + +### Resources that need passthrough auth + +| Resource | Import (`mlflow.models.resources`) | +|---|---| +| Foundation Model API / custom serving endpoint | `DatabricksServingEndpoint(endpoint_name=...)` | +| UC SQL/Python function | `DatabricksFunction(function_name=...)` | +| Vector Search index | `DatabricksVectorSearchIndex(index_name=...)` | +| Lakebase Postgres | `DatabricksLakebase(database_instance_name=...)` | + +Anything the agent calls that isn't covered here will hit auth errors at the endpoint. + +## Deploy (async job, ~15 min) + +`databricks.agents.deploy()` blocks for ~15 minutes — don't run it inline from the CLI. Submit as a serverless job so the chat session doesn't hold the connection. + +**Before submitting, check whether a deploy is already in flight or already done.** Re-submitting on top of a running deploy wastes ~15 min of serverless and can race for the same endpoint name. + +```bash +# 1. Is a deploy_agent run already active for this model? Match on run_name. +databricks jobs list-runs --active-only --output json \ + | jq --arg name "deploy_${MODEL_NAME}" '.runs[]? | select(.run_name == $name) | {run_id, state}' + +# 2. Does the target endpoint already exist? If READY on the right version, skip the redeploy. +databricks serving-endpoints get 2>/dev/null \ + | jq '{ready: .state.ready, served: [.config.served_models[] | {name, model_version}]}' +``` + +If either check returns a hit, follow the existing run with `jobs get-run ` instead of submitting a new one. + +```python +# deploy_agent.py +import json, sys +from databricks import agents + +model_name = sys.argv[1] +version = sys.argv[2] +endpoint_name = sys.argv[3] if len(sys.argv) > 3 else None + +# Always pass endpoint_name explicitly — auto-derived names are +# `agents_--` with dots → dashes, which is unpredictable. +kwargs = {"tags": {"project": "demo"}} +if endpoint_name: + kwargs["endpoint_name"] = endpoint_name + +deployment = agents.deploy(model_name, version, **kwargs) + +# Land structured output via dbutils.notebook.exit — print() unreliable on serverless. +dbutils.notebook.exit(json.dumps({ + "endpoint_name": deployment.endpoint_name, + "query_endpoint": deployment.query_endpoint, +})) +``` + +Submit via the same `jobs submit --no-wait` pattern shown in [training-and-serving.md](training-and-serving.md#train--deploy-as-a-serverless-job) — same script, just `deploy_agent.py` as the notebook. + +## Query + +```bash +databricks serving-endpoints query turbine-agent-endpoint --json '{ + "messages": [{"role": "user", "content": "What is the maintenance history for WTG-12?"}], + "max_tokens": 800 +}' +``` + +OpenAI-compatible client also works: + +```python +from openai import OpenAI +client = OpenAI( + base_url=f"{WORKSPACE_URL}/serving-endpoints/turbine-agent-endpoint", + api_key=DATABRICKS_TOKEN, +) +client.chat.completions.create( + model="turbine-agent-endpoint", + messages=[{"role": "user", "content": "..."}], +) +``` + +## Iteration + +`databricks workspace import-dir ./my_agent ... --overwrite` then re-run `log_model.py`. `agents.deploy()` with a new version **updates the existing endpoint in place** — no need to recreate. Re-deploy only when changing endpoint config (workload size, route splits). + +## Packages + +DBR 16.1+ has `mlflow` 3.x, `langchain`, `pydantic`, `databricks-sdk` pre-installed. Typically only need `%pip install -q databricks-langchain langgraph databricks-agents`. diff --git a/skills/databricks-model-serving/references/training-and-serving.md b/skills/databricks-model-serving/references/training-and-serving.md new file mode 100644 index 0000000..725131d --- /dev/null +++ b/skills/databricks-model-serving/references/training-and-serving.md @@ -0,0 +1,300 @@ +# ML Training & Serving on Databricks + +Train with MLflow → register to Unity Catalog → consume the **same artifact** as either a batch Spark UDF over Delta or a real-time REST endpoint (~5–15 min cold start, quota-bound — only when the user asks for per-request low-latency scoring). + +> **Always train on Databricks** (serverless job or notebook), never in the local Python process the agent is running in. Local training has no access to the silver tables, no MLflow tracking server, no UC registry path, and dies if the chat session drops — submit `databricks jobs submit --no-wait` (see "Train + deploy as a serverless job" below). Only fall back to local execution if the user explicitly asks for it. + +| Consumption | When | How | +|---|---|---| +| **Batch UDF** | Dashboards, daily/hourly scores, precomputed ~daily predictions, read by Genie/Dashboards, or app (typically synched to a lakebase table) | `mlflow.pyfunc.spark_udf(...)` → `INSERT INTO gold_predictions` | +| **Real-time endpoint** | Score on a user action (fraud at authorization, rec at page load) — sub-100ms | `mlflow.deployments.get_deploy_client()` (classical) / `agents.deploy()` (agents) | + +## Canonical flow + +``` +silver_ + silver_ + ▼ + notebook (as a serverless job): + ├── train with mlflow.autolog (XGBoost / sklearn / etc.) + ├── mlflow.register_model → UC: {catalog}.{schema}.{model} + ├── set_registered_model_alias(name, "prod", version) + └── spark_udf(@prod) over latest features → MERGE into gold_predictions + ▼ +gold__predictions ◄── dashboards, apps, Genie read this +``` + +One notebook, one artifact. Re-running = retraining. Gold is where truth lives — read paths never call the model directly. Keep label-window logic (`failure occurred within 7 days`) in the notebook during dev; once stable, promote to a silver materialized view in SDP. + +--- + +## Train and register (the 90% case) + +`mlflow.autolog()` captures params, metrics, code, and the model artifact for every run; `registered_model_name=...` auto-registers the best run to UC (auto-incremented version). Wrap training with **Optuna** so each trial is a child run and the best one is what gets registered. + +**Always `mlflow.set_registry_uri("databricks-uc")`** — without it, models land in the deprecated workspace registry. **The experiment's parent folder must exist** — `set_experiment` does NOT auto-create it (fails with `NOT_FOUND: Parent directory does not exist`). Pre-create it once with `databricks workspace mkdirs` before the job runs. + +```bash +# Once per project — create the parent folder for the MLflow experiment. +databricks workspace mkdirs /Users/me@example.com/turbine_project +``` + +```python +import mlflow, mlflow.xgboost, optuna +from mlflow.tracking import MlflowClient +from xgboost import XGBClassifier +from sklearn.metrics import roc_auc_score + +mlflow.set_registry_uri("databricks-uc") +mlflow.set_experiment("/Users/me@example.com/turbine_project/mlflow_experiment") + +CATALOG, SCHEMA, NAME = "ai_demo_gen", "wind_farm", "turbine_failure" +FULL_NAME = f"{CATALOG}.{SCHEMA}.{NAME}" + +mlflow.xgboost.autolog(log_input_examples=True, registered_model_name=FULL_NAME) + +# For imbalanced labels: stratify the split, set scale_pos_weight = neg/pos. +def objective(trial): + params = { + "n_estimators": trial.suggest_int("n_estimators", 100, 400), + "max_depth": trial.suggest_int("max_depth", 3, 10), + "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True), + } + with mlflow.start_run(nested=True): + m = XGBClassifier(**params).fit(X_train, y_train) + return roc_auc_score(y_test, m.predict_proba(X_test)[:, 1]) + +with mlflow.start_run(run_name="hpo"): + optuna.create_study(direction="maximize").optimize(objective, n_trials=20) + +# Move @prod alias to the just-registered version. Stages are deprecated — aliases only. +client = MlflowClient(registry_uri="databricks-uc") +latest = max(client.search_model_versions(f"name='{FULL_NAME}'"), + key=lambda v: int(v.version)) +client.set_registered_model_alias(FULL_NAME, "prod", latest.version) +``` + +**Framework autolog**: `mlflow.{sklearn,xgboost,lightgbm,pytorch,tensorflow,spark}.autolog()`. + +**Aliases, not stages**: UC dropped `Staging`/`Production`. Use movable `@prod`/`@challenger`; load with `models:/{full_name}@prod`. Promoting a new version is one `set_registered_model_alias` call. + +--- + +## Consume: batch scoring over Delta + +The cheap, default path. Load the registered model as a Spark UDF and score a Delta table; write predictions to a gold table that downstream consumers read. + +```python +import mlflow + +# env_manager rules: +# "local" → same runtime as training (same notebook/job). Fastest for the demo, keep that. +# "virtualenv"→ different runtime than training; rebuilds the model's env. +# "uv" → same as virtualenv but faster (MLflow ≥ 2.22). +predict = mlflow.pyfunc.spark_udf( + spark, + model_uri=f"models:/{FULL_NAME}@prod", + env_manager="local", +) + +features = spark.table(f"{CATALOG}.{SCHEMA}.silver_turbine_features_latest") +scored = features.withColumn("risk_score", predict(*[features[c] for c in feature_cols])) + +# Overwrite-per-run pattern for "latest score per entity": +scored.select("turbine_id", "risk_score", F.current_timestamp().alias("scored_at")) \ + .write.mode("overwrite").saveAsTable(f"{CATALOG}.{SCHEMA}.gold_turbine_predictions") +``` + +For incremental scoring with history, MERGE into the predictions table instead of overwrite. + +--- + +## Consume: real-time serving endpoint (only when required) + +Use the MLflow Deployments client. `workload_size: "Small"` + `scale_to_zero_enabled: true` is the default for demos and dev. First deploy can take ~5 min for classical ML + +```python +from mlflow.deployments import get_deploy_client + +client = get_deploy_client("databricks") +client.create_endpoint( + name="turbine-risk-endpoint", + config={ + "served_entities": [{ + "entity_name": FULL_NAME, + "entity_version": latest.version, + "workload_size": "Small", + "scale_to_zero_enabled": True, # Always + }], + # served_model_name = "-"; the API auto-derives it but + # you reference this exact string in traffic_config. + "traffic_config": {"routes": [ + {"served_model_name": f"{NAME}-{latest.version}", "traffic_percentage": 100} + ]}, + }, + # Tags are TOP-LEVEL — NOT inside `config`. Same {key, value} shape used + # by `serving-endpoints patch --add-tags`. Tag every demo resource for cleanup. + tags=[{"key": "project", "value": "demo"}], +) +``` + +**Zero-downtime version swap.** Repoint the alias *and* call `update_endpoint`: + +```python +client.set_registered_model_alias(FULL_NAME, "prod", new_version) +client.update_endpoint(endpoint="turbine-risk-endpoint", config={ + "served_entities": [{"entity_name": FULL_NAME, "entity_version": new_version, + "workload_size": "Small", "scale_to_zero_enabled": True}], + "traffic_config": {"routes": [ + {"served_model_name": f"{NAME}-{new_version}", "traffic_percentage": 100} + ]}, +}) +``` + +### Endpoint management (CLI) + +```bash +databricks serving-endpoints list +databricks serving-endpoints get turbine-risk-endpoint +databricks serving-endpoints delete turbine-risk-endpoint + +# Query a classical ML endpoint +databricks serving-endpoints query turbine-risk-endpoint --json '{ + "dataframe_records": [{"vibration": 0.42, "rpm": 18.3, "temp_c": 71.2}] +}' + +# Query a chat/agent endpoint +databricks serving-endpoints query my-agent-endpoint --json '{ + "messages": [{"role":"user","content":"Hello"}], "max_tokens": 500 +}' + +# Tag for project tracking +databricks serving-endpoints patch turbine-risk-endpoint --json '{ + "add_tags": [{"key": "project", "value": "demo"}] +}' +``` + +### Readiness has TWO state fields + +`databricks serving-endpoints get` returns both: + +- `state.ready` — `READY` once the endpoint has any working config (first deploy). +- `state.config_update` — `NOT_UPDATING` once the *current* config update finishes; `IN_PROGRESS` during a version swap. + +A loop watching only `state.ready` will say "ready" mid version-swap while the old version is still serving. Poll **both**: + +```bash +databricks serving-endpoints get turbine-risk-endpoint \ + | jq '{ready: .state.ready, config_update: .state.config_update}' +``` + +--- + +## Train + deploy as a serverless job + +Training notebooks run a few minutes (Optuna + UC register; endpoint warmup adds 5–15 min if you also deploy). Submit as a serverless one-time run so the CLI doesn't block. The notebook ends with `dbutils.notebook.exit(json.dumps({...}))` so the structured result (`model_version`, `val_auc`, `endpoint_name`) reaches `.notebook_output.result`. + +```bash +# 1. Upload the training notebook +databricks workspace import /Workspace/Users/me@example.com/turbine_project/train \ + --file ./train_notebook.py --format SOURCE --language PYTHON --overwrite + +# 2. Submit as serverless one-time run (returns {"run_id": N} immediately with --no-wait) +RUN_ID=$(databricks jobs submit --no-wait --json '{ + "run_name": "turbine-train-and-deploy", + "tasks": [{ + "task_key": "train", + "notebook_task": {"notebook_path": "/Workspace/Users/me@example.com/turbine_project/train"}, + "environment_key": "ml_env" + }], + "environments": [{ + "environment_key": "ml_env", + "spec": { + "client": "4", + "dependencies": ["mlflow==2.22.0", "xgboost==2.1.3", "optuna==4.1.0", "scikit-learn==1.5.2"] + } + }] +}' | jq -r .run_id) + +# 3. Poll until a terminal life_cycle_state. +for _ in $(seq 60); do + STATE=$(databricks jobs get-run "$RUN_ID" | jq -r '.state.life_cycle_state // "UNKNOWN"') + echo "$(date +%H:%M:%S) $STATE" + [[ "$STATE" =~ ^(TERMINATED|SKIPPED|INTERNAL_ERROR)$ ]] && break + sleep 30 +done +[[ "$STATE" =~ ^(TERMINATED|SKIPPED|INTERNAL_ERROR)$ ]] || { databricks jobs cancel-run "$RUN_ID"; exit 1; } + +# life_cycle_state TERMINATED only means "the run ended" — check result_state +# (SUCCESS / FAILED / TIMEDOUT / CANCELED / SUCCESS_WITH_FAILURES / …) for outcome. +RESULT=$(databricks jobs get-run "$RUN_ID" | jq -r '.state.result_state // "UNKNOWN"') +echo "result_state=$RESULT" +[[ "$RESULT" == "SUCCESS" ]] || { echo "Run did not succeed"; exit 1; } + +# 4. Pull structured output via the TASK run_id (NOT the submit run_id). +TASK_RUN_ID=$(databricks jobs get-run "$RUN_ID" | jq -r '.tasks[0].run_id') +databricks jobs get-run-output "$TASK_RUN_ID" | jq '.notebook_output.result' +# → '{"model_version":"3","val_auc":0.91,"rows_scored":124,"endpoint":"turbine-risk-endpoint"}' +``` + +**Serving UI hides SP-owned endpoints by default.** If the deploy ran as a service principal, the Serving page won't show the new endpoint until you switch from "Owned by me" to "All". Or just `databricks serving-endpoints list`. + +For the four `jobs submit` traps (`spec.client: "4"` requirement, TASK-vs-submit run_id, `print()` unreliable, tags rejected) and full debugging flow, see **[databricks-jobs](../../databricks-jobs/SKILL.md#one-time-runs-jobs-submit--async-pattern-for-notebooks)**. + +--- + +## Custom pyfunc + +When sklearn/XGBoost autolog isn't enough — custom preprocessing, multiple sub-models, external API calls, ensemble logic. See **[custom-pyfunc.md](custom-pyfunc.md)** for a full worked example. Two non-obvious things: + +- **`python_model="path/to/file.py"`** (file path, not class instance) + `mlflow.models.set_model(MyModel())` at the end of that file. This is the "Models from Code" pattern — the file is logged verbatim, no pickling of the class. +- **`mlflow.models.predict(model_uri=..., input_data=..., env_manager="uv")`** before deploying. Catches missing deps before the endpoint does. + +--- + +## Foundation Model API endpoints + +Pay-per-token, pre-provisioned in every workspace. New models land regularly and a static skill list goes stale fast — **always list at runtime instead of hard-coding names**. Filter by the `databricks-` name prefix AND by the served entity being in `system.ai.*` (other endpoints like `databricks-app-template-serving` share the prefix but aren't FM API endpoints). + +```bash +# Foundation Model API endpoints in this workspace, grouped by task (chat / embeddings / etc.) +databricks serving-endpoints list \ + | jq -r '.[] + | select(.name | startswith("databricks-")) + | select((.config.served_entities[0].entity_name // "") | startswith("system.ai.")) + | "\(.task)\t\(.name)"' \ + | sort +``` + +**Defaults when the user doesn't specify**: pick the highest-numbered Claude Sonnet for agents, the highest-numbered `-codex-max` for code, `databricks-gte-large-en` for embeddings — resolve actual names from the live list above. + +--- + +## Gotchas (the ones that cost time) + +| Trap | Fix | +|---|---| +| Model lands in workspace registry, not UC | `mlflow.set_registry_uri("databricks-uc")` *before* logging | +| Endpoint returns PERMISSION_DENIED at first query | Pass `resources=[...]` to `log_model` (covers UC functions, VS indexes, other endpoints, Lakebase) — see [genai-agents.md](genai-agents.md#resources-that-need-passthrough-auth) for the full list | +| Used `transition_model_version_stage` | Stages are deprecated in UC. Use `client.set_registered_model_alias(name, "prod", version)` | +| `spark_udf` rebuilds a virtualenv on every call | Pass `env_manager="local"` when training+scoring share a runtime | +| Endpoint version swap says "ready" but old version still serving | Poll **both** `state.ready` AND `state.config_update` — see "Readiness has TWO state fields" | +| `pip_requirements` mismatch crashes endpoint at load | Pin exact versions; or pull live with `f"mlflow=={get_distribution('mlflow').version}"` | +| `agents.deploy()` produced a weirdly-named endpoint | Pass `endpoint_name=...` explicitly. Auto-derived name is `agents_--` | +| Endpoint missing from Serving UI | UI filter defaults to "Owned by me"; deploy jobs run as SP. Switch to "All" or use `serving-endpoints list` | + +--- + +## Reference files + +| File | Contents | +|---|---| +| [custom-pyfunc.md](custom-pyfunc.md) | Single end-to-end custom pyfunc example: artifacts, signature, code_paths, log → register → deploy → query. | +| [genai-agents.md](genai-agents.md) | Edge case: deploying a LangGraph `ResponsesAgent` with UC Function + Vector Search tools. For supervised multi-agent tiles, use **databricks-agent-bricks** instead. | + +## Related skills + +- **`databricks-agent-bricks`** skill — no-code Knowledge Assistants and Supervisor Agents. Prefer this over hand-rolling agents. +- **`databricks-mlflow-evaluation`** skill — evaluate model/agent quality before promoting `@prod`. +- **`databricks-vector-search`** skill — vector indexes used as retrieval tools in agents. +- **`databricks-jobs`** skill — async deploy pattern (`--no-wait`, TASK run_id trap). +- **`databricks-unity-catalog`** skill — UC governs the registered model: permissions, lineage, audit.