kbagent is three things at once, and you can program against any of them:
| Surface | Import / entrypoint | Use it when | Reference |
|---|---|---|---|
| CLI | kbagent ... (subprocess) |
shell scripts, AI agents, CI steps, anything that shells out | README, kbagent --help |
| In-process SDK | from keboola_agent_cli import Client |
a Python process that already has the token and wants Keboola data without spawning a subprocess or a daemon | this document |
| HTTP REST | kbagent serve + any HTTP client |
a separate process / language (JS, Go), a Web UI, a Slack bot | build-your-own-client.md |
This guide is the deep reference for the middle row: the importable, stateless Client facade introduced in issue #415 and given typed return contracts in #428.
Working demo:
examples/storage_tui/is a runnable terminal app (curses) that browses a real project's Storage entirely through this SDK. Read it alongside this doc.
import os
from keboola_agent_cli import Client
with Client(url=os.environ["KBC_URL"], token=os.environ["KBC_TOKEN"]) as kbc:
rows = kbc.query(workspace_id, "SELECT id, name FROM customers") # list[dict]
meta = kbc.files.upload(b"hello", name="greeting.txt", tags=["demo"])
data = kbc.files.read_bytes(meta.id) # bytes
job = kbc.run_job("keboola.ex-db-snowflake", "12345", wait=True) # JobResultIt is:
- Stateless. A
Clientholds the stack URL, the token, one pooled HTTP client (with the shared retry/backoff), and an optional idempotency store. Nothing else. - Config-dir-free. No
~/.config/keboola-agent-cli/, nokbagent project add, noconfig.json. Auth is the token you pass in (12-factor: you readKBC_TOKENyourself). - In-process. No CLI subprocess, no
kbagent servedaemon. Just function calls. - Typed.
py.typedships in the wheel; the high-traffic operations return pydantic models (JobResult,QueryResult, ...). Yourmypy/ty/IDE sees the shapes.
It is not:
- Not the whole CLI. The facade exposes the high-value operations (Query Service, Storage Files, run-job, config detail, table upload). For everything else there is
Client.raw. - Not config-aware. It does not resolve project aliases, default branches from a config file, or linked variable values. You pass IDs explicitly.
- Not auto-creating.
upload_tablerequires the table to already exist (the CLI's auto-create-bucket-and-table path lives in the service layer, which the facade deliberately does not carry).
kbagent is a strict 3-layer codebase (see CONTRIBUTING.md):
CLI command ─┐
REST route ─┼─► Service (services/*.py) ─► KeboolaClient (client.py) ─► HTTP
SDK facade ─┘ business logic endpoints, retry
(lib.py)
The SDK facade (lib.py) is a fourth caller that sits beside the CLI and REST layers, but it skips the service layer and talks to KeboolaClient directly, re-assembling the high-level shapes (list[dict] rows, bytes, FileEntry, the typed models) that the service layer would otherwise build for the CLI.
Consequence for contributors: a facade method is intentionally thinner than the matching service method. Client.run_job does not auto-resolve linked variable values the way JobService.run_job does; Client.upload_table does not auto-create the bucket/table the way StorageService.upload_table does. When you add or change a facade method, decide consciously which service conveniences to replicate and which to omit — and document the omission in the docstring (the existing methods do this).
import os
from keboola_agent_cli import Client
# 12-factor: the token is your responsibility to source. Never hardcode it.
kbc = Client(url=os.environ["KBC_URL"], token=os.environ["KBC_TOKEN"])
try:
for bucket in kbc.raw.list_buckets():
print(bucket["id"], bucket.get("description", ""))
finally:
kbc.close()
# ...or as a context manager (preferred — closes the HTTP client for you):
with Client(url=os.environ["KBC_URL"], token=os.environ["KBC_TOKEN"]) as kbc:
...Client(...) raises ValueError immediately if url or token is empty — fail-fast, no silent default.
To scope every call to a dev branch, pass branch_id:
with Client(url=URL, token=TOKEN, branch_id=1234) as kbc:
rows = kbc.query(ws_id, "SELECT 1") # runs on branch 1234branch_id=None (the default) targets production: Storage Files use the production scope and query() resolves the project's default branch on first use.
Every method below is committed public API; signatures change only under semver.
Runs SQL in a workspace via the Query Service inline-results fast path (no CSV materialization) and returns rows as dicts keyed by column name.
rows = kbc.query(workspace_id, "SELECT id, name FROM customers LIMIT 10")
# [{"ID": "1", "NAME": "Acme"}, ...]Three gotchas baked into the contract:
- Snowflake string-typing. Values are returned exactly as the Query Service serializes them and are not coerced by the facade. For Snowflake every scalar comes back as a JSON string (
1→"1",1.5→"1.5",true→"true"), with SQLNULLasNone. Cast on your side. - Column-name case. Snowflake folds unquoted aliases to UPPERCASE —
SELECT id AS fookeys the dict as"FOO". Quote the alias (AS "foo") for lowercase. - Multi-statement. With several statements, the rows of the last statement that produced a result set win (
USE ...; SELECT ...yields the SELECT). Statements without a result set yield[].
Over-limit results are capped and a warning is logged — raise limit to fetch more.
Same execution as query, but returns the full typed QueryResult — columns (in warehouse order), rows, truncated, total_rows, and a .row_count property. Use it when you need column ordering or want to detect a limit cap:
res = kbc.query_result(ws_id, "SELECT * FROM big_table", limit=500)
if res.truncated:
print(f"showing {res.row_count} of {res.total_rows} rows")
print(res.columns) # ["ID", "NAME", "CREATED_AT"]Creates a Queue API job and — with wait=True — polls until terminal (or timeout). Returns a typed JobResult with .succeeded / .failed convenience properties.
job = kbc.run_job("keboola.ex-db-snowflake", "12345", wait=True, timeout=600)
if job.failed:
raise RuntimeError(f"job {job.id} ended {job.status}")Full signature knobs: config_row_ids, variable_values_id (the facade does not auto-resolve linked variable values — pass it explicitly if the config needs one), branch_id, mode (run/debug/forceRun), poll_strategy, and the idempotency pair below.
Idempotency (#427). The Queue API has no server-side idempotency, so the facade does it client-side against a store you supply (it has no config-dir to default to):
from keboola_agent_cli import JobIdempotencyStore
store = JobIdempotencyStore(path="/my/app/checkpoints/jobs.json")
with Client(url=URL, token=TOKEN, idempotency_store=store) as kbc:
job = kbc.run_job("keboola.ex-db-snowflake", "12345",
wait=True, idempotency_key="nightly-2026-06-17")
if job.idempotent_replay:
print("a prior run for this key was returned — no new side effect")A prior still-running or non-failed job is returned (with idempotent_replay=True) instead of firing a duplicate; a prior failed run is re-run; force_rerun=True always creates fresh. Passing idempotency_key without a store raises ValueError.
Fetches one configuration's detail as a typed ConfigDetailResult (id, name, description, version, configuration, rows, ...).
cfg = kbc.config_detail("keboola.ex-db-snowflake", "12345")
print(cfg.name, cfg.version)
host = cfg.configuration["parameters"]["db"]["host"]Imports a CSV into an existing Storage table. Unlike the CLI, it does not auto-create a missing bucket/table — use kbagent storage upload-table for that.
res = kbc.upload_table("in.c-crm.customers", "customers.csv", incremental=True)
print(res.imported_rows, res.warnings)A Files helper bound to the client's project/branch. See §6.
Escape hatch for endpoints the facade omits. See §7.
close() closes the pooled HTTP client. Prefer with Client(...) as kbc: so it always runs.
result_models.py defines the stable return shapes (JobResult, QueryResult, UploadTableResult, ConfigDetailResult, SyncPushResult, CloneResult), all re-exported from the package root. They exist so a downstream consumer types against a semver-versioned contract instead of an undocumented dict[str, Any] — a contract change then surfaces at type-check time, not at runtime against a customer build.
Two design rules every model follows (_ApiResultModel base):
-
extra="allow"— forward-compatible. The backing Keboola APIs grow fields across stack versions. An SDK contract must never raise because the server returned more than the documented subset. Only the named fields are the committed surface; everything else is preserved as model extras (reachable via attribute access andmodel_dump()), so nothing is lost.job = kbc.run_job("keboola.ex-db-snowflake", "12345", wait=True) job.status # named field — committed contract job.model_extra # {"branchId": ..., "durationSeconds": ..., "runId": ...} — preserved, not typed
-
populate_by_name=True— alias-tolerant. Each model accepts both the snake_case field name and the raw API key (declared viaAliasChoices), soModel.model_validate(service_dict)works directly on a service-layer dict without renaming.JobResultreadsisFinished/is_finished,componentId/component/component_id, etc.
Convenience properties carry semantic meaning so callers don't re-derive it: JobResult.succeeded / .failed, QueryResult.row_count, SyncPushResult.ok, CloneResult.ok.
The committed surface is
__all__. Anything exported fromkeboola_agent_cli(Client,Files,FileEntry, the six result models,JobIdempotencyStore,__version__) is public API under semver. Renaming or removing a named field, or tightening a type, is a breaking change.
Client.files is a Files instance bound to the project (and branch). It returns a uniform FileEntry regardless of whether the API response happened to include a signed URL.
# upload bytes (name required) or a path (name derived from basename)
meta = kbc.files.upload(b"col1,col2\n1,2\n", name="data.csv", tags=["nightly"])
meta = kbc.files.upload("report.pdf", tags=["report"], permanent=True)
# list as FileEntry records (tags AND-filter, name full-text query, pagination)
entries = kbc.files.list(tags=["nightly"], limit=50)
for e in entries:
print(e.id, e.name, e.size_bytes, e.is_permanent)
# read the whole file into memory (handles sliced + gzip transparently)
raw_bytes = kbc.files.read_bytes(meta.id)
kbc.files.delete(meta.id)FileEntry fields: id, name, tags, created, size_bytes, is_permanent, and raw (the untouched API dict for anything the facade doesn't surface).
Memory note: read_bytes holds the whole payload in RAM — fine for results, manifests, and small exports. For multi-GB tables, stream to disk via Client.raw instead.
The facade is intentionally small. For any endpoint it omits — listing buckets/tables, table previews, workspaces, sharing, dev branches — reach the underlying KeboolaClient:
with Client(url=URL, token=TOKEN) as kbc:
buckets = kbc.raw.list_buckets() # list[dict]
tables = kbc.raw.list_tables(bucket_id="in.c-crm") # list[dict]
detail = kbc.raw.get_table_detail("in.c-crm.customers") # dict
csv = kbc.raw.get_table_data_preview("in.c-crm.customers", limit=20) # CSV strraw returns dict/list[dict]/str straight from the API — not the typed models. It is the lower-level, less-stable surface: useful, but you own the parsing. The storage TUI demo is built almost entirely on these four raw methods.
already in a Python separate process / a shell / an
process with the token? language? AI agent?
───────────────────────────────────────────────────────────────────────────────
in-process SDK ✅ best
kbagent serve (REST) ✅ best
kbagent CLI ✅ best
- SDK when you are already a Python process holding the token — a Keboola Data App, a transformation, a hosted FastAPI service, a one-off automation script. Lowest latency (no subprocess, no daemon), typed, 12-factor auth.
- REST (
kbagent serve) when the caller is a different process or language — a React/Next.js UI, a Go service, a Slack bot. One server, many clients, auth via a bearer token. Spec: build-your-own-client.md. - CLI when you are in a shell or an AI agent that shells out — scripted glue, CI steps,
claude/codex/geminitasks. Structured--jsonon every command.
They share the same KeboolaClient underneath, so behavior (retry, error shapes) is identical across all three.
The importable surface is a committed contract, so changing it is a deliberate act with its own checklist — parallel to "Adding a New CLI Command" but smaller. Add a facade method (or a result model) when an in-process consumer would otherwise have to drop to Client.raw and hand-assemble a high-value shape.
Adding a Client method:
- Write it in
lib.py, callingself._client(theKeboolaClient) directly — do not import the service layer (that would drag config-dir/orchestration assumptions into a stateless facade). Re-assemble the high-level shape yourself, the way the existing methods do. - Decide which service conveniences to omit (auto-create, alias resolution, linked-variable resolution) and state the omission in the docstring — the existing methods set this precedent (
run_job,upload_table). - Return a typed model, not a bare dict, for anything non-trivial (see below).
- Keyword-only for everything past the required positionals (
*,), matching the existing signatures — it keeps call sites readable and lets you add knobs without breaking positional callers.
Adding a result model (result_models.py):
- Subclass
_ApiResultModel(gives youextra="allow"+populate_by_name). Never setextra="forbid"— the API grows fields and the contract must not raise. - Type only the stable subset you are willing to commit to under semver. Everything else stays a model extra.
- Use
Field(validation_alias=AliasChoices("rawApiKey", "snake_case_name"))soModel.model_validate(api_or_service_dict)works without renaming. - Add semantic
@propertyhelpers (.succeeded,.ok) rather than making callers re-derive state.
Then, the contract chores:
- Export it from
__init__.pyand add it to__all__— that list is the public API. make typecheckmust stay clean — the SDK is the one place where types are a user-facing promise, not just an internal aid.- Test it in
tests/(facade tests mockKeboolaClient; model tests assert alias-acceptance andextrapreservation). - Document it here in §4 / §5, and update the one-liner in the README "Use as a library" section if it's a headline capability.
- Treat removals/renames as breaking — a field rename or a type tightening is a semver-major-worthy change; prefer adding over mutating.
| Gotcha | Why | What to do |
|---|---|---|
| Snowflake values are strings | Query Service serializes scalars as JSON strings; the facade does not coerce | cast on your side (int(row["N"])) |
| Column keys are UPPERCASE | Snowflake folds unquoted aliases | quote the alias for lowercase |
query returns only the last result set |
multi-statement SQL | split, or put the SELECT last |
run_job ignores linked variable values |
the facade is not config-aware | pass variable_values_id explicitly |
upload_table won't create the table |
no service/config context | create it first, or use the CLI |
idempotency_key raises without a store |
stateless facade has no config-dir | pass idempotency_store= |
read_bytes loads the whole file into RAM |
convenience over streaming | stream via Client.raw for huge tables |
branch_id=None means production |
default scope | pass a branch_id to target a dev branch |
examples/storage_tui/— runnable curses demo built on this SDK.- build-your-own-client.md — the REST surface (
kbagent serve) for non-Python callers. - CONTRIBUTING.md — full 3-layer architecture and coding conventions.