From 19521e048b41362b0f56d97dbf706c34721d18a6 Mon Sep 17 00:00:00 2001 From: yustme Date: Mon, 29 Jun 2026 12:54:27 +0200 Subject: [PATCH 1/2] feat(storage): create-table from a source table + BigQuery partition/clustering Extend `storage create-table` to mirror keboola/connection#7697: - `--source-table-id` (+ optional `--source-branch-id`): copy an existing table's data into the requested partition/clustering layout instead of building from `--column`. The schema is derived from the source, so `--column`/`--not-null`/`--default` are forbidden. This is the supported way to repartition a populated BigQuery table; pair with `swap-tables`. - `--column` is now optional and mutually exclusive with `--source-table-id`. - New BigQuery layout flags (also usable on a plain columns create): `--time-partitioning-type`/`-field`/`-expiration-ms`, `--range-partitioning-field`/`-start`/`-end`/`-interval` (bounds are strings), `--clustering-field`. Time vs range partitioning are mutually exclusive. - BigQuery-only with a one-call pre-flight guard: when any source/layout flag is used, the project backend is verified first and a non-BigQuery project fails fast (exit 2) before the create. Plain `--column` creates are unaffected. Connection 422 codes remain as a server-side backstop. Client builds the tables-definition body conditionally (source XOR columns, optional layout). Adds unit tests (client/service/CLI), a backend-aware E2E step, and full agent doc-sync. Bumps to 0.66.0. --- .claude-plugin/marketplace.json | 2 +- CLAUDE.md | 3 +- plugins/kbagent/.claude-plugin/plugin.json | 2 +- plugins/kbagent/agents/keboola-expert.md | 1 + plugins/kbagent/skills/kbagent/SKILL.md | 2 +- .../kbagent/references/commands-reference.md | 2 +- .../skills/kbagent/references/gotchas.md | 27 ++ .../references/storage-types-workflow.md | 41 ++ pyproject.toml | 2 +- src/keboola_agent_cli/changelog.py | 16 + src/keboola_agent_cli/client.py | 34 +- src/keboola_agent_cli/commands/context.py | 20 +- src/keboola_agent_cli/commands/storage.py | 96 +++- .../services/storage_service.py | 235 +++++++++- tests/test_e2e.py | 84 ++++ tests/test_storage_create_table.py | 422 ++++++++++++++++++ tests/test_storage_write.py | 22 + uv.lock | 4 +- 18 files changed, 987 insertions(+), 28 deletions(-) create mode 100644 tests/test_storage_create_table.py diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index e1275b39..47d2dc2b 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -10,7 +10,7 @@ "plugins": [ { "name": "kbagent", - "version": "0.65.1", + "version": "0.66.0", "source": "./plugins/kbagent", "description": "AI-friendly interface to Keboola Connection projects — explore configs, jobs, lineage, call MCP tools, manage dev branches, and debug SQL in workspaces", "category": "development" diff --git a/CLAUDE.md b/CLAUDE.md index 72d488ef..8efbe0f5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -328,7 +328,8 @@ kbagent storage bucket-detail --project NAME --bucket-id ID [--branch ID] kbagent storage tables [--project NAME ...] [--bucket-id ID] [--branch ID] kbagent storage table-detail --project NAME --table-id ID [--branch ID] kbagent storage create-bucket --project NAME --stage STAGE --name NAME [--description D] [--backend B] [--branch ID] -kbagent storage create-table --project NAME --bucket-id ID --name NAME --column COL:TYPE[(length)] [...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--branch ID] [--if-not-exists] +kbagent storage create-table --project NAME --bucket-id ID --name NAME [--column COL:TYPE[(length)] ...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--source-table-id ID] [--source-branch-id N] [--time-partitioning-type DAY|HOUR|MONTH|YEAR] [--time-partitioning-field COL] [--time-partitioning-expiration-ms MS] [--range-partitioning-field COL --range-partitioning-start S --range-partitioning-end E --range-partitioning-interval I] [--clustering-field COL ...] [--branch ID] [--if-not-exists] +# --column XOR --source-table-id (0.66.0+, BigQuery only): --source-table-id copies an existing table's data into the requested partition/clustering layout (schema derived from source) -> swap into place with swap-tables. Partition/clustering flags work in both modes (BigQuery only); time vs range partitioning are mutually exclusive. A non-BigQuery project fails fast (pre-flight backend check). kbagent storage upload-table --project NAME --table-id ID --file PATH [--incremental] [--branch ID] kbagent storage download-table --project NAME --table-id ID [--output FILE] [--columns COL ...] [--limit N] [--where-column COL --where-value VAL ... [--where-operator eq|neq]] [--changed-since WHEN] [--changed-until WHEN] [--branch ID] kbagent storage add-column --project NAME --table-id ID --column COL:TYPE[(length)] [--not-null] [--default VALUE] [--branch ID] diff --git a/plugins/kbagent/.claude-plugin/plugin.json b/plugins/kbagent/.claude-plugin/plugin.json index 6d7eb53b..45549029 100644 --- a/plugins/kbagent/.claude-plugin/plugin.json +++ b/plugins/kbagent/.claude-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "kbagent", - "version": "0.65.1", + "version": "0.66.0", "description": "AI-friendly interface to Keboola Connection projects — explore configs, jobs, lineage, call MCP tools, manage dev branches, and debug SQL in workspaces", "author": { "name": "Keboola", diff --git a/plugins/kbagent/agents/keboola-expert.md b/plugins/kbagent/agents/keboola-expert.md index db7903ad..31467d09 100644 --- a/plugins/kbagent/agents/keboola-expert.md +++ b/plugins/kbagent/agents/keboola-expert.md @@ -100,6 +100,7 @@ a critical failure. | Create typed table with native types | `kbagent storage create-table --column pk:VARCHAR(40) --column amount:NUMBER(18,2) --not-null pk --default amount=0` (0.25.0+) | `tool call create_table` (accepts the same `definition.length` shape via MCP) | re-creating via raw REST to `/v2/storage/...tables-definition` | | Add one column to an existing table | `kbagent storage add-column --project P --table-id in.c-foo.data --column status:VARCHAR(20) [--not-null] [--default active]` (0.62.0+) -- synchronous Storage endpoint, same `name:TYPE(length)` grammar as `create-table`; the add-side mirror of `delete-column` | -- | re-creating the whole table just to add a field (loses data/PK/dependents); raw `POST /v2/storage/tables/.../columns` | | Promote typed rebuild back into the original name | `kbagent storage swap-tables --project P --table-id in.c-foo.data --target-table-id in.c-foo.data_change_log --branch --yes` (0.28.0+) -- async storage job (`tableSwap`); client polls to completion. Service refuses without a branch; any branch incl. prod | -- | renaming or deleting + re-uploading (loses history; downstream configs need to be rewritten) | +| Repartition / recluster a populated BigQuery table | `kbagent storage create-table --project P --bucket-id in.c-main --name events_repart --source-table-id in.c-main.events --time-partitioning-type DAY --time-partitioning-field created_at --clustering-field tenant_id --primary-key id` (0.66.0+, BigQuery only) to copy the data into the new layout, then `kbagent storage swap-tables --table-id in.c-main.events --target-table-id in.c-main.events_repart --branch --yes` to flip it into place. `--source-table-id` derives the schema from the source so `--column` is forbidden (mutually exclusive); a non-BigQuery project fails fast (pre-flight backend check, exit 2). Range partitioning instead: `--range-partitioning-field/-start/-end/-interval` (all four; bounds are strings; mutually exclusive with time partitioning) | -- | raw `POST /v2/storage/buckets/.../tables-definition` with a `source` object, then manual swap; or a `CREATE TABLE ... AS SELECT` in a workspace (drops NOT NULL + primary key) | | Re-seed a table without losing its schema / PK / dependents | `kbagent storage truncate-table --project P --table-id in.c-foo.data [--branch ID] [--dry-run] [--yes]` (0.32.0+) -- DELETE `/tables/{id}/rows?allowTruncate=1`; endpoint is uniformly async on every branch (returns a queued `tableRowsDelete` job; client polls via `_wait_for_storage_job`). Do NOT pass `async=true` -- the API rejects it. Batch via repeated `--table-id`. Returns `{truncated[], failed[], dry_run, project_alias}` with `truncated[]` entries carrying `{table_id, rows_before, rows_after, branch_id}`. Permission class: `destructive` | `tool call delete_table_rows` if the upstream MCP exposes it | drop + recreate the table (loses descriptions, PK, sharing edges, and breaks every downstream config reference); deleting rows via raw SQL in a workspace (bypasses the Storage API audit trail) | | Debug a failed job | `kbagent job detail --project P --job-id J --json` + `kbagent job run ... --log-tail-lines 200` | `kbagent workspace from-transformation` for SQL repro | "I think the issue is..." without reading logs | | Ad-hoc SQL / row-count / type audit | `kbagent workspace create` + `kbagent workspace load` + `kbagent workspace query --sql "..."` (0.59.0+: results come back inline+fast but **capped at `--limit`, default 500** -- check `statements[].truncated`/`total_rows`, use `COUNT(*)` for counts, `--full` for the complete set) | `kbagent workspace from-transformation` for existing transform debugging; `workspace list --qs-compatible` (0.42.0+, #304) for data-app reuse | trusting a default `SELECT *` as the full result (it is truncated at 500); querying Storage via raw Snowflake credentials outside the workspace abstraction | diff --git a/plugins/kbagent/skills/kbagent/SKILL.md b/plugins/kbagent/skills/kbagent/SKILL.md index 21fa2716..74ce0c79 100644 --- a/plugins/kbagent/skills/kbagent/SKILL.md +++ b/plugins/kbagent/skills/kbagent/SKILL.md @@ -198,7 +198,7 @@ When working inside a git repository or project directory, run `kbagent init` (o | List storage tables from one or more projects | `kbagent storage tables` | | Show detailed table info including columns and types | `kbagent storage table-detail --project PROJECT --table-id TABLE-ID` | | Create a new storage bucket | `kbagent storage create-bucket --project PROJECT --stage STAGE --name NAME` | -| Create a new storage table with typed columns | `kbagent storage create-table --project PROJECT --bucket-id BUCKET-ID --name NAME --column COLUMN` | +| Create a new storage table with typed columns | `kbagent storage create-table --project PROJECT --bucket-id BUCKET-ID --name NAME` | | Upload a CSV file into a storage table | `kbagent storage upload-table --project PROJECT --table-id TABLE-ID --file FILE` | | Export a storage table to a local CSV file | `kbagent storage download-table --project PROJECT --table-id TABLE-ID` | | Delete one or more storage tables | `kbagent storage delete-table --project PROJECT --table-id TABLE-ID` | diff --git a/plugins/kbagent/skills/kbagent/references/commands-reference.md b/plugins/kbagent/skills/kbagent/references/commands-reference.md index 9de6e8fe..1f1b7815 100644 --- a/plugins/kbagent/skills/kbagent/references/commands-reference.md +++ b/plugins/kbagent/skills/kbagent/references/commands-reference.md @@ -103,7 +103,7 @@ Requires a **super-admin** Manage API token (same kind as `org setup`). Same def - `storage tables [--project NAME ...] [--bucket-id ID] [--branch ID]` -- list tables across all connected projects in parallel (multi-project by default, same as `storage buckets`); repeat `--project` to target a subset; `--bucket-id` is applied independently per project (missing buckets become per-project errors); `--branch` requires exactly one `--project` - `storage table-detail --project NAME --table-id ID [--branch ID]` -- table detail with columns, types, primary key, row count (branch-aware) - `storage create-bucket --project NAME --stage STAGE --name NAME [--description D] [--backend B] [--branch ID]` -- create bucket (branch-aware). With `--branch ID` on a project lacking the `storage-branches` feature (legacy fake-branch), response carries `legacy_branch_storage: true` and human mode prints a warning -- the runner will create a parallel `out.c--*` bucket at job time. See `storage-types-workflow.md` -- `storage create-table --project NAME --bucket-id ID --name NAME --column col:TYPE[(length)] [...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--branch ID] [--if-not-exists]` -- create typed table. Base types `STRING/INTEGER/NUMERIC/FLOAT/BOOLEAN/DATE/TIMESTAMP` plus native backend types with length (`VARCHAR(40)`, `NUMBER(18,2)`, `TIMESTAMP_TZ`, `VARIANT`, etc.) -- type/length validation delegated to the Storage API. `--not-null` marks a column `nullable=false`; `--default NAME=VALUE` sets a DEFAULT expression (booleans must be lowercase `true`/`false`). In a dev branch, the target bucket is auto-materialized if it has not yet been written to there -- response surfaces this via `auto_created_bucket: bool`. On legacy fake-branch projects (no `storage-branches` feature), `legacy_branch_storage: true` flags that the runner will use a separate `out.c--*` bucket at job time. `--if-not-exists` (0.47.0+) turns a duplicate-display-name failure into `action: skipped` when the table really exists at the expected id (safe for parallel workers). Since 0.47.1 the skipped envelope reports the EXISTING table's actual `columns`/`primary_key`/`name`, mirrors the request under `requested_columns`/`requested_primary_key`, and sets `schema_drift: true` when they diverge. See `storage-types-workflow.md` +- `storage create-table --project NAME --bucket-id ID --name NAME [--column col:TYPE[(length)] ...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--source-table-id ID] [--source-branch-id N] [--time-partitioning-type DAY|HOUR|MONTH|YEAR] [--time-partitioning-field COL] [--time-partitioning-expiration-ms MS] [--range-partitioning-field COL --range-partitioning-start S --range-partitioning-end E --range-partitioning-interval I] [--clustering-field COL ...] [--branch ID] [--if-not-exists]` -- create typed table. Base types `STRING/INTEGER/NUMERIC/FLOAT/BOOLEAN/DATE/TIMESTAMP` plus native backend types with length (`VARCHAR(40)`, `NUMBER(18,2)`, `TIMESTAMP_TZ`, `VARIANT`, etc.) -- type/length validation delegated to the Storage API. `--not-null` marks a column `nullable=false`; `--default NAME=VALUE` sets a DEFAULT expression (booleans must be lowercase `true`/`false`). In a dev branch, the target bucket is auto-materialized if it has not yet been written to there -- response surfaces this via `auto_created_bucket: bool`. On legacy fake-branch projects (no `storage-branches` feature), `legacy_branch_storage: true` flags that the runner will use a separate `out.c--*` bucket at job time. `--if-not-exists` (0.47.0+) turns a duplicate-display-name failure into `action: skipped` when the table really exists at the expected id (safe for parallel workers). Since 0.47.1 the skipped envelope reports the EXISTING table's actual `columns`/`primary_key`/`name`, mirrors the request under `requested_columns`/`requested_primary_key`, and sets `schema_drift: true` when they diverge. **`--source-table-id` (0.66.0+, BigQuery only)** copies an existing table's data into the requested partition/clustering layout instead of building from `--column` (schema derived from source -> `--column`/`--not-null`/`--default` forbidden; the two are mutually exclusive). This is the supported way to repartition a populated BigQuery table -- then promote it with `storage swap-tables`. Partition/clustering flags (`--time-partitioning-*`, `--range-partitioning-*`, `--clustering-field`) also work on a plain `--column` create (BigQuery only); time vs range partitioning are mutually exclusive and range bounds are strings. When any source/partition/clustering flag is used, a one-call backend pre-flight rejects non-BigQuery projects (exit 2) before the create. See `storage-types-workflow.md` - `storage upload-table --project NAME --table-id ID --file PATH [--incremental] [--branch ID]` -- upload CSV (branch-aware) - `storage download-table --project NAME --table-id ID [--output FILE] [--columns COL ...] [--limit N] [--where-column COL --where-value VAL ... [--where-operator eq|neq]] [--changed-since WHEN] [--changed-until WHEN] [--branch ID]` -- export table to CSV (branch-aware). `--where-column` + `--where-value` (repeatable, OR within the set) + `--where-operator eq|neq` filter rows server-side; `--changed-since`/`--changed-until` (unix ts or strtotime like `-2 days`) filter by import time -- the credential-only, no-workspace way to pull a filtered/incremental slice (0.62.0+) - `storage add-column --project NAME --table-id ID --column COL:TYPE[(length)] [--not-null] [--default VALUE] [--branch ID]` -- add a single column to an existing table (0.62.0+). Same `name:TYPE(length)` grammar as `create-table --column`; a bare `name` adds an untyped STRING column. Synchronous endpoint (no job to wait on). `--not-null` needs an empty table or a `--default`. Mirror of `delete-column` diff --git a/plugins/kbagent/skills/kbagent/references/gotchas.md b/plugins/kbagent/skills/kbagent/references/gotchas.md index 427973dc..a56bbfb3 100644 --- a/plugins/kbagent/skills/kbagent/references/gotchas.md +++ b/plugins/kbagent/skills/kbagent/references/gotchas.md @@ -1534,6 +1534,33 @@ One project failing does not block others. Check the `errors` array: See [storage-types-workflow.md](storage-types-workflow.md) for the full type inventory and examples. +## `storage create-table --source-table-id` + partition/clustering are BigQuery-only (since 0.66.0) + +- **`--source-table-id` copies an existing table instead of building from `--column`.** + The new table's schema is derived from the source and its rows are copied into the + requested partition/clustering layout (`INSERT … SELECT`, preserving NOT NULL + primary + key). This is the supported way to repartition a populated BigQuery table; promote it + with `storage swap-tables`. Mirrors keboola/connection#7697. +- **`--column` and `--source-table-id` are mutually exclusive.** Supplying both, or + neither, exits 2 (`INVALID_ARGUMENT`) before any API call. `--not-null` / `--default` + attach to `--column` definitions, so they are also rejected in source mode. +- **Partition/clustering flags also work on a plain `--column` create** (BigQuery only): + `--time-partitioning-type` (DAY/HOUR/MONTH/YEAR; required when any `--time-partitioning-*` + is set) + optional `--time-partitioning-field`/`-expiration-ms`; OR + `--range-partitioning-field`/`-start`/`-end`/`-interval` (all four required together). + **Range bounds are strings** in the API, and time vs range partitioning are mutually + exclusive (BigQuery allows one partitioning kind per table). +- **BigQuery-only with a pre-flight guard.** When any source/partition/clustering flag is + used, `create-table` verifies the project backend (one token-verify call) and fails fast + with exit 2 + a clear `… require a BigQuery backend` message on a non-BigQuery project, + before issuing the create. A plain `--column` create makes no extra call. The connection + API also rejects these server-side (422 `storage.tables.backendDoesNotSupportSourceTable`, + `sourceAliasNotPersisted`, `sourceTableMissingReferencedColumn`; 404 + `sourceTableNotFound`) as a backstop. +- **Aliases and linked-bucket tables are valid sources.** A persisted alias (materialized + view) is queryable; a non-persisted alias (project lacks `bigquery-persisted-alias-views`) + is rejected 422. + ## Legacy fake-branch storage warning on `--branch` writes (since 0.25.2) - **What it is.** Projects without the `storage-branches` feature flag use diff --git a/plugins/kbagent/skills/kbagent/references/storage-types-workflow.md b/plugins/kbagent/skills/kbagent/references/storage-types-workflow.md index 9414db97..db33168d 100644 --- a/plugins/kbagent/skills/kbagent/references/storage-types-workflow.md +++ b/plugins/kbagent/skills/kbagent/references/storage-types-workflow.md @@ -302,3 +302,44 @@ Rules: exchanged on return. Real swaps observed at ~10s on Snowflake. - The swap is symmetric; there is no rollback besides swapping again (or aborting the dev branch). + +## BigQuery repartition via `create-table --source-table-id` (since 0.66.0) + +On **BigQuery** you can produce the repartitioned/re-clustered copy in a +single call instead of hand-writing a CTAS transformation: `create-table` +copies an existing table's data into a new partition/clustering layout, +then `swap-tables` flips it into place. + +```bash +# 1. Copy in.c-main.events into a new DAY-partitioned, tenant-clustered table. +# Schema (columns, NOT NULL) is derived from the source -> NO --column. +kbagent storage create-table --project prod --bucket-id in.c-main \ + --name events_repart --source-table-id in.c-main.events \ + --time-partitioning-type DAY --time-partitioning-field created_at \ + --clustering-field tenant_id --primary-key id + +# 2. Inspect the copy, then swap it into the original's place. +kbagent storage table-detail --project prod --table-id in.c-main.events_repart +kbagent storage swap-tables --project prod --table-id in.c-main.events \ + --target-table-id in.c-main.events_repart --branch --yes +``` + +Rules: +- **BigQuery only.** `--source-table-id` and the partition/clustering flags + (`--time-partitioning-*`, `--range-partitioning-*`, `--clustering-field`) + are rejected on a non-BigQuery project by a one-call backend pre-flight + (exit 2) before the create is issued. The connection API also rejects + them server-side (422) as a backstop. +- **`--column` XOR `--source-table-id`.** The two are mutually exclusive; + the schema in source mode is derived from the source, so `--column` / + `--not-null` / `--default` must not be supplied. +- **Partitioning shapes.** Time partitioning needs `--time-partitioning-type` + (DAY/HOUR/MONTH/YEAR); range partitioning needs all of + `--range-partitioning-field/-start/-end/-interval` (bounds are strings). + Time and range partitioning are mutually exclusive. +- **Sources.** Regular tables, linked-bucket tables, and persisted aliases + (materialized views) are valid copy sources. A non-persisted alias is + rejected 422. +- The partition/clustering flags also work on a plain `--column` create + (same BigQuery-only rule) when you want a fresh empty table in a specific + layout rather than a copy. diff --git a/pyproject.toml b/pyproject.toml index 4db6e17b..5117765e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "keboola-cli" -version = "0.65.1" +version = "0.66.0" description = "AI-friendly CLI for managing Keboola projects" readme = "README.md" requires-python = ">=3.12" diff --git a/src/keboola_agent_cli/changelog.py b/src/keboola_agent_cli/changelog.py index 1eb76afd..ce778498 100644 --- a/src/keboola_agent_cli/changelog.py +++ b/src/keboola_agent_cli/changelog.py @@ -24,6 +24,22 @@ # Ordered newest-first. Each value is a list of brief one-line descriptions. CHANGELOG: dict[str, list[str]] = { + "0.66.0": [ + "New: `storage create-table` can copy from an existing table and apply a " + "BigQuery partition/clustering layout. `--source-table-id` (with optional " + "`--source-branch-id`) derives the new table's schema from a source table and " + "copies its rows into the requested layout -- the supported way to repartition " + "a populated BigQuery table, then flip it into place with `storage swap-tables`. " + "`--column` is now optional and mutually exclusive with `--source-table-id`. " + "New layout flags (also usable on a plain columns create): " + "`--time-partitioning-type`/`-field`/`-expiration-ms`, `--range-partitioning-field`/" + "`-start`/`-end`/`-interval`, and `--clustering-field` (repeatable). Time and range " + "partitioning are mutually exclusive. Mirrors keboola/connection#7697.", + "Note: the source-copy and partition/clustering flags are BigQuery-only. " + "`create-table` runs a one-call backend pre-flight (token verify) when any of them " + "is used and fails fast with a clear message on a non-BigQuery project, before " + "issuing the create. A plain columns create is unaffected (no extra call).", + ], "0.65.1": [ "BREAKING: Removed `data-app git-bind-credential` (and its `kbagent serve` endpoint). It shipped in " "0.65.0 on a misdiagnosis: managed-repo deploys were failing and we believed the platform " diff --git a/src/keboola_agent_cli/client.py b/src/keboola_agent_cli/client.py index 23151abc..879fa9b3 100644 --- a/src/keboola_agent_cli/client.py +++ b/src/keboola_agent_cli/client.py @@ -1518,19 +1518,38 @@ def create_table( self, bucket_id: str, name: str, - columns: list[dict[str, Any]], + columns: list[dict[str, Any]] | None = None, primary_key: list[str] | None = None, branch_id: int | None = None, + source: dict[str, Any] | None = None, + time_partitioning: dict[str, Any] | None = None, + range_partitioning: dict[str, Any] | None = None, + clustering: dict[str, Any] | None = None, ) -> dict[str, Any]: """Create a new table with typed columns (async, waits for completion). + Hits the typed ``tables-definition`` endpoint. Exactly one of ``columns`` + or ``source`` is expected (the caller enforces this): with ``columns`` an + empty table is created from the definition; with ``source`` (BigQuery + only) the new table's schema is derived from the source table and its + rows are copied into the requested partition/clustering layout. + Args: bucket_id: Target bucket ID (e.g. "in.c-my-bucket"). name: Table name. columns: List of column dicts with "name" and "definition.type" keys, e.g. [{"name": "id", "definition": {"type": "INTEGER"}}]. + Omitted when ``source`` is set (forbidden together). primary_key: Optional list of column names for the primary key. branch_id: If set, create table in a specific dev branch. + source: Optional ``{"tableId": str, "branchId"?: int}`` to copy the new + table from (BigQuery only). Forbidden together with ``columns``. + time_partitioning: Optional ``{"type": str, "field"?: str, + "expirationMs"?: str}`` (BigQuery). Mutually exclusive with + ``range_partitioning``. + range_partitioning: Optional ``{"field": str, "range": {"start": str, + "end": str, "interval": str}}`` (BigQuery). + clustering: Optional ``{"fields": list[str]}`` (BigQuery). Returns: Completed storage job results dict. @@ -1540,8 +1559,19 @@ def create_table( body: dict[str, Any] = { "name": name, "primaryKeysNames": primary_key or [], - "columns": columns, } + # Send exactly one of columns / source. The Storage API rejects supplying + # both; the service layer enforces this before we get here. + if columns is not None: + body["columns"] = columns + if source is not None: + body["source"] = source + if time_partitioning is not None: + body["timePartitioning"] = time_partitioning + if range_partitioning is not None: + body["rangePartitioning"] = range_partitioning + if clustering is not None: + body["clustering"] = clustering response = self._request("POST", f"{prefix}/buckets/{safe_id}/tables-definition", json=body) job = self._wait_for_storage_job(response.json()) return job.get("results", {}) diff --git a/src/keboola_agent_cli/commands/context.py b/src/keboola_agent_cli/commands/context.py index d9a97c83..0618fb7c 100644 --- a/src/keboola_agent_cli/commands/context.py +++ b/src/keboola_agent_cli/commands/context.py @@ -345,7 +345,7 @@ time. Response includes `legacy_branch_storage: true` and human mode prints a warning when this applies. See storage-types-workflow.md. - kbagent storage create-table --project NAME --bucket-id BUCKET_ID --name TABLE_NAME --column col:TYPE[(length)] [...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--branch ID] [--if-not-exists] + kbagent storage create-table --project NAME --bucket-id BUCKET_ID --name TABLE_NAME [--column col:TYPE[(length)] ...] [--primary-key COL] [--not-null COL ...] [--default NAME=VALUE ...] [--source-table-id ID] [--source-branch-id N] [--time-partitioning-type DAY|HOUR|MONTH|YEAR] [--time-partitioning-field COL] [--time-partitioning-expiration-ms MS] [--range-partitioning-field COL --range-partitioning-start S --range-partitioning-end E --range-partitioning-interval I] [--clustering-field COL ...] [--branch ID] [--if-not-exists] Create a typed table. --column repeatable. - --if-not-exists (since 0.47.0): opt-in idempotency. On a duplicate-display-name failure, probe get-table-detail at the expected id and, if the table really exists, return @@ -359,6 +359,20 @@ The API validates type/length per backend; e.g. INTEGER(10) is rejected with "'10' is not valid length for INTEGER". - --not-null COL marks a column NOT NULL (nullable=false). Must match a defined --column name. - --default NAME=VALUE sets a DEFAULT expression. Booleans must be lowercase (true/false). + - --source-table-id (since 0.66.0, BigQuery only): create the table by COPYING an existing + table's data into the requested partition/clustering layout instead of from --column specs. + The schema is derived from the source, so --column (and --not-null/--default) must NOT be used; + the two are mutually exclusive. --source-branch-id resolves the source in another branch. + This is the supported way to repartition a populated BigQuery table -- then promote it with + `storage swap-tables`. Aliases and linked-bucket tables are valid sources. + - Partition/clustering layout (since 0.66.0, BigQuery only; works in BOTH --column and + --source-table-id mode): --time-partitioning-type (DAY/HOUR/MONTH/YEAR; required when any + --time-partitioning-* is set) + optional --time-partitioning-field/-expiration-ms; OR + --range-partitioning-field/-start/-end/-interval (all four required together; range bounds + are strings). Time and range partitioning are mutually exclusive. --clustering-field repeatable. + - BigQuery pre-flight guard: when any source/partition/clustering flag is used, create-table + verifies the project backend first and fails fast (exit 2) on a non-BigQuery project before + issuing the create. Plain --column creates are unaffected. - In a dev branch, the bucket is auto-materialized if it has not yet been written to in the branch (response includes auto_created_bucket=true). Mirrors the official Keboola Go CLI's EnsureBucketExists. - Auto-materialized buckets get KBC.createdBy.branch.id system metadata stamped on them, @@ -371,6 +385,10 @@ Branch-aware. Examples: --column pk:VARCHAR(40) --column amount:NUMERIC(18,2) --not-null pk --default amount=0 --column ts:TIMESTAMP_TZ --column meta:VARIANT + # BigQuery repartition: copy a populated table into a new layout, then swap it in place + --name events_repart --source-table-id in.c-main.events --time-partitioning-type DAY \\ + --time-partitioning-field created_at --clustering-field tenant_id --primary-key id + then: kbagent storage swap-tables --table-id in.c-main.events --target-table-id in.c-main.events_repart --branch ID kbagent storage upload-table --project NAME --table-id TABLE_ID --file PATH [--incremental] [--delimiter D] [--enclosure E] [--no-auto-create] [--branch ID] Upload CSV into a table. Auto-creates bucket and table if missing (columns inferred as STRING from CSV header). diff --git a/src/keboola_agent_cli/commands/storage.py b/src/keboola_agent_cli/commands/storage.py index 1a9571a5..9596cede 100644 --- a/src/keboola_agent_cli/commands/storage.py +++ b/src/keboola_agent_cli/commands/storage.py @@ -527,14 +527,15 @@ def storage_create_table( "--name", help="Table name", ), - column: list[str] = typer.Option( - ..., + column: list[str] | None = typer.Option( + None, "--column", help=( "Column as 'name:TYPE' or 'name:TYPE(length)'. Repeatable. Base types: " "STRING, INTEGER, NUMERIC, FLOAT, BOOLEAN, DATE, TIMESTAMP. Native types " "are passed through to the Storage API (e.g. 'pk:VARCHAR(40)', " - "'amount:NUMERIC(18,2)', 'ts:TIMESTAMP_TZ', 'meta:VARIANT')." + "'amount:NUMERIC(18,2)', 'ts:TIMESTAMP_TZ', 'meta:VARIANT'). Required " + "unless --source-table-id is given; the two are mutually exclusive." ), ), primary_key: list[str] | None = typer.Option( @@ -555,6 +556,61 @@ def storage_create_table( "lowercase ('true'/'false') per Keboola API validation." ), ), + source_table_id: str | None = typer.Option( + None, + "--source-table-id", + help=( + "Create the table by copying an existing table's data into the " + "requested partition/clustering layout (BigQuery only). The column " + "definition is derived from the source, so --column must not be used. " + "Pair with 'swap-tables' to repartition a populated table in place." + ), + ), + source_branch_id: int | None = typer.Option( + None, + "--source-branch-id", + help="Branch ID the source table is resolved in (defaults to the request branch).", + ), + time_partitioning_type: str | None = typer.Option( + None, + "--time-partitioning-type", + help="BigQuery time partitioning type, e.g. DAY, HOUR, MONTH, YEAR.", + ), + time_partitioning_field: str | None = typer.Option( + None, + "--time-partitioning-field", + help="Column used for time partitioning (defaults to ingestion time if omitted).", + ), + time_partitioning_expiration_ms: str | None = typer.Option( + None, + "--time-partitioning-expiration-ms", + help="Milliseconds to keep storage for a partition (BigQuery).", + ), + range_partitioning_field: str | None = typer.Option( + None, + "--range-partitioning-field", + help="Column used for integer-range partitioning (BigQuery).", + ), + range_partitioning_start: str | None = typer.Option( + None, + "--range-partitioning-start", + help="Start of the range partitioning, inclusive (required with other range flags).", + ), + range_partitioning_end: str | None = typer.Option( + None, + "--range-partitioning-end", + help="End of the range partitioning, exclusive (required with other range flags).", + ), + range_partitioning_interval: str | None = typer.Option( + None, + "--range-partitioning-interval", + help="Width of each range interval (required with other range flags).", + ), + clustering_field: list[str] | None = typer.Option( + None, + "--clustering-field", + help="Column used for clustering (BigQuery). Repeatable.", + ), branch: int | None = typer.Option( None, "--branch", @@ -593,6 +649,14 @@ def storage_create_table( --column ts:TIMESTAMP_TZ --column is_paid:BOOLEAN \\ --primary-key pk --not-null pk --not-null amount \\ --default amount=0 --default is_paid=false + + # BigQuery: repartition a populated table by copying it into a new + # layout, then swap it into place (no --column -- schema derives from + # the source): + kbagent storage create-table --project p --bucket-id in.c-main \\ + --name events_repart --source-table-id in.c-main.events \\ + --time-partitioning-type DAY --time-partitioning-field created_at \\ + --clustering-field tenant_id --primary-key id --branch 123 """ formatter = get_formatter(ctx) service = get_service(ctx, "storage_service") @@ -610,6 +674,16 @@ def storage_create_table( not_null_columns=not_null, defaults=default, if_not_exists=if_not_exists, + source_table_id=source_table_id, + source_branch_id=source_branch_id, + time_partitioning_type=time_partitioning_type, + time_partitioning_field=time_partitioning_field, + time_partitioning_expiration_ms=time_partitioning_expiration_ms, + range_partitioning_field=range_partitioning_field, + range_partitioning_start=range_partitioning_start, + range_partitioning_end=range_partitioning_end, + range_partitioning_interval=range_partitioning_interval, + clustering_fields=clustering_field, ) except ValueError as exc: formatter.error(message=str(exc), error_code=ErrorCode.INVALID_ARGUMENT) @@ -648,9 +722,23 @@ def storage_create_table( f" [yellow]Note:[/yellow] bucket {result['bucket_id']} was " f"auto-materialized in this branch." ) + if result.get("source_table_id"): + formatter.console.print(f" Copied from: {result['source_table_id']}") if result["primary_key"]: formatter.console.print(f" Primary key: {', '.join(result['primary_key'])}") - formatter.console.print(f" Columns: {', '.join(result['columns'])}") + if result.get("columns"): + formatter.console.print(f" Columns: {', '.join(result['columns'])}") + time_partitioning = result.get("time_partitioning") + if time_partitioning: + field = time_partitioning.get("field") + suffix = f" on {field}" if field else " (ingestion time)" + formatter.console.print(f" Time partitioning: {time_partitioning['type']}{suffix}") + range_partitioning = result.get("range_partitioning") + if range_partitioning: + formatter.console.print(f" Range partitioning: {range_partitioning['field']}") + clustering = result.get("clustering") + if clustering: + formatter.console.print(f" Clustering: {', '.join(clustering['fields'])}") if result.get("legacy_branch_storage"): formatter.console.print(_LEGACY_BRANCH_STORAGE_WARNING) diff --git a/src/keboola_agent_cli/services/storage_service.py b/src/keboola_agent_cli/services/storage_service.py index d4ee80d9..e5bb5f38 100644 --- a/src/keboola_agent_cli/services/storage_service.py +++ b/src/keboola_agent_cli/services/storage_service.py @@ -169,6 +169,107 @@ def _parse_default_assignments(defaults: list[str] | None) -> dict[str, str]: return result +def _build_source( + source_table_id: str | None, + source_branch_id: int | None, +) -> dict[str, Any] | None: + """Build the optional ``source`` object for the tables-definition endpoint. + + Returns ``None`` when no source is requested. ``branchId`` is only included + when explicitly given; otherwise the API defaults it to the request branch. + """ + if source_table_id is None: + return None + if not source_table_id.strip(): + raise ValueError("--source-table-id must not be empty.") + source: dict[str, Any] = {"tableId": source_table_id} + if source_branch_id is not None: + source["branchId"] = source_branch_id + return source + + +def _build_bigquery_layout( + time_partitioning_type: str | None, + time_partitioning_field: str | None, + time_partitioning_expiration_ms: str | None, + range_partitioning_field: str | None, + range_partitioning_start: str | None, + range_partitioning_end: str | None, + range_partitioning_interval: str | None, + clustering_fields: list[str] | None, +) -> tuple[dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None]: + """Assemble the BigQuery partition/clustering objects from typed flags. + + Mirrors the shapes the Storage API already expects (connection's + ``BigqueryCreateTableDefinitionRequest``): + + - ``timePartitioning`` ``{"type", "field"?, "expirationMs"?}`` -- ``type`` required. + - ``rangePartitioning`` ``{"field", "range": {"start", "end", "interval"}}`` -- + all four required together; range bounds are **strings** in the API. + - ``clustering`` ``{"fields": [...]}``. + + ``timePartitioning`` and ``rangePartitioning`` are mutually exclusive + (BigQuery allows only one partitioning kind). Returns ``(None, None, None)`` + when nothing is requested. + + Raises: + ValueError: incomplete/conflicting partitioning flags. + """ + has_time = any( + v is not None + for v in (time_partitioning_type, time_partitioning_field, time_partitioning_expiration_ms) + ) + range_parts = ( + range_partitioning_field, + range_partitioning_start, + range_partitioning_end, + range_partitioning_interval, + ) + has_range = any(v is not None for v in range_parts) + + if has_time and has_range: + raise ValueError( + "--time-partitioning-* and --range-partitioning-* are mutually exclusive; " + "BigQuery supports only one partitioning kind per table." + ) + + time_partitioning: dict[str, Any] | None = None + if has_time: + if not time_partitioning_type: + raise ValueError( + "--time-partitioning-type is required when any --time-partitioning-* " + "flag is set (e.g. DAY, HOUR, MONTH, YEAR)." + ) + time_partitioning = {"type": time_partitioning_type} + if time_partitioning_field is not None: + time_partitioning["field"] = time_partitioning_field + if time_partitioning_expiration_ms is not None: + time_partitioning["expirationMs"] = time_partitioning_expiration_ms + + range_partitioning: dict[str, Any] | None = None + if has_range: + if not all(v is not None for v in range_parts): + raise ValueError( + "--range-partitioning requires all of --range-partitioning-field, " + "--range-partitioning-start, --range-partitioning-end and " + "--range-partitioning-interval." + ) + range_partitioning = { + "field": range_partitioning_field, + "range": { + "start": range_partitioning_start, + "end": range_partitioning_end, + "interval": range_partitioning_interval, + }, + } + + clustering: dict[str, Any] | None = None + if clustering_fields: + clustering = {"fields": list(clustering_fields)} + + return time_partitioning, range_partitioning, clustering + + def _ensure_bucket_exists_in_branch( client: Any, bucket_id: str, @@ -731,14 +832,24 @@ def create_table( alias: str, bucket_id: str, name: str, - columns: list[str], + columns: list[str] | None = None, primary_key: list[str] | None = None, branch_id: int | None = None, not_null_columns: list[str] | None = None, defaults: list[str] | None = None, if_not_exists: bool = False, + source_table_id: str | None = None, + source_branch_id: int | None = None, + time_partitioning_type: str | None = None, + time_partitioning_field: str | None = None, + time_partitioning_expiration_ms: str | None = None, + range_partitioning_field: str | None = None, + range_partitioning_start: str | None = None, + range_partitioning_end: str | None = None, + range_partitioning_interval: str | None = None, + clustering_fields: list[str] | None = None, ) -> dict[str, Any]: - """Create a new table with typed columns. + """Create a new table with typed columns, or by copying a source table. Column specs accept base Keboola types (STRING, INTEGER, NUMERIC, FLOAT, BOOLEAN, DATE, TIMESTAMP) *and* native backend types with @@ -747,6 +858,14 @@ def create_table( Storage API derives ``basetype`` automatically and returns precise errors for invalid type/length pairs per backend. + Exactly one of ``columns`` or ``source_table_id`` must be given. With + ``source_table_id`` (BigQuery only) the new table's schema is derived + from the source table and its rows are copied into the requested + partition/clustering layout -- the supported way to repartition a + populated BigQuery table, then flip it into place with + ``swap_tables``. The partition/clustering flags also apply to a normal + ``columns`` create (BigQuery only). + When ``branch_id`` targets a dev branch and the bucket has not been materialized there yet, this method auto-creates it (mirrors the official Go CLI's ``EnsureBucketExists`` pattern). The response @@ -757,15 +876,25 @@ def create_table( bucket_id: Target bucket ID (e.g. ``in.c-my-bucket``). name: Table name. columns: List of column specs -- ``name``, ``name:TYPE``, or - ``name:TYPE(length)``. + ``name:TYPE(length)``. Forbidden together with + ``source_table_id``. primary_key: Optional list of primary-key column names. branch_id: If set, create the table in this dev branch and auto-materialize the bucket when missing. not_null_columns: Column names to mark NOT NULL (``nullable=false`` - in the API definition). + in the API definition). Not valid in source mode. defaults: ``NAME=VALUE`` assignments for DEFAULT expressions (e.g. ``is_admin=false``, ``amount=0``). Boolean defaults - must be lowercase per Keboola API validation. + must be lowercase per Keboola API validation. Not valid in + source mode. + source_table_id: Storage table ID to copy from (BigQuery only). + source_branch_id: Optional branch the source is resolved in. + time_partitioning_type/field/expiration_ms: BigQuery time + partitioning (``type`` required when any is set). + range_partitioning_field/start/end/interval: BigQuery integer-range + partitioning (all four required together). Mutually exclusive + with time partitioning. + clustering_fields: BigQuery clustering columns. Returns: Dict with table details and ``auto_created_bucket`` flag. @@ -778,16 +907,65 @@ def create_table( ``schema_drift`` is ``True`` when the two diverge. Raises: - ValueError: Malformed column spec or ``--default`` assignment, - or ``--not-null`` / ``--default`` references an unknown - column. + ValueError: Malformed column spec or ``--default`` assignment; + ``--not-null`` / ``--default`` references an unknown column; + ``columns`` and ``source`` both/neither given; incomplete or + conflicting partitioning flags; or BigQuery-only features + requested on a non-BigQuery backend. """ not_null_set = set(not_null_columns or []) defaults_map = _parse_default_assignments(defaults) - parsed_columns = [ - _parse_column_spec(col_spec, not_null_set, defaults_map) for col_spec in columns - ] + source = _build_source(source_table_id, source_branch_id) + time_partitioning, range_partitioning, clustering = _build_bigquery_layout( + time_partitioning_type, + time_partitioning_field, + time_partitioning_expiration_ms, + range_partitioning_field, + range_partitioning_start, + range_partitioning_end, + range_partitioning_interval, + clustering_fields, + ) + uses_bigquery_features = ( + source is not None + or time_partitioning is not None + or range_partitioning is not None + or clustering is not None + ) + has_columns = bool(columns) + + # Exactly one of columns / source. The Storage API forbids both and + # requires at least one; fail fast with a clear message. + if source is not None and has_columns: + raise ValueError( + "--column must not be combined with --source-table-id; in source " + "mode the column definition is derived from the source table." + ) + if source is None and not has_columns: + raise ValueError( + "create-table requires either --column (one or more) or " + "--source-table-id (copy from an existing BigQuery table)." + ) + + # not-null / default attach to --column definitions; they have no + # meaning when columns are derived from a source table. + if source is not None and not_null_columns: + raise ValueError( + "--not-null is not valid with --source-table-id (columns are " + "derived from the source table)." + ) + if source is not None and defaults: + raise ValueError( + "--default is not valid with --source-table-id (columns are " + "derived from the source table)." + ) + + parsed_columns = ( + [_parse_column_spec(col_spec, not_null_set, defaults_map) for col_spec in columns] + if columns + else [] + ) # Reject attribute references to columns not actually defined. Without # this check a typo like `--not-null pk --column pkey:VARCHAR(40)` @@ -812,14 +990,31 @@ def create_table( client = self._client_factory(project.stack_url, project.token) target_table_id = f"{bucket_id}.{name}" try: + # BigQuery pre-flight guard: source copy + partition/clustering are + # BigQuery-only. When any is requested, verify the project's backend + # before issuing the create so a non-BigQuery project fails fast with + # a clear message instead of a late driver-side 422. + if uses_bigquery_features: + backend = (client.verify_token().default_backend or "").lower() + if backend != "bigquery": + raise ValueError( + f"Project backend is '{backend or 'unknown'}'; " + "--source-table-id and partition/clustering flags " + "require a BigQuery backend." + ) + auto_created_bucket = _ensure_bucket_exists_in_branch(client, bucket_id, branch_id) try: results = client.create_table( bucket_id=bucket_id, name=name, - columns=parsed_columns, + columns=parsed_columns if columns else None, primary_key=primary_key, branch_id=branch_id, + source=source, + time_partitioning=time_partitioning, + range_partitioning=range_partitioning, + clustering=clustering, ) except KeboolaApiError as exc: # IF-NOT-EXISTS: if the create failed because the table @@ -874,16 +1069,30 @@ def create_table( finally: client.close() + # In columns mode the requested column names are authoritative. In source + # mode the schema is derived from the source, so surface whatever columns + # the completed create job reports (a list of names or column dicts). + if columns: + result_columns = [c["name"] for c in parsed_columns] + else: + raw_columns = results.get("columns") or [] + result_columns = [c["name"] if isinstance(c, dict) else c for c in raw_columns] + return { "project_alias": alias, "table_id": results.get("id", target_table_id), "name": name, "bucket_id": bucket_id, "primary_key": primary_key or [], - "columns": [c["name"] for c in parsed_columns], + "columns": result_columns, "auto_created_bucket": auto_created_bucket, "legacy_branch_storage": legacy_branch_storage, "action": "created", + "source_table_id": source_table_id, + "source_branch_id": source_branch_id, + "time_partitioning": time_partitioning, + "range_partitioning": range_partitioning, + "clustering": clustering, } def upload_table( diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 13e92140..35b6fa94 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -471,6 +471,13 @@ def test_full_cli_e2e(self) -> None: _step(15, "storage load-file", "upload CSV as file then load into table") self._test_load_file(table_id) + _step( + "15.1", + "storage create-table --source-table-id + swap-tables", + "BigQuery repartition workflow (backend-aware)", + ) + self._test_create_table_from_source(bucket_id, table_id) + # ============================================================== # PHASE 4: Config operations (create via API, test via CLI) # ============================================================== @@ -835,6 +842,83 @@ def _test_create_table(self, bucket_id: str) -> str: assert table_id return table_id + def _test_create_table_from_source(self, bucket_id: str, source_table_id: str) -> None: + """create-table --source-table-id (+ swap-tables). BigQuery-only feature. + + On a BigQuery project: copy the populated source into a new table with a + clustering layout, then swap the two. On any other backend: assert the + pre-flight guard rejects the request (exit 2) before issuing the create. + """ + detail = self._run_ok( + "storage", "bucket-detail", "--project", self.alias, "--bucket-id", bucket_id + )["data"] + backend = (detail.get("backend") or detail.get("sql_dialect") or "").lower() + + repart_name = f"{RUN_ID.replace('-', '_')}_repart" + repart_id = f"{bucket_id}.{repart_name}" + + if backend != "bigquery": + # Pre-flight guard: a non-BigQuery backend fails fast with exit 2 and + # never issues the create (no table is left behind). + result = self._run( + "storage", + "create-table", + "--project", + self.alias, + "--bucket-id", + bucket_id, + "--name", + repart_name, + "--source-table-id", + source_table_id, + "--clustering-field", + "id", + ) + assert result.exit_code == 2, ( + f"Expected exit 2 from BigQuery guard, got {result.exit_code}:\n{result.output}" + ) + assert "BigQuery" in result.output + return + + # BigQuery: create the repartitioned copy from the populated source. + created = self._run_ok( + "storage", + "create-table", + "--project", + self.alias, + "--bucket-id", + bucket_id, + "--name", + repart_name, + "--source-table-id", + source_table_id, + "--clustering-field", + "id", + "--primary-key", + "id", + )["data"] + assert created["table_id"] == repart_id + assert created["source_table_id"] == source_table_id + + # Swap the repartitioned copy into the original table's place. Storage + # swap requires a branch; the default branch works. + branch_id = self._run_ok("branch", "list", "--project", self.alias)["data"]["branches"][0][ + "id" + ] + self._run_ok( + "storage", + "swap-tables", + "--project", + self.alias, + "--table-id", + source_table_id, + "--target-table-id", + repart_id, + "--branch", + str(branch_id), + "--yes", + ) + def _test_upload_table(self, table_id: str) -> None: """Upload CSV data to the table.""" csv_path = _create_test_csv(self.data_dir, rows=5) diff --git a/tests/test_storage_create_table.py b/tests/test_storage_create_table.py new file mode 100644 index 00000000..3f665ab3 --- /dev/null +++ b/tests/test_storage_create_table.py @@ -0,0 +1,422 @@ +"""Tests for storage create-table: source copy + BigQuery partition/clustering. + +Covers the create-table-from-source / repartition capability added to mirror +keboola/connection#7697: +- Client: conditional request body (source vs columns, partition/clustering). +- Service: columns/source XOR validation, partition-flag validation, the + BigQuery pre-flight backend guard, and the result envelope. +- CLI: new flags reach the service; --column is no longer required. +""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from keboola_agent_cli.cli import app +from keboola_agent_cli.client import KeboolaClient +from keboola_agent_cli.config_store import ConfigStore +from keboola_agent_cli.models import AppConfig, ProjectConfig +from keboola_agent_cli.services.storage_service import StorageService + +runner = CliRunner() + +TEST_TOKEN = "901-55555-fakeTestTokenDoNotUseXXXXXXXX" + + +def _make_store(tmp_path: Path) -> ConfigStore: + config_dir = tmp_path / "config" + config_dir.mkdir(exist_ok=True) + store = ConfigStore(config_dir=config_dir) + config = AppConfig( + projects={ + "test": ProjectConfig( + stack_url="https://connection.keboola.com", + token=TEST_TOKEN, + ), + }, + ) + store.save(config) + return store + + +def _make_service(store: ConfigStore, mock_client: MagicMock) -> StorageService: + return StorageService( + config_store=store, + client_factory=lambda url, token: mock_client, + ) + + +def _bigquery_client(create_result: dict | None = None) -> MagicMock: + """MagicMock KeboolaClient that reports a BigQuery backend. + + branch_id is None in these tests, so the bucket-exists / legacy-branch + helpers short-circuit and only verify_token + create_table are exercised. + """ + client = MagicMock() + client.verify_token.return_value = MagicMock(default_backend="bigquery") + client.create_table.return_value = create_result or {"id": "in.c-main.events_repart"} + return client + + +# --------------------------------------------------------------------------- +# Client layer +# --------------------------------------------------------------------------- + + +class TestCreateTableClientBody: + """KeboolaClient.create_table() request body shaping.""" + + def test_source_mode_omits_columns(self, httpx_mock) -> None: + """Source mode sends `source` and NO `columns` key.""" + httpx_mock.add_response( + url="https://connection.keboola.com/v2/storage/buckets/in.c-main/tables-definition", + method="POST", + json={"id": 1, "status": "success", "results": {"id": "in.c-main.events_repart"}}, + status_code=200, + ) + client = KeboolaClient(stack_url="https://connection.keboola.com", token=TEST_TOKEN) + client.create_table( + bucket_id="in.c-main", + name="events_repart", + source={"tableId": "in.c-main.events", "branchId": 123}, + primary_key=["id"], + time_partitioning={"type": "DAY", "field": "created_at"}, + clustering={"fields": ["tenant_id"]}, + ) + + body = json.loads(httpx_mock.get_request().content.decode("utf-8")) + assert body["name"] == "events_repart" + assert body["primaryKeysNames"] == ["id"] + assert body["source"] == {"tableId": "in.c-main.events", "branchId": 123} + assert "columns" not in body + assert body["timePartitioning"] == {"type": "DAY", "field": "created_at"} + assert body["clustering"] == {"fields": ["tenant_id"]} + client.close() + + def test_range_partitioning_body_uses_strings(self, httpx_mock) -> None: + """rangePartitioning bounds are sent as strings, matching the API.""" + httpx_mock.add_response( + url="https://connection.keboola.com/v2/storage/buckets/in.c-main/tables-definition", + method="POST", + json={"id": 1, "status": "success", "results": {"id": "in.c-main.t"}}, + status_code=200, + ) + client = KeboolaClient(stack_url="https://connection.keboola.com", token=TEST_TOKEN) + client.create_table( + bucket_id="in.c-main", + name="t", + source={"tableId": "in.c-main.src"}, + range_partitioning={ + "field": "id", + "range": {"start": "0", "end": "1000000", "interval": "1000"}, + }, + ) + + body = json.loads(httpx_mock.get_request().content.decode("utf-8")) + assert body["rangePartitioning"] == { + "field": "id", + "range": {"start": "0", "end": "1000000", "interval": "1000"}, + } + assert body["source"] == {"tableId": "in.c-main.src"} + client.close() + + def test_columns_mode_unchanged(self, httpx_mock) -> None: + """Plain columns create still sends `columns` and no `source`/layout.""" + httpx_mock.add_response( + url="https://connection.keboola.com/v2/storage/buckets/in.c-main/tables-definition", + method="POST", + json={"id": 1, "status": "success", "results": {"id": "in.c-main.t"}}, + status_code=200, + ) + client = KeboolaClient(stack_url="https://connection.keboola.com", token=TEST_TOKEN) + client.create_table( + bucket_id="in.c-main", + name="t", + columns=[{"name": "id", "definition": {"type": "INTEGER"}}], + primary_key=["id"], + ) + + body = json.loads(httpx_mock.get_request().content.decode("utf-8")) + assert body["columns"] == [{"name": "id", "definition": {"type": "INTEGER"}}] + assert "source" not in body + assert "timePartitioning" not in body + assert "clustering" not in body + client.close() + + +# --------------------------------------------------------------------------- +# Service layer -- validation +# --------------------------------------------------------------------------- + + +class TestCreateTableServiceValidation: + """Argument validation happens before any HTTP call.""" + + def test_columns_and_source_both_rejected(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="must not be combined"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + columns=["id:INTEGER"], + source_table_id="in.c-main.src", + ) + + def test_neither_columns_nor_source_rejected(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="either --column"): + service.create_table(alias="test", bucket_id="in.c-main", name="t") + + def test_not_null_rejected_in_source_mode(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="--not-null is not valid"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + not_null_columns=["id"], + ) + + def test_default_rejected_in_source_mode(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="--default is not valid"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + defaults=["id=0"], + ) + + def test_incomplete_range_partitioning_rejected(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="--range-partitioning requires all"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + range_partitioning_field="id", + range_partitioning_start="0", + # end + interval missing + ) + + def test_time_and_range_partitioning_mutually_exclusive(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="mutually exclusive"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + time_partitioning_type="DAY", + range_partitioning_field="id", + range_partitioning_start="0", + range_partitioning_end="100", + range_partitioning_interval="10", + ) + + def test_time_partitioning_field_without_type_rejected(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="--time-partitioning-type is required"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + time_partitioning_field="created_at", + ) + + +# --------------------------------------------------------------------------- +# Service layer -- BigQuery pre-flight guard +# --------------------------------------------------------------------------- + + +class TestCreateTableBackendGuard: + """The BigQuery-only guard fires before the create POST.""" + + def test_source_on_snowflake_rejected_before_post(self, tmp_path: Path) -> None: + client = MagicMock() + client.verify_token.return_value = MagicMock(default_backend="snowflake") + service = _make_service(_make_store(tmp_path), client) + + with pytest.raises(ValueError, match="require a BigQuery backend"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + source_table_id="in.c-main.src", + ) + client.create_table.assert_not_called() + + def test_partitioning_on_snowflake_rejected(self, tmp_path: Path) -> None: + client = MagicMock() + client.verify_token.return_value = MagicMock(default_backend="snowflake") + service = _make_service(_make_store(tmp_path), client) + + with pytest.raises(ValueError, match="require a BigQuery backend"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + columns=["id:INTEGER"], + clustering_fields=["id"], + ) + client.create_table.assert_not_called() + + def test_source_on_bigquery_passes(self, tmp_path: Path) -> None: + client = _bigquery_client( + {"id": "in.c-main.events_repart", "columns": ["id", "created_at"]} + ) + service = _make_service(_make_store(tmp_path), client) + + result = service.create_table( + alias="test", + bucket_id="in.c-main", + name="events_repart", + source_table_id="in.c-main.events", + time_partitioning_type="DAY", + time_partitioning_field="created_at", + clustering_fields=["tenant_id"], + primary_key=["id"], + ) + + client.create_table.assert_called_once() + kwargs = client.create_table.call_args.kwargs + assert kwargs["columns"] is None + assert kwargs["source"] == {"tableId": "in.c-main.events"} + assert kwargs["time_partitioning"] == {"type": "DAY", "field": "created_at"} + assert kwargs["clustering"] == {"fields": ["tenant_id"]} + # Columns are derived from the completed job in source mode. + assert result["columns"] == ["id", "created_at"] + assert result["source_table_id"] == "in.c-main.events" + assert result["action"] == "created" + + def test_plain_columns_create_skips_backend_check(self, tmp_path: Path) -> None: + """No BigQuery-only feature => verify_token is never called (no regression).""" + client = MagicMock() + client.create_table.return_value = {"id": "in.c-main.t"} + service = _make_service(_make_store(tmp_path), client) + + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + columns=["id:INTEGER"], + ) + client.verify_token.assert_not_called() + + +# --------------------------------------------------------------------------- +# CLI layer +# --------------------------------------------------------------------------- + + +class TestCreateTableCli: + def test_source_flags_passed_through(self, tmp_path: Path) -> None: + store = _make_store(tmp_path) + with ( + patch("keboola_agent_cli.cli.ConfigStore") as MockStore, + patch("keboola_agent_cli.cli.StorageService") as MockSvc, + ): + MockStore.return_value = store + svc = MockSvc.return_value + svc.create_table.return_value = { + "project_alias": "test", + "table_id": "in.c-main.events_repart", + "name": "events_repart", + "bucket_id": "in.c-main", + "primary_key": ["id"], + "columns": ["id", "created_at"], + "auto_created_bucket": False, + "legacy_branch_storage": False, + "action": "created", + "source_table_id": "in.c-main.events", + "source_branch_id": None, + "time_partitioning": {"type": "DAY", "field": "created_at"}, + "range_partitioning": None, + "clustering": {"fields": ["tenant_id"]}, + } + result = runner.invoke( + app, + [ + "--json", + "storage", + "create-table", + "--project", + "test", + "--bucket-id", + "in.c-main", + "--name", + "events_repart", + "--source-table-id", + "in.c-main.events", + "--time-partitioning-type", + "DAY", + "--time-partitioning-field", + "created_at", + "--clustering-field", + "tenant_id", + "--primary-key", + "id", + ], + ) + + assert result.exit_code == 0, result.output + kwargs = svc.create_table.call_args.kwargs + assert kwargs["source_table_id"] == "in.c-main.events" + assert kwargs["time_partitioning_type"] == "DAY" + assert kwargs["time_partitioning_field"] == "created_at" + assert kwargs["clustering_fields"] == ["tenant_id"] + # --column is optional now; nothing was passed. + assert not kwargs["columns"] + + def test_column_no_longer_required(self, tmp_path: Path) -> None: + """Invoking with only --source-table-id (no --column) must not be a usage error.""" + store = _make_store(tmp_path) + with ( + patch("keboola_agent_cli.cli.ConfigStore") as MockStore, + patch("keboola_agent_cli.cli.StorageService") as MockSvc, + ): + MockStore.return_value = store + svc = MockSvc.return_value + svc.create_table.return_value = { + "project_alias": "test", + "table_id": "in.c-main.t", + "name": "t", + "bucket_id": "in.c-main", + "primary_key": [], + "columns": ["id"], + "auto_created_bucket": False, + "legacy_branch_storage": False, + "action": "created", + "source_table_id": "in.c-main.src", + "source_branch_id": None, + "time_partitioning": None, + "range_partitioning": None, + "clustering": None, + } + result = runner.invoke( + app, + [ + "storage", + "create-table", + "--project", + "test", + "--bucket-id", + "in.c-main", + "--name", + "t", + "--source-table-id", + "in.c-main.src", + ], + ) + + assert result.exit_code == 0, result.output + svc.create_table.assert_called_once() diff --git a/tests/test_storage_write.py b/tests/test_storage_write.py index 106cb8ed..9808fa28 100644 --- a/tests/test_storage_write.py +++ b/tests/test_storage_write.py @@ -216,6 +216,10 @@ def test_success(self, tmp_path: Path) -> None: ], primary_key=["id"], branch_id=None, + source=None, + time_partitioning=None, + range_partitioning=None, + clustering=None, ) mock_client.close.assert_called_once() @@ -258,6 +262,10 @@ def test_no_primary_key(self, tmp_path: Path) -> None: columns=[{"name": "x", "definition": {"type": "STRING"}}], primary_key=None, branch_id=None, + source=None, + time_partitioning=None, + range_partitioning=None, + clustering=None, ) def test_unknown_type_is_passed_through_to_api(self, tmp_path: Path) -> None: @@ -1401,6 +1409,16 @@ def test_create_table_json(self, tmp_path: Path) -> None: not_null_columns=None, defaults=None, if_not_exists=False, + source_table_id=None, + source_branch_id=None, + time_partitioning_type=None, + time_partitioning_field=None, + time_partitioning_expiration_ms=None, + range_partitioning_field=None, + range_partitioning_start=None, + range_partitioning_end=None, + range_partitioning_interval=None, + clustering_fields=None, ) def test_create_table_native_types_and_attributes(self, tmp_path: Path) -> None: @@ -1854,6 +1872,10 @@ def test_service_passes_branch_id(self, tmp_path: Path) -> None: columns=[{"name": "id", "definition": {"type": "INTEGER"}}], primary_key=None, branch_id=77, + source=None, + time_partitioning=None, + range_partitioning=None, + clustering=None, ) # In a branch we check bucket existence first (auto-materialize). mock_client.get_bucket_detail.assert_called_once_with("in.c-b", branch_id=77) diff --git a/uv.lock b/uv.lock index e5ec232b..c576a31c 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", @@ -590,7 +590,7 @@ wheels = [ [[package]] name = "keboola-cli" -version = "0.65.1" +version = "0.66.0" source = { editable = "." } dependencies = [ { name = "croniter" }, From 920b5bd8251dd2c7979ec03b847009a92431a75e Mon Sep 17 00:00:00 2001 From: yustme Date: Mon, 29 Jun 2026 19:43:47 +0200 Subject: [PATCH 2/2] fix(storage): address CR on create-table from source - Reject --source-branch-id without --source-table-id (was silently dropped) - Keep skipped if-not-exists envelope schema-consistent with created path - Show range partitioning bounds in human output - Restore uv.lock revision 3 (unrelated downgrade) --- src/keboola_agent_cli/commands/storage.py | 11 +++++++++- .../services/storage_service.py | 20 ++++++++++++++++--- tests/test_storage_create_table.py | 11 ++++++++++ tests/test_storage_write.py | 7 +++++++ uv.lock | 2 +- 5 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/keboola_agent_cli/commands/storage.py b/src/keboola_agent_cli/commands/storage.py index 9596cede..2e4f1e30 100644 --- a/src/keboola_agent_cli/commands/storage.py +++ b/src/keboola_agent_cli/commands/storage.py @@ -735,7 +735,16 @@ def storage_create_table( formatter.console.print(f" Time partitioning: {time_partitioning['type']}{suffix}") range_partitioning = result.get("range_partitioning") if range_partitioning: - formatter.console.print(f" Range partitioning: {range_partitioning['field']}") + bounds = range_partitioning.get("range") or {} + bounds_suffix = "" + if bounds: + bounds_suffix = ( + f" [{bounds.get('start')}, {bounds.get('end')})" + f" step {bounds.get('interval')}" + ) + formatter.console.print( + f" Range partitioning: {range_partitioning['field']}{bounds_suffix}" + ) clustering = result.get("clustering") if clustering: formatter.console.print(f" Clustering: {', '.join(clustering['fields'])}") diff --git a/src/keboola_agent_cli/services/storage_service.py b/src/keboola_agent_cli/services/storage_service.py index e5bb5f38..aa8a6357 100644 --- a/src/keboola_agent_cli/services/storage_service.py +++ b/src/keboola_agent_cli/services/storage_service.py @@ -909,13 +909,19 @@ def create_table( Raises: ValueError: Malformed column spec or ``--default`` assignment; ``--not-null`` / ``--default`` references an unknown column; - ``columns`` and ``source`` both/neither given; incomplete or - conflicting partitioning flags; or BigQuery-only features - requested on a non-BigQuery backend. + ``columns`` and ``source`` both/neither given; + ``source_branch_id`` given without ``source_table_id``; + incomplete or conflicting partitioning flags; or BigQuery-only + features requested on a non-BigQuery backend. """ not_null_set = set(not_null_columns or []) defaults_map = _parse_default_assignments(defaults) + # --source-branch-id only qualifies a source table; on its own it would + # be silently dropped (source mode never activates). Fail fast instead. + if source_branch_id is not None and source_table_id is None: + raise ValueError("--source-branch-id requires --source-table-id.") + source = _build_source(source_table_id, source_branch_id) time_partitioning, range_partitioning, clustering = _build_bigquery_layout( time_partitioning_type, @@ -1063,6 +1069,14 @@ def create_table( ), "action": "skipped", "skip_reason": "table already exists", + # Keep the JSON envelope shape identical to the + # "created" path; the existing table's layout is not + # re-derived here, so the source/layout keys are null. + "source_table_id": None, + "source_branch_id": None, + "time_partitioning": None, + "range_partitioning": None, + "clustering": None, } raise legacy_branch_storage = _detect_legacy_branch_storage(client, branch_id) diff --git a/tests/test_storage_create_table.py b/tests/test_storage_create_table.py index 3f665ab3..4dfdf530 100644 --- a/tests/test_storage_create_table.py +++ b/tests/test_storage_create_table.py @@ -193,6 +193,17 @@ def test_default_rejected_in_source_mode(self, tmp_path: Path) -> None: defaults=["id=0"], ) + def test_source_branch_id_without_source_table_rejected(self, tmp_path: Path) -> None: + service = _make_service(_make_store(tmp_path), MagicMock()) + with pytest.raises(ValueError, match="--source-branch-id requires --source-table-id"): + service.create_table( + alias="test", + bucket_id="in.c-main", + name="t", + columns=["id:INTEGER"], + source_branch_id=42, + ) + def test_incomplete_range_partitioning_rejected(self, tmp_path: Path) -> None: service = _make_service(_make_store(tmp_path), MagicMock()) with pytest.raises(ValueError, match="--range-partitioning requires all"): diff --git a/tests/test_storage_write.py b/tests/test_storage_write.py index 9808fa28..d3000473 100644 --- a/tests/test_storage_write.py +++ b/tests/test_storage_write.py @@ -1092,6 +1092,13 @@ def test_skip_on_existing_when_flag_set(self, tmp_path: Path) -> None: assert result["action"] == "skipped" assert result["skip_reason"] == "table already exists" assert result["table_id"] == "in.c-b.users" + # The skipped envelope carries the same source/layout keys as the + # "created" path (null here) so the JSON schema stays consistent. + assert result["source_table_id"] is None + assert result["source_branch_id"] is None + assert result["time_partitioning"] is None + assert result["range_partitioning"] is None + assert result["clustering"] is None mock_client.get_table_detail.assert_called_once_with("in.c-b.users", branch_id=None) mock_client.close.assert_called_once() diff --git a/uv.lock b/uv.lock index c576a31c..9e5ad008 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'",