diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index 47d2dc2b..82430908 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -10,7 +10,7 @@ "plugins": [ { "name": "kbagent", - "version": "0.66.0", + "version": "0.67.0", "source": "./plugins/kbagent", "description": "AI-friendly interface to Keboola Connection projects — explore configs, jobs, lineage, call MCP tools, manage dev branches, and debug SQL in workspaces", "category": "development" diff --git a/plugins/kbagent/.claude-plugin/plugin.json b/plugins/kbagent/.claude-plugin/plugin.json index 45549029..dc49a64d 100644 --- a/plugins/kbagent/.claude-plugin/plugin.json +++ b/plugins/kbagent/.claude-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "kbagent", - "version": "0.66.0", + "version": "0.67.0", "description": "AI-friendly interface to Keboola Connection projects — explore configs, jobs, lineage, call MCP tools, manage dev branches, and debug SQL in workspaces", "author": { "name": "Keboola", diff --git a/pyproject.toml b/pyproject.toml index 5117765e..7833e25f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "keboola-cli" -version = "0.66.0" +version = "0.67.0" description = "AI-friendly CLI for managing Keboola projects" readme = "README.md" requires-python = ">=3.12" diff --git a/src/keboola_agent_cli/changelog.py b/src/keboola_agent_cli/changelog.py index ce778498..6b4f1e62 100644 --- a/src/keboola_agent_cli/changelog.py +++ b/src/keboola_agent_cli/changelog.py @@ -24,6 +24,20 @@ # Ordered newest-first. Each value is a list of brief one-line descriptions. CHANGELOG: dict[str, list[str]] = { + "0.67.0": [ + "New (web UI): the `kbagent serve --ui` table detail now has a **Repartition** tab " + "for BigQuery tables. 
Pick a time or integer-range partitioning layout plus optional " + "clustering fields, and the UI copies the table into the new layout (`create-table " + "--source-table-id`) and atomically swaps it into place (`swap-tables`) -- the same " + "supported repartition flow as the CLI. After the swap it offers to delete the leftover " + "old table. Runs in the branch selected in the top bar; with no dev branch selected it " + "repartitions production behind an explicit confirm.", + "The `serve` create-table endpoint (`POST /storage/tables/{project}`) now forwards the " + "source-copy and BigQuery partition/clustering fields (`source_table_id`, " + "`time_partitioning_*`, `range_partitioning_*`, `clustering_fields`) and makes `columns` " + "optional, matching the CLI. Table detail responses now include the owning bucket's " + "`backend` so the UI can gate BigQuery-only features.", + ], "0.66.0": [ "New: `storage create-table` can copy from an existing table and apply a " "BigQuery partition/clustering layout. `--source-table-id` (with optional " diff --git a/src/keboola_agent_cli/server/routers/storage.py b/src/keboola_agent_cli/server/routers/storage.py index f4c5b215..9c2618e9 100644 --- a/src/keboola_agent_cli/server/routers/storage.py +++ b/src/keboola_agent_cli/server/routers/storage.py @@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile from fastapi.responses import FileResponse -from pydantic import BaseModel +from pydantic import BaseModel, model_validator from ..dependencies import ServiceRegistry, get_registry @@ -26,12 +26,43 @@ class CreateBucket(BaseModel): class CreateTable(BaseModel): bucket_id: str name: str - columns: list[str] + # Optional: exactly one of `columns` / `source_table_id` is required, mirroring + # the CLI. Source mode derives the schema from an existing (BigQuery) table. 
+ columns: list[str] | None = None primary_key: list[str] | None = None not_null_columns: list[str] | None = None defaults: list[str] | None = None branch_id: int | None = None if_not_exists: bool = False + # Source-copy + BigQuery partition/clustering layout. BigQuery-only; the + # service applies a pre-flight backend guard. Shapes mirror + # `kbagent storage create-table` (see services.storage_service.create_table). + source_table_id: str | None = None + source_branch_id: int | None = None + time_partitioning_type: str | None = None + time_partitioning_field: str | None = None + time_partitioning_expiration_ms: str | None = None + range_partitioning_field: str | None = None + range_partitioning_start: str | None = None + range_partitioning_end: str | None = None + range_partitioning_interval: str | None = None + clustering_fields: list[str] | None = None + + @model_validator(mode="after") + def _columns_xor_source(self) -> CreateTable: + """Enforce the columns/source XOR at the request boundary so a bad body + returns a clean 422 instead of bubbling up as a 500 from the service. + + This mirrors the first two checks in ``StorageService.create_table``; + the service stays the source of truth for the full ruleset (not-null / + default placement, partitioning completeness, the BigQuery backend + guard) and for the CLI path, so a small overlap here is deliberate. 
+ """ + if self.columns and self.source_table_id: + raise ValueError("columns and source_table_id are mutually exclusive") + if not self.columns and not self.source_table_id: + raise ValueError("one of columns or source_table_id is required") + return self class DescribeBucket(BaseModel): @@ -267,6 +298,16 @@ def create_table( not_null_columns=body.not_null_columns, defaults=body.defaults, if_not_exists=body.if_not_exists, + source_table_id=body.source_table_id, + source_branch_id=body.source_branch_id, + time_partitioning_type=body.time_partitioning_type, + time_partitioning_field=body.time_partitioning_field, + time_partitioning_expiration_ms=body.time_partitioning_expiration_ms, + range_partitioning_field=body.range_partitioning_field, + range_partitioning_start=body.range_partitioning_start, + range_partitioning_end=body.range_partitioning_end, + range_partitioning_interval=body.range_partitioning_interval, + clustering_fields=body.clustering_fields, ) diff --git a/src/keboola_agent_cli/services/storage_service.py b/src/keboola_agent_cli/services/storage_service.py index aa8a6357..dbf44514 100644 --- a/src/keboola_agent_cli/services/storage_service.py +++ b/src/keboola_agent_cli/services/storage_service.py @@ -696,6 +696,9 @@ def get_table_detail( "name": table.get("name", ""), "display_name": table.get("displayName", ""), "bucket_id": table.get("bucket", {}).get("id", ""), + # Storage backend of the owning bucket (e.g. "snowflake", "bigquery"). + # The web UI keys BigQuery-only features (repartition) off this. 
+            "backend": table.get("bucket", {}).get("backend") or "",  # null/missing -> ""
             "description": description,
             "columns": columns,
             "column_details": column_details,
diff --git a/tests/test_server_router_calls.py b/tests/test_server_router_calls.py
index 31c44745..81605f14 100644
--- a/tests/test_server_router_calls.py
+++ b/tests/test_server_router_calls.py
@@ -227,6 +227,103 @@ def test_storage_describe_columns_passes_columns_kwarg(tmp_path: Path) -> None:
     )
 
 
+# ---------------------------------------------------------------------------
+# storage.py POST /{p} (create table)
+# Service: storage.create_table(source_table_id=..., time_partitioning_*=...,
+# clustering_fields=...) -- the source-copy + BigQuery layout params
+# must be forwarded so the web "repartition" flow reaches the service.
+# ---------------------------------------------------------------------------
+
+
+def test_storage_create_table_forwards_source_and_partition_kwargs(tmp_path: Path) -> None:
+    """Router must forward the source-copy + BigQuery partition/clustering body
+    fields to StorageService.create_table (the web repartition flow)."""
+    storage_svc = MagicMock()
+    storage_svc.create_table.return_value = {"table_id": "in.c-main.events_repart"}
+    registry = _mock_registry(storage=storage_svc)
+    app = _make_app_with_registry(tmp_path, registry)
+
+    body = {
+        "bucket_id": "in.c-main",
+        "name": "events_repart",
+        "source_table_id": "in.c-main.events",
+        "branch_id": 123,
+        "time_partitioning_type": "DAY",
+        "time_partitioning_field": "created_at",
+        "clustering_fields": ["tenant_id"],
+        "primary_key": ["id"],
+    }
+    with TestClient(app) as client:
+        res = client.post(f"/storage/tables/{PROJECT}", headers=AUTH, json=body)
+
+    assert res.status_code == 200, res.text
+    kwargs = storage_svc.create_table.call_args.kwargs
+    assert kwargs["source_table_id"] == "in.c-main.events"
+    assert kwargs["time_partitioning_type"] == "DAY"
+    assert kwargs["time_partitioning_field"] == "created_at"
+    assert kwargs["clustering_fields"] == 
["tenant_id"] + # columns is optional now (source mode); router passes None, not a crash. + assert kwargs["columns"] is None + + +def test_storage_create_table_columns_optional(tmp_path: Path) -> None: + """`columns` is no longer required by the request model -- a source-only + body must validate (HTTP 200), not 422.""" + storage_svc = MagicMock() + storage_svc.create_table.return_value = {"table_id": "in.c-main.t"} + registry = _mock_registry(storage=storage_svc) + app = _make_app_with_registry(tmp_path, registry) + + with TestClient(app) as client: + res = client.post( + f"/storage/tables/{PROJECT}", + headers=AUTH, + json={"bucket_id": "in.c-main", "name": "t", "source_table_id": "in.c-main.src"}, + ) + + assert res.status_code == 200, res.text + + +def test_storage_create_table_columns_and_source_is_422(tmp_path: Path) -> None: + """Both columns and source_table_id given -> clean 422 at the request + boundary (not a 500 from the service).""" + storage_svc = MagicMock() + registry = _mock_registry(storage=storage_svc) + app = _make_app_with_registry(tmp_path, registry) + + with TestClient(app) as client: + res = client.post( + f"/storage/tables/{PROJECT}", + headers=AUTH, + json={ + "bucket_id": "in.c-main", + "name": "t", + "columns": ["id:INTEGER"], + "source_table_id": "in.c-main.src", + }, + ) + + assert res.status_code == 422, res.text + storage_svc.create_table.assert_not_called() + + +def test_storage_create_table_neither_columns_nor_source_is_422(tmp_path: Path) -> None: + """Neither columns nor source_table_id -> clean 422, service untouched.""" + storage_svc = MagicMock() + registry = _mock_registry(storage=storage_svc) + app = _make_app_with_registry(tmp_path, registry) + + with TestClient(app) as client: + res = client.post( + f"/storage/tables/{PROJECT}", + headers=AUTH, + json={"bucket_id": "in.c-main", "name": "t"}, + ) + + assert res.status_code == 422, res.text + storage_svc.create_table.assert_not_called() + + # 
--------------------------------------------------------------------------- # storage.py GET /{p}/{fid}/file-download # Service: storage.download_file(output_path=...) (was output_dir= in broken version) diff --git a/tests/test_storage_describe_service.py b/tests/test_storage_describe_service.py index 8098546c..2f30e92c 100644 --- a/tests/test_storage_describe_service.py +++ b/tests/test_storage_describe_service.py @@ -747,3 +747,45 @@ def test_null_numeric_fields_coerced_to_zero(self, tmp_path: Path) -> None: assert result["rows_count"] == 0 assert result["data_size_bytes"] == 0 + + def test_backend_surfaced_from_bucket(self, tmp_path: Path) -> None: + """The owning bucket's backend is exposed so the web UI can gate + BigQuery-only features (repartition) on it.""" + store = _make_store(tmp_path) + mock_client = MagicMock() + mock_client.get_table_detail.return_value = { + "id": "in.c-sales.orders", + "name": "orders", + "displayName": "orders", + "bucket": {"id": "in.c-sales", "backend": "bigquery"}, + "columns": ["a"], + "primaryKey": [], + "columnMetadata": {}, + "metadata": [], + } + service = _make_service(store, mock_client) + + result = service.get_table_detail(alias="prod", table_id="in.c-sales.orders") + + assert result["backend"] == "bigquery" + + def test_backend_defaults_to_empty_when_absent(self, tmp_path: Path) -> None: + """A bucket object without a backend key yields an empty string, not a + KeyError -- the UI simply hides the BigQuery-only tab.""" + store = _make_store(tmp_path) + mock_client = MagicMock() + mock_client.get_table_detail.return_value = { + "id": "in.c-sales.orders", + "name": "orders", + "displayName": "orders", + "bucket": {"id": "in.c-sales"}, + "columns": ["a"], + "primaryKey": [], + "columnMetadata": {}, + "metadata": [], + } + service = _make_service(store, mock_client) + + result = service.get_table_detail(alias="prod", table_id="in.c-sales.orders") + + assert result["backend"] == "" diff --git a/uv.lock b/uv.lock index 
9e5ad008..5b88c5f0 100644 --- a/uv.lock +++ b/uv.lock @@ -590,7 +590,7 @@ wheels = [ [[package]] name = "keboola-cli" -version = "0.66.0" +version = "0.67.0" source = { editable = "." } dependencies = [ { name = "croniter" }, diff --git a/web/frontend/src/pages/Storage.tsx b/web/frontend/src/pages/Storage.tsx index d480da80..81007411 100644 --- a/web/frontend/src/pages/Storage.tsx +++ b/web/frontend/src/pages/Storage.tsx @@ -1,13 +1,13 @@ -import { useQuery } from "@tanstack/react-query"; -import { Download, Eye, Info } from "lucide-react"; +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { AlertTriangle, Check, Download, Eye, Info, Layers, Loader2, Trash2 } from "lucide-react"; import { useState } from "react"; -import { api } from "../api/client"; +import { api, ApiError } from "../api/client"; import { Drawer } from "../components/Drawer"; import { Empty, ErrorBox, Loading, PageTitle } from "../components/Empty"; import { JsonView } from "../components/JsonView"; import { DataTable } from "../components/Table"; import { useUIState } from "../state"; -import type { Bucket, ProjectError, Table as TableT } from "../types"; +import type { Branch, Bucket, ProjectError, Table as TableT } from "../types"; interface TablePreview { header: string[]; @@ -20,6 +20,7 @@ interface TableDetail { table_id: string; name: string; bucket_id: string; + backend: string; description: string; columns: string[]; column_details: Array<{ @@ -181,7 +182,7 @@ function TableDetailDrawer({ onClose: () => void; }) { const { branchId } = useUIState(); - const [tab, setTab] = useState<"info" | "schema" | "preview" | "raw">("info"); + const [tab, setTab] = useState<"info" | "schema" | "preview" | "raw" | "repartition">("info"); const detailQ = useQuery({ queryKey: ["table-detail", table?.project_alias, table?.id, branchId], @@ -234,7 +235,7 @@ function TableDetailDrawer({ >
- {(["info", "schema", "preview", "raw"] as const).map((t) => ( + {tabsFor(detailQ.data).map((t) => ( @@ -270,11 +273,489 @@ function TableDetailDrawer({ ) : null ) : null} {detailQ.data && tab === "raw" ? : null} + {detailQ.data && tab === "repartition" ? ( + + ) : null}
); } +const REPARTITION_TAB = "repartition" as const; +type DrawerTab = "info" | "schema" | "preview" | "raw" | typeof REPARTITION_TAB; + +// The repartition tab is BigQuery-only: partition/clustering layout changes are +// a BigQuery feature, and the server enforces this with a pre-flight backend +// guard anyway. Hide the tab elsewhere so the UI never offers an action that +// the backend will reject. +function tabsFor(d: TableDetail | undefined): DrawerTab[] { + const base: DrawerTab[] = ["info", "schema", "preview", "raw"]; + if (d && d.backend.toLowerCase() === "bigquery" && !d.is_alias) { + base.push(REPARTITION_TAB); + } + return base; +} + +const repartInputCls = + "w-full bg-transparent border border-zinc-200 dark:border-zinc-800 rounded px-2 py-1 text-xs text-zinc-700 dark:text-zinc-300 focus:outline-none focus:border-keboola disabled:opacity-50 disabled:cursor-not-allowed"; + +// BigQuery allows at most 4 clustering fields. We cap the picker so the server +// never has to reject an over-long list with a less obvious error. +const MAX_CLUSTERING_FIELDS = 4; + +// Repartition a (BigQuery) table into a new partition/clustering layout. +// +// There is no in-place "ALTER TABLE ... PARTITION BY" on a populated table; the +// supported path is copy-into-new-layout then atomic swap: +// 1. create-table --source-table-id --time/range-partitioning ... --clustering +// -> a sibling table (_repartition) with the desired layout + copied rows +// 2. swap-tables -> the original id now exposes the new layout +// +// swap-tables is branch-scoped; the active branch from the top bar is used. +// Production (the default branch) is allowed and is the only branch whose swap +// reaches the live table -- a dev-branch swap never merges back -- so we run in +// production by default and gate it behind an explicit confirm. 
+// +// After the swap the OLD data/layout lives under the sibling id; we surface it +// and let the user delete it (per their choice) rather than dropping it silently. +function RepartitionTab({ d, onClose }: { d: TableDetail; onClose: () => void }) { + const { project, branchId } = useUIState(); + const qc = useQueryClient(); + + // swap-tables needs a concrete branch id. When no dev branch is pinned in the + // top bar (branchId === null => production), resolve the project's default + // branch id from /branches. + const branchesQ = useQuery<{ branches: Branch[] }>({ + queryKey: ["branches", project], + queryFn: () => api.get("/branches", { query: { project: project! } }), + enabled: !!project && branchId === null, + }); + const defaultBranch = branchesQ.data?.branches.find((b) => b.isDefault) ?? null; + const effectiveBranchId = branchId ?? defaultBranch?.id ?? null; + const isProduction = branchId === null; + + const [mode, setMode] = useState<"time" | "range" | "none">("time"); + const [timeType, setTimeType] = useState("DAY"); + const [timeField, setTimeField] = useState(""); + const [timeExpirationMs, setTimeExpirationMs] = useState(""); + const [rangeField, setRangeField] = useState(""); + const [rangeStart, setRangeStart] = useState(""); + const [rangeEnd, setRangeEnd] = useState(""); + const [rangeInterval, setRangeInterval] = useState(""); + const [clustering, setClustering] = useState([]); + const tempName = `${d.name}_repartition`; + const tempTableId = `${d.bucket_id}.${tempName}`; + + const [confirmOpen, setConfirmOpen] = useState(false); + const [phase, setPhase] = useState<"idle" | "creating" | "swapping" | "done">("idle"); + const [error, setError] = useState(null); + // True once the copy exists but the swap failed: the sibling table is left + // behind, so we offer an explicit cleanup. (The copy itself is idempotent via + // if_not_exists, so plain "Repartition" also safely retries just the swap.) 
+ const [swapFailed, setSwapFailed] = useState(false); + + const rangeComplete = !!(rangeField && rangeStart && rangeEnd && rangeInterval); + // "none" => no partitioning (de-partition; clustering optional) and is always + // valid on its own. Time needs a type; range needs all four bounds. + const layoutValid = mode === "none" ? true : mode === "time" ? !!timeType : rangeComplete; + const branchReady = effectiveBranchId !== null; + const canSubmit = !!project && branchReady && layoutValid && phase === "idle"; + + const toggleCluster = (col: string) => + setClustering((cur) => (cur.includes(col) ? cur.filter((c) => c !== col) : [...cur, col])); + + const run = useMutation({ + mutationFn: async () => { + setError(null); + setSwapFailed(false); + // 1) Create the new-layout copy from the source (original) table. + // if_not_exists makes this idempotent: if a previous attempt already + // created the copy (e.g. the swap then failed), the retry skips the + // create and proceeds straight to the swap instead of erroring with + // "table already exists". + setPhase("creating"); + const createBody: Record = { + bucket_id: d.bucket_id, + name: tempName, + source_table_id: d.table_id, + branch_id: effectiveBranchId, + if_not_exists: true, + primary_key: d.primary_key.length ? d.primary_key : undefined, + clustering_fields: clustering.length ? clustering : undefined, + }; + if (mode === "time") { + createBody.time_partitioning_type = timeType; + if (timeField) createBody.time_partitioning_field = timeField; + if (timeExpirationMs) createBody.time_partitioning_expiration_ms = timeExpirationMs; + } else if (mode === "range") { + createBody.range_partitioning_field = rangeField; + createBody.range_partitioning_start = rangeStart; + createBody.range_partitioning_end = rangeEnd; + createBody.range_partitioning_interval = rangeInterval; + } + // mode === "none": no partitioning fields -- the copy is unpartitioned + // (clustering, if any, still applies). 
+ await api.post(`/storage/tables/${encodeURIComponent(project!)}`, createBody); + + // 2) Swap the new-layout copy into the original's place. If this fails the + // copy is left behind, so flag it for the cleanup affordance. + setPhase("swapping"); + try { + await api.post( + `/storage/tables/${encodeURIComponent(project!)}/${encodeURIComponent(d.table_id)}/swap`, + { target_table_id: tempTableId, branch_id: effectiveBranchId }, + ); + } catch (e) { + setSwapFailed(true); + throw e; + } + setPhase("done"); + }, + onError: (e) => { + setError(e instanceof ApiError ? e.message : String(e)); + setPhase("idle"); + }, + onSuccess: () => { + qc.invalidateQueries({ queryKey: ["tables"] }); + qc.invalidateQueries({ queryKey: ["table-detail"] }); + }, + }); + + // After the swap the sibling id holds the OLD data/layout. Deleting it is the + // user's call (they may want to verify the new table first). + const del = useMutation({ + mutationFn: () => + api.delete(`/storage/tables/${encodeURIComponent(project!)}`, { + query: { + table_id: tempTableId, + branch_id: effectiveBranchId ?? undefined, + force: true, + }, + }), + onSuccess: () => { + qc.invalidateQueries({ queryKey: ["tables"] }); + onClose(); + }, + }); + + // Cleanup after a failed swap: remove the leftover copy and return to the + // form so the user can adjust and try again (does not close the drawer). + const cleanup = useMutation({ + mutationFn: () => + api.delete(`/storage/tables/${encodeURIComponent(project!)}`, { + query: { + table_id: tempTableId, + branch_id: effectiveBranchId ?? undefined, + force: true, + }, + }), + onSuccess: () => { + qc.invalidateQueries({ queryKey: ["tables"] }); + setSwapFailed(false); + setError(null); + }, + }); + + const submit = () => { + if (isProduction && !confirmOpen) { + setConfirmOpen(true); + return; + } + setConfirmOpen(false); + run.mutate(); + }; + + if (phase === "done") { + return ( +
+
+ +
+
Repartition complete.
+
+ {d.table_id} now uses the new layout. + The previous data & layout are preserved under{" "} + {tempTableId}. +
+
+
+ {del.isError ? : null} +
+
+ Delete the old table? +
+
+ + +
+
+
+ ); + } + + const busy = phase === "creating" || phase === "swapping"; + + return ( +
+
+ Copy {d.table_id} into a new + partition/clustering layout, then atomically swap it into place. +
+ + {isProduction ? ( +
+ + + No dev branch is selected — this runs against{" "} + production{defaultBranch ? ` (branch #${defaultBranch.id})` : ""} and + changes the live table. A swap in a dev branch never merges back, so production is the + only branch that actually repartitions the real table. + +
+ ) : ( +
+ Runs in branch #{effectiveBranchId} +
+ )} + + {/* Partitioning mode */} +
+
Partitioning
+
+ {(["time", "range", "none"] as const).map((m) => ( + + ))} +
+ + {mode === "none" ? ( +
+ No partitioning — the table is copied unpartitioned (clustering below still + applies). Use this to remove an existing partition layout. +
+ ) : mode === "time" ? ( +
+ + + +
+ ) : ( +
+ + + + +
+ )} +
+ + {/* Clustering */} +
+
+ Clustering fields (optional, ordered by selection, max {MAX_CLUSTERING_FIELDS}) +
+
+ {d.columns.map((c) => { + const idx = clustering.indexOf(c); + const on = idx !== -1; + const atLimit = !on && clustering.length >= MAX_CLUSTERING_FIELDS; + return ( + + ); + })} +
+
+ + {error ? : null} + + {swapFailed ? ( +
+ +
+
+ The copy {tempTableId} was created but the swap + failed. Click Repartition to retry just the swap, or delete the + leftover copy. +
+ +
+
+ ) : null} + + {confirmOpen ? ( +
+ +
+
+ This swaps the production table{" "} + {d.table_id} into the new layout. Continue? +
+
+ + +
+
+
+ ) : null} + +
+ + + {busy + ? phase === "creating" + ? `Creating ${tempName} (copying rows)...` + : "Swapping into place..." + : branchReady + ? `Creates ${tempName}, then swaps it into ${d.name}.` + : "Resolving branch..."} + +
+
+ ); +} + function InfoTab({ d }: { d: TableDetail }) { return (