From 7097061a3a16cdaa3b5d7971ff8dc53c41e30772 Mon Sep 17 00:00:00 2001 From: Sergio Marcelino Date: Wed, 27 May 2026 17:13:13 -0300 Subject: [PATCH 1/3] feat: move database to configuration --- database/README.md | 66 ++++- database/config.yaml | 7 - database/config.yaml.example | 28 +- database/skills/SKILL.md | 117 ++++++++ database/skills/iii-database/execute.md | 136 ---------- .../iii-database/interactive-transaction.md | 240 ----------------- .../iii-database/prepared-statements.md | 168 ------------ database/skills/iii-database/query.md | 126 --------- database/skills/iii-database/transaction.md | 165 ------------ database/skills/index.md | 36 --- database/src/config.rs | 253 ++++++++++++++++-- database/src/configuration.rs | 176 ++++++++++++ database/src/handlers/begin_transaction.rs | 5 +- database/src/handlers/commit_transaction.rs | 2 +- database/src/handlers/execute.rs | 7 +- database/src/handlers/mod.rs | 8 +- database/src/handlers/prepare.rs | 5 +- database/src/handlers/query.rs | 9 +- database/src/handlers/rollback_transaction.rs | 2 +- database/src/handlers/run_statement.rs | 5 +- database/src/handlers/transaction.rs | 7 +- database/src/handlers/transaction_execute.rs | 2 +- database/src/lib.rs | 1 + database/src/main.rs | 78 ++++-- database/tests/e2e/README.md | 30 ++- database/tests/e2e/config.yaml | 50 +--- database/tests/e2e/run-tests.sh | 46 +++- .../harness/fixtures/database.schema.json | 177 ++++++++++++ .../tests/e2e/workers/harness/package.json | 1 + .../workers/harness/src/database-config.ts | 34 +++ .../workers/harness/src/seed-configuration.ts | 88 ++++++ database/tests/integration.rs | 15 +- 32 files changed, 1072 insertions(+), 1018 deletions(-) delete mode 100644 database/config.yaml create mode 100644 database/skills/SKILL.md delete mode 100644 database/skills/iii-database/execute.md delete mode 100644 database/skills/iii-database/interactive-transaction.md delete mode 100644 database/skills/iii-database/prepared-statements.md delete mode 100644 database/skills/iii-database/query.md delete mode 100644 database/skills/iii-database/transaction.md delete mode 100644 database/skills/index.md create mode 100644 database/src/configuration.rs create mode 100644 database/tests/e2e/workers/harness/fixtures/database.schema.json create mode 100644 database/tests/e2e/workers/harness/src/database-config.ts create mode 100644 database/tests/e2e/workers/harness/src/seed-configuration.ts diff --git a/database/README.md b/database/README.md index 833be8fc..73272b5a 100644 --- a/database/README.md +++ b/database/README.md @@ -17,26 +17,64 @@ iii worker add database@1.0.0 ## Configure -Add a single `databases` block to your `config.yaml`. SQLite is the recommended starting point — no server, just a file: +Runtime settings live in the **`configuration` worker** under id **`database`**. The worker registers its JSON Schema at startup, reads the live value via `configuration::get`, and hot-reloads connection pools when the value changes. + +Persisted values default to `./data/configuration/database.yaml` (fs adapter). Edit that file directly or call `configuration::set` — both propagate without a worker restart. + +### Zero-config default + +With no seed file and no stored configuration value, the worker uses a built-in default: ```yaml -workers: - - name: database - config: - databases: - primary: - url: sqlite:./data/iii.db - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 - analytics: - url: ${ANALYTICS_URL} # postgres:// or mysql:// - pool: { max: 5 } +databases: + primary: + url: sqlite:./data/iii.db + pool: + max: 10 + idle_timeout_ms: 30000 + acquire_timeout_ms: 5000 ``` +This is seeded into the `configuration` worker on first register and used as a runtime fallback when the stored value is `null`. + +### Optional seed file + +Pass `--config ` to supply a YAML seed file. When present, its `databases` block is passed as `initial_value` on `configuration::register` (overriding the built-in default for first-time registration). See [`config.yaml.example`](config.yaml.example). + +Engine-managed deployments can inline config under the worker entry; the engine delivers it via `--config` as before. + +### Value shape + +SQLite is the recommended starting point — no server, just a file: + +```yaml +databases: + primary: + url: sqlite:./data/iii.db + pool: + max: 10 + idle_timeout_ms: 30000 + acquire_timeout_ms: 5000 + analytics: + url: ${ANALYTICS_URL:postgres://localhost/analytics} + pool: { max: 5 } +``` + +Set or replace the whole value: + +```bash +iii trigger configuration::get id=database +iii trigger configuration::set id=database value='{"databases":{"primary":{"url":"sqlite:./data/iii.db"}}}' +``` + +Env placeholders use **`${VAR:default}`** syntax. The configuration worker expands them on every `configuration::get` call, so env changes propagate without a restart. + URL scheme picks the driver: `sqlite:`, `postgres://`, `postgresql://`, `mysql://`. +### Hot reload + +When configuration changes (`configuration::set`, or an external edit to `./data/configuration/database.yaml`), the worker rebuilds connection pools in place. Invalid configs are rejected and the previous pools are kept. In-flight prepared-statement handles and open transactions continue on their original pool until they expire. + ### TLS (postgres + mysql) Postgres and mysql connections default to **`tls.mode: require`** — TLS handshake required, certificate chain validated against the system trust store, hostname verification skipped (matches libpq's `sslmode=require`). Override per-database: diff --git a/database/config.yaml b/database/config.yaml deleted file mode 100644 index 7887696c..00000000 --- a/database/config.yaml +++ /dev/null @@ -1,7 +0,0 @@ -databases: - primary: - url: sqlite:./iii.db - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 diff --git a/database/config.yaml.example b/database/config.yaml.example index 5ed41b7e..31834442 100644 --- a/database/config.yaml.example +++ b/database/config.yaml.example @@ -1,10 +1,18 @@ -workers: - - name: database - config: - databases: - primary: - url: sqlite:./data/iii.db - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 +# Optional seed file for first-time registration (--config ./config.yaml.example). +# When omitted, the worker seeds the built-in default: +# +# databases: +# primary: +# url: sqlite:./data/iii.db +# pool: { max: 10, idle_timeout_ms: 30000, acquire_timeout_ms: 5000 } +# +# After the first boot, the runtime source of truth is the `configuration` +# worker entry `database`, persisted at ./data/configuration/database.yaml. + +databases: + primary: + url: sqlite:./data/iii.db + pool: + max: 10 + idle_timeout_ms: 30000 + acquire_timeout_ms: 5000 diff --git a/database/skills/SKILL.md b/database/skills/SKILL.md new file mode 100644 index 00000000..1336c290 --- /dev/null +++ b/database/skills/SKILL.md @@ -0,0 +1,117 @@ +--- +name: database +description: >- + Run SQL against PostgreSQL, MySQL, or SQLite from the iii engine — reads, + writes, transactions, and prepared statements over managed connection pools. +--- + +# database + +The database worker connects to PostgreSQL, MySQL, and SQLite through a +managed per-database connection pool. Every callable surface lives under +the `database::*` namespace. The driver is chosen from each database URL +scheme (`sqlite:`, `postgres://`, `postgresql://`, `mysql://`). + +Runtime settings live in the `configuration` worker under id `database`; +pools hot-reload when the value changes. SQLite is the recommended starting +point. Placeholder syntax: `?` for SQLite and MySQL, `$1`/`$2`/… for Postgres. + +## When to Use + +- You need to read rows from a configured database (`database::query`). +- You need to insert, update, delete, or run DDL and read affected-row + counts or autoincrement ids (`database::execute`). +- Several statements must commit or roll back together as one unit + (`database::transaction` or the interactive transaction surface). +- The same parameterized SQL will run many times and you want to skip + per-call parse/plan cost (`database::prepareStatement` + + `database::runStatement`). +- You need read-your-writes across round-trips with logic between steps + (`database::beginTransaction` … `commitTransaction` / `rollbackTransaction`). +- You want to react to Postgres row-level changes once logical replication + streaming ships (`database::row-change` trigger — see below). + +## Boundaries + +- Not a migration tool, ORM, or schema designer — pass raw SQL only. +- Not a general pub/sub bus — use `database::row-change` only for Postgres + table change feeds, not for application events. +- `database::query` is read-oriented; use `database::execute` for writes. + Running a SELECT through `execute` discards rows. +- Prepared handles pin a pool connection until TTL expiry — not transactions. + Batch `database::transaction` needs every statement up front; use the + interactive surface when code must branch between steps. +- MySQL ignores the `returning` option on `execute` (warn-once). SQLite + degrades `read_committed` / `repeatable_read` isolation to serializable. +- For filesystem or shell operations, use the `shell` worker instead. + +## Functions + +- `database::query` — run read-only SQL and return rows, row count, and + column metadata. +- `database::execute` — run write SQL (INSERT/UPDATE/DELETE/DDL) and + return affected rows, optional last insert id, and optional RETURNING rows. +- `database::prepareStatement` — parse and plan SQL once; return a handle + that pins a pool connection until TTL expiry. +- `database::runStatement` — re-execute a prepared handle with new bind + params; response shape matches `query`. +- `database::transaction` — run an ordered batch of statements atomically; + rolls back on first failure and reports `failed_index`. +- `database::beginTransaction` — open an interactive transaction and + return an id plus expiry deadline. +- `database::transactionQuery` — read SQL inside an open interactive + transaction; same envelope as `query`. +- `database::transactionExecute` — write SQL inside an open interactive + transaction; same envelope as `execute`. Rejects bare transaction-control + SQL — finalize via `commitTransaction` or `rollbackTransaction`. +- `database::commitTransaction` — commit and finalize an interactive + transaction. +- `database::rollbackTransaction` — roll back and finalize an interactive + transaction. + +Interactive transactions auto-roll back when `timeout_ms` elapses (default +30 s, max 5 min). Prepared handles default to a 1 h TTL (max 24 h) with no +explicit release call — let them expire or stop using them when done. + +## Reactive triggers + +Register a `database::row-change` trigger when a function should run +automatically on Postgres INSERT/UPDATE/DELETE for specific tables — without +polling with `database::query`. + +Reach for it when: + +- A downstream worker or workflow must react to row mutations in near real + time on Postgres. +- You need decoded row payloads (old/new values) from logical replication + rather than polling an outbox table. + +Do not bind when: + +- The writer already has the new row in its `execute` or `transactionExecute` + return payload. +- You are on SQLite or MySQL — this trigger type is Postgres-only. +- You need events today — v1.0.0 returns `UNSUPPORTED` on `registerTrigger` + pending an upstream `tokio-postgres` replication API release. + +### How to bind + +1. Register a handler: `registerFunction('stream::on-row-change', handler)`. +2. Register the trigger: + +```typescript +iii.registerTrigger({ + type: 'database::row-change', + function_id: 'stream::on-row-change', + config: { + db: 'primary', + schema: 'public', + tables: ['orders', 'payments'], + // optional: slot_name, publication_name — see get function info + }, +}) +``` + +Config: `db`, `schema` (default `public`), `tables`. Slot/publication names +derive from `trigger_id` unless overridden. For event payload shape, call +`get function info` on the trigger type or handler function id. diff --git a/database/skills/iii-database/execute.md b/database/skills/iii-database/execute.md deleted file mode 100644 index 9bcc6609..00000000 --- a/database/skills/iii-database/execute.md +++ /dev/null @@ -1,136 +0,0 @@ ---- -type: how-to -function_id: database::execute -title: Run a write statement and return affected rows ---- - -# When to use - -Call `database::execute` for any write-side SQL — `INSERT`, `UPDATE`, -`DELETE`, or DDL (`CREATE TABLE`, `ALTER TABLE`, `DROP INDEX`, ...). The -response carries `affected_rows`, an optional `last_insert_id`, and a -`returned_rows` array populated when the caller asks for `RETURNING`-style -output. - -Reach for it when: - -- You need the count of rows the write touched (`affected_rows`). -- You inserted into a table with an autoincrement primary key and want - the newly-assigned id without a follow-up `SELECT`. -- You want a write to surface specific columns from each affected row - (e.g. server-defaulted `id` + `created_at`) — set `returning` on - Postgres or SQLite. - -Use [`database::query`](iii://database/query) instead when the -statement reads — `execute` does run a `SELECT` if you give it one but -discards the rows and reports `affected_rows: 0`, which is rarely what a -SELECT caller wants. - -Use [`database::transaction`](iii://database/transaction) instead -when you need several writes to commit atomically — `execute` runs each -call as its own implicit transaction. - -# Inputs - -```json -{ - "db": "primary", // required; key from your `databases:` config - "sql": "INSERT INTO users (email) VALUES (?) RETURNING id", // required; non-empty after trim - "params": ["a@x"], // optional; bound positionally - "returning": ["id", "created_at"] // optional; SQLite + Postgres only — see Driver compatibility -} -``` - -`db` and `sql` are required. Empty/whitespace-only `sql` is rejected -uniformly with `DRIVER_ERROR` carrying `message: "empty SQL"` (matches -[`database::query`](iii://database/query)'s contract). - -`params` accepts JSON primitives, arrays, and objects exactly like -`query` — same per-driver placeholder syntax (`?` for sqlite/mysql, -`$1`/`$2`/... for postgres) and same `INVALID_PARAM` rule for -out-of-range numbers. - -`returning` is the column projection the driver fills into -`returned_rows` when supported. **MySQL ignores it** (warns once and -returns `returned_rows: []`); SQLite implements it via the native -`RETURNING` clause; Postgres reads it from the SQL's own `RETURNING` -list. To stay portable, prefer writing the `RETURNING ...` clause -directly in `sql` for sqlite/postgres rather than passing -`returning: [...]`. - -# Outputs - -```json -{ - "affected_rows": 1, // rows the engine reports as inserted/updated/deleted - "last_insert_id": "42", // string-encoded; null when not applicable — see below - "returned_rows": [{ "id": 42, "created_at": "..." }] // empty array when no RETURNING is used -} -``` - -- `affected_rows` is the engine-reported write count. DDL statements - return `0` on every driver. -- `last_insert_id` is **always a JSON string or `null`** so the field can - carry sequence values that overflow JS `Number.MAX_SAFE_INTEGER`. It is - populated only for inserts: - - **SQLite / MySQL**: surfaces the engine's `last_insert_rowid()` / - `LAST_INSERT_ID()` for the connection. **Only set on INSERT** — an - `UPDATE` that runs immediately after an `INSERT` on the same pooled - connection returns `null` (not the prior INSERT's rowid). Falls back - to `null` when the engine reports `0` (no INSERT has run on that - connection yet). - - **Postgres**: has no engine-level `LASTVAL()` equivalent in this - surface. The worker reads the **first column of the first - `RETURNING` row** as the id. Put your primary key first: - `RETURNING id, name` works; `RETURNING name, id` returns `name` as - `last_insert_id`. With no `RETURNING` clause, the field is `null`. -- `returned_rows` mirrors the row-of-objects shape from - [`database::query`](iii://database/query). Empty `[]` when the - statement omits `RETURNING` or runs on MySQL. - -# Worked example - -Insert one row and capture the autoincrement id (SQLite or MySQL): - -```json -{ - "db": "primary", - "sql": "INSERT INTO users (email) VALUES (?)", - "params": ["a@x"] -} -``` - -Returns `{ "affected_rows": 1, "last_insert_id": "1", "returned_rows": [] }`. - -Same intent on Postgres — the worker pulls `last_insert_id` from the -first `RETURNING` column, so put `id` first: - -```json -{ - "db": "primary", - "sql": "INSERT INTO users (email) VALUES ($1) RETURNING id, email", - "params": ["a@x"] -} -``` - -Returns -`{ "affected_rows": 1, "last_insert_id": "1", "returned_rows": [{ "id": 1, "email": "a@x" }] }`. - -A bulk update reports the count and leaves `last_insert_id` null -(no INSERT happened): - -```json -{ - "db": "primary", - "sql": "UPDATE users SET active = ? WHERE last_seen_at < ?", - "params": [false, "2026-01-01T00:00:00Z"] -} -``` - -Returns `{ "affected_rows": 17, "last_insert_id": null, "returned_rows": [] }`. - -# Related - -- `database::query` — for read SQL; returns materialized rows + column metadata instead of `affected_rows`. -- `database::transaction` — group several writes into one atomic batch with rollback on first failure. -- `database::prepareStatement` + `database::runStatement` — re-run the same parameterized write many times without re-parsing on each call. diff --git a/database/skills/iii-database/interactive-transaction.md b/database/skills/iii-database/interactive-transaction.md deleted file mode 100644 index 8aa6fb00..00000000 --- a/database/skills/iii-database/interactive-transaction.md +++ /dev/null @@ -1,240 +0,0 @@ ---- -type: how-to -functions: [database::beginTransaction, database::transactionQuery, database::transactionExecute, database::commitTransaction, database::rollbackTransaction] -title: Run a stateful transaction across multiple RPC calls with a timeout-driven auto-rollback ---- - -# When to use - -Use the interactive transaction surface when you need **read-your-writes -across several round-trips** inside one transaction — e.g. issue a write, -inspect the resulting state, branch in your application code, then issue -follow-up writes that all commit (or all roll back) together. The worker -pins one pool connection inside a server-side `BEGIN ... COMMIT/ROLLBACK` -under a UUID handle; subsequent calls re-use the same connection. - -Reach for it when: - -- You need to take a decision in application code *between* statements - that must commit together (the batch - [`database::transaction`](iii://database/transaction) requires - every statement up-front, so it can't carry inter-statement logic). -- You want a single transaction id to thread through a long-running - workflow without holding a network connection open in your code. -- You need stronger-than-default isolation for a multi-step read+write - flow (pass `isolation: "serializable"` to `beginTransaction`). - -Use [`database::transaction`](iii://database/transaction) instead -when every statement is known in advance — it skips the registry overhead -and the per-call round-trip cost. Use -[`database::execute`](iii://database/execute) for one-off writes; -each `execute` call is its own auto-committed transaction. - -# Lifecycle - -``` -beginTransaction(db, isolation?, timeout_ms?) → { transaction: { id, expires_at } } - ├─ transactionQuery ( transaction_id, sql, params ) → { rows, row_count, columns } - ├─ transactionExecute ( transaction_id, sql, params ) → { affected_rows, last_insert_id, returned_rows } - ├─ ... (repeat) ... - ├─ commitTransaction ( transaction_id ) → { committed: true } - └─ rollbackTransaction ( transaction_id ) → { rolled_back: true } -``` - -After `commitTransaction` / `rollbackTransaction` returns, the id is gone -— any subsequent call against it returns `TRANSACTION_NOT_FOUND`. If the -configured `timeout_ms` elapses before either finalizer lands, the worker -auto-rolls back and removes the id; the next call also gets -`TRANSACTION_NOT_FOUND`. - -# `database::beginTransaction` - -## Inputs - -```json -{ - "db": "primary", // required; key from your `databases:` config - "isolation": "serializable", // optional; read_committed | repeatable_read | serializable - "timeout_ms": 30000 // optional; default 30000, clamped to max 300000 (5 min) -} -``` - -`db` is required. `isolation` accepts the same three values as the batch -[`database::transaction`](iii://database/transaction); any other -string is rejected with `INVALID_PARAM`. On SQLite, `read_committed` / -`repeatable_read` log a one-line `tracing::warn!` and fall back to -`BEGIN IMMEDIATE` (serializable in practice). - -`timeout_ms` is the **total lifetime of the transaction**, not an -inactivity timeout. It defaults to 30 s and is clamped at 5 min so a -buggy client can't pin a pool connection indefinitely. A background -sweep task fires `ROLLBACK` once the deadline elapses, then removes the -id from the registry. - -## Outputs - -```json -{ - "transaction": { - "id": "550e8400-e29b-41d4-a716-446655440000", - "expires_at": "2026-05-19T18:30:00Z" - } -} -``` - -`transaction.id` is the opaque UUID every subsequent handler accepts. -Treat it as a session token: scope it to one workflow, hand it off -between functions if needed, but never share it across unrelated requests. - -`transaction.expires_at` is RFC 3339 UTC; once it elapses the worker has -already auto-rolled back. Re-issue `beginTransaction` and start over; -the original id is gone. - -# `database::transactionQuery` / `database::transactionExecute` - -The envelopes are **identical to the standalone -[`query`](iii://database/query) and -[`execute`](iii://database/execute)** handlers — same row-of-objects -shape, same `columns` metadata, same `affected_rows` / -`last_insert_id` / `returned_rows` semantics. The only difference: SQL -runs on the pinned transaction connection. - -```json -{ - "transaction_id": "550e8400-...", // required; id from beginTransaction - "sql": "SELECT n FROM t WHERE id = ?", - "params": [42] -} -``` - -`transactionExecute` adds a `returning` array exactly like -[`execute`](iii://database/execute) for Postgres + SQLite -`RETURNING` clauses; MySQL ignores it (logged warn-once). - -`transactionExecute` **rejects** bare `BEGIN`, `COMMIT`, `ROLLBACK`, -`END`, `SAVEPOINT`, `RELEASE`, and `SET TRANSACTION ...` SQL with -`INVALID_PARAM` (`reason` points at `commitTransaction` / -`rollbackTransaction`). Finalization is a first-class handler, not a -side-channel — this makes the lifecycle observable: every commit and -every rollback shows up as its own engine call + log event. - -Concurrent calls against the same `transaction_id` serialize on the -per-conn mutex. The worker doesn't pipeline statements within one -transaction. - -# `database::commitTransaction` / `database::rollbackTransaction` - -```json -{ "transaction_id": "550e8400-..." } -``` - -Returns `{ "committed": true }` or `{ "rolled_back": true }` on success. -The id is removed atomically before the connection is locked, so a -racing `transactionQuery` / `transactionExecute` either landed before -finalization (succeeded inside the same transaction) or after -(returns `TRANSACTION_NOT_FOUND`). - -If `commitTransaction` fails (e.g. serialization conflict in Postgres), -the worker issues a best-effort `ROLLBACK` before returning the error — -the pool's recycler doesn't run rollback for us, so without this the -next caller on the recycled connection would see "current transaction is -aborted, commands ignored". - -# Errors - -| Code | Surface | Meaning | -|---|---|---| -| `UNKNOWN_DB` | `beginTransaction` | `db` not in your `databases:` config. | -| `INVALID_PARAM` | `beginTransaction` | Unknown `isolation`. | -| `INVALID_PARAM` | `transactionExecute` | Transaction-control SQL — use the dedicated finalizer handler. | -| `TRANSACTION_NOT_FOUND` | any of `transactionQuery` / `transactionExecute` / `commitTransaction` / `rollbackTransaction` | Id unknown, already finalized, or timed out. | -| `POOL_TIMEOUT` | `beginTransaction` | Pool was busy when `beginTransaction` tried to acquire a connection. Bump `pool.max` or shorten the longest-running transaction. | -| `DRIVER_ERROR` | any of the above | Wraps the underlying driver error with `inner_code`; same shape as elsewhere. | - -# Observability - -Every call lands on its own engine-managed OTel span. The worker emits -structured log events via `iii_sdk::Logger` that attach to the active -span, including `db.system`, `db.name`, `db.transaction.id`, -`db.operation`, and `db.statement` (no params — those can carry PII). - -Key event names you can grep for in your trace backend: - -- `db_tx_started` — info on `beginTransaction` success. -- `db_tx_statement` — debug per `transactionQuery` / `transactionExecute`. -- `db_tx_statement_failed` — warn on driver-level failure inside a tx. -- `db_tx_unknown` — warn when an id is missing/finalized/timed-out. -- `db_tx_committed` / `db_tx_rolled_back` — info on explicit finalization (carries `duration_ms`). -- `db_tx_commit_failed` — error on COMMIT failure (also indicates whether the follow-up rollback succeeded). -- `db_tx_timed_out` — warn from the background watcher when the deadline auto-rolls back. -- `db_tx_timeout_rollback_failed` — error when the timeout-driven ROLLBACK itself fails (rare). - -# Worked example - -Transfer money between two accounts with a read-then-write check that -must see the updated balance before deciding whether to credit: - -```ts -const { transaction } = await iii.trigger({ - function_id: 'database::beginTransaction', - payload: { - db: 'primary', - isolation: 'serializable', - timeout_ms: 5000, - }, -}) - -try { - const debit = await iii.trigger({ - function_id: 'database::transactionExecute', - payload: { - transaction_id: transaction.id, - sql: 'UPDATE accounts SET balance = balance - ? WHERE id = ? AND balance >= ?', - params: [10, 1, 10], - }, - }) - if (debit.affected_rows !== 1) { - await iii.trigger({ - function_id: 'database::rollbackTransaction', - payload: { transaction_id: transaction.id }, - }) - throw new Error('insufficient funds') - } - await iii.trigger({ - function_id: 'database::transactionExecute', - payload: { - transaction_id: transaction.id, - sql: 'UPDATE accounts SET balance = balance + ? WHERE id = ?', - params: [10, 2], - }, - }) - await iii.trigger({ - function_id: 'database::commitTransaction', - payload: { transaction_id: transaction.id }, - }) -} catch (e) { - // Best-effort rollback; the worker is happy to see TRANSACTION_NOT_FOUND - // if a prior step already rolled it back or the deadline elapsed. - try { - await iii.trigger({ - function_id: 'database::rollbackTransaction', - payload: { transaction_id: transaction.id }, - }) - } catch {/* ignore */} - throw e -} -``` - -# Related - -- [`database::transaction`](iii://database/transaction) — atomic - batch when every statement is known up-front; skip the round-trip - overhead of the interactive surface. -- [`database::query`](iii://database/query) / - [`database::execute`](iii://database/execute) — one-off - read/write outside any transaction. -- [`database::prepareStatement` + - `runStatement`](iii://database/prepared-statements) — pin a - connection for repeated parameterized calls **without** transactional - semantics. Useful for read-heavy workloads where the prepared plan is - the bottleneck. diff --git a/database/skills/iii-database/prepared-statements.md b/database/skills/iii-database/prepared-statements.md deleted file mode 100644 index 8ba45fb6..00000000 --- a/database/skills/iii-database/prepared-statements.md +++ /dev/null @@ -1,168 +0,0 @@ ---- -type: how-to -functions: [database::prepareStatement, database::runStatement] -title: Prepare a SQL statement once, run it many times against a pinned connection ---- - -# When to use - -Use the `prepareStatement` + `runStatement` pair when a single SQL string -will run repeatedly with different bind values, or when you need -session-scoped state (temp tables, `SET LOCAL`, transaction snapshots) to -persist across several calls. `prepareStatement` parses + plans the SQL -once and **pins a pool connection under a UUID handle**; `runStatement` -re-executes that handle with new params on the same connection. - -| Question | Use this | -|------------------------------------------------------------------|-----------------------------------------------------------| -| Will this exact SQL run more than a handful of times? | `database::prepareStatement` first, then re-use. | -| Do I have a UUID handle already? | `database::runStatement` with new `params`. | -| Need session-scoped state (e.g. a Postgres advisory lock) across calls? | `database::prepareStatement` pins one connection. | -| Just running this SQL once? | [`database::query`](iii://database/query) — no handle, no pool pinning. | - -**A live handle pins one pool connection until its TTL expires.** The -default TTL is 1 hour and the cap is 24 hours; while a handle exists, -`pool.max - 1` connections are available for everyone else. Hold only as -many handles as you need concurrently, and let them expire (or simply -stop calling them) when the workload ends — there is no `release` or -`close` function. - -Use [`database::query`](iii://database/query) instead when the -SQL runs only once. Use -[`database::transaction`](iii://database/transaction) instead -when several statements need atomic commit/rollback semantics — handles -are not transactions; commit boundaries are defined by the SQL you run -through them. - -# `database::prepareStatement` - -## Inputs - -```json -{ - "db": "primary", // required; key from your `databases:` config - "sql": "SELECT id, email FROM users WHERE id > ?", // required; non-empty after trim - "ttl_seconds": 3600 // optional; default 3600, capped at 86400 -} -``` - -`db` and `sql` are required. Empty/whitespace-only `sql` is rejected -with `DRIVER_ERROR` carrying `message: "empty SQL"`. The guard runs -**before** the worker acquires a pool connection, so a typo cannot -silently leak a pinned conn. - -`ttl_seconds` values above 86400 (24 h) are silently clamped down — no -error is returned. A background evictor sweeps expired entries every -30 s; a `runStatement` against an entry whose TTL elapsed between -evictor sweeps fails fast with `STATEMENT_NOT_FOUND` and removes the -entry as a side effect. - -## Outputs - -```json -{ - "handle": { - "id": "550e8400-e29b-41d4-a716-446655440000", // RFC 4122 v4 UUID - "expires_at": "2026-05-19T18:30:00Z" // RFC 3339 UTC; now + ttl_seconds (clamped) - } -} -``` - -- `handle.id` is the opaque UUID `runStatement` accepts. Stable until - expiry; treat it as a session token. -- `handle.expires_at` is when the worker will stop honouring the - handle. After that point `runStatement` returns `STATEMENT_NOT_FOUND`. - -# `database::runStatement` - -## Inputs - -```json -{ - "handle_id": "550e8400-e29b-41d4-a716-446655440000", // required; UUID returned by prepareStatement - "params": [42] // optional; positionally bound at run time -} -``` - -`handle_id` is required and must reference a live handle. Unknown or -expired ids return `STATEMENT_NOT_FOUND`; the response carries the -`handle_id` echo so callers can correlate failures. - -`params` follows the same JSON-to-driver coercion as -[`database::query`](iii://database/query). The placeholder -syntax matches the SQL given to `prepareStatement` (`?` for -sqlite/mysql, `$1`/`$2`/... for postgres). - -There is **no `timeout_ms`** input on `runStatement`. The handle pins a -pool connection for its full TTL, and per-call timeouts would not -short-circuit the underlying network round-trip on Postgres or MySQL — -configure the per-call ceiling via the connection's session lifetime -(e.g. `statement_timeout` in `postgresql.conf`) or shorten -`ttl_seconds` on `prepareStatement`. - -## Outputs - -```json -{ - "rows": [{ "id": 1, "email": "a@x" }], // row-of-objects, same shape as `database::query` - "row_count": 1, - "columns": [ - { "name": "id", "type_name": "INTEGER" }, - { "name": "email", "type_name": "TEXT" } - ] -} -``` - -The envelope is bit-for-bit identical to -[`database::query`](iii://database/query) — same row coercion -rules, same `columns[i]` metadata, same empty-result handling. Callers -can share one parser for both surfaces. - -`runStatement` does not surface write counts or `last_insert_id`. To -re-run an INSERT/UPDATE/DELETE many times and read those fields, use -[`database::execute`](iii://database/execute) per call — write -statements are typically cheap to re-parse and the prepared path saves -less than the pool-pinning cost. - -# Worked example - -Prepare a paginated SELECT once, then advance the cursor twice. Same -flow on every driver; the SQL changes its placeholder syntax. - -SQLite or MySQL — prepare: - -```json -{ - "db": "primary", - "sql": "SELECT id, body FROM outbox WHERE id > ? ORDER BY id LIMIT 50", - "ttl_seconds": 3600 -} -``` - -Returns -`{ "handle": { "id": "550e8400-...", "expires_at": "..." } }`. - -Run with cursor `0` (first page): - -```json -{ "handle_id": "550e8400-...", "params": [0] } -``` - -Run again with cursor `50` (second page) on the same handle: - -```json -{ "handle_id": "550e8400-...", "params": [50] } -``` - -If a `runStatement` returns `STATEMENT_NOT_FOUND` mid-loop, the handle -expired (or the worker process restarted): re-prepare and continue from -the last successfully-read cursor. Do **not** retry the same -`handle_id`. - -# Related - -- `database::query` — drop the handle altogether for one-shot reads; same response envelope. -- `database::execute` — for repeated writes; pair its own `last_insert_id` with `affected_rows` per call. -- `database::transaction` — group writes into an atomic batch instead of holding a pinned connection across many calls. -- Error code `STATEMENT_NOT_FOUND` — re-prepare and retry with the new `handle.id`; the old one is gone. -- Error code `POOL_TIMEOUT` — too many live handles can starve the pool. Bump `pool.max` in your `databases:` config or shorten `ttl_seconds`. diff --git a/database/skills/iii-database/query.md b/database/skills/iii-database/query.md deleted file mode 100644 index d81cd2ee..00000000 --- a/database/skills/iii-database/query.md +++ /dev/null @@ -1,126 +0,0 @@ ---- -type: how-to -function_id: database::query -title: Run a read-only SQL query and return rows ---- - -# When to use - -Call `database::query` for any read-side SQL — `SELECT`, `WITH`, -`PRAGMA`, `EXPLAIN`, anything that produces a result set you want -materialized as JSON. The response carries the rows as objects keyed by -column name plus a `columns` array with per-column type metadata, so -callers don't need to issue a follow-up describe call. - -Reach for it when: - -- You need a row-of-objects shape for a UI table or a JSON-returning - function. Each row is `{column_name: value}`, not a positional array. -- You only run the SQL once or a small handful of times. Bind parameters - inline via `params` rather than reaching for prepared statements. -- You want column-name + driver-type metadata alongside the rows - (`columns[i].name` and `columns[i].type_name`). - -Use [`database::execute`](iii://database/execute) instead when the -statement writes (INSERT/UPDATE/DELETE/DDL) — `query` only returns rows; -write counts and `last_insert_id` come from `execute`. - -Use [`database::prepareStatement` + `runStatement`](iii://database/prepared-statements) -instead when you'll re-run the same SQL many times in a hot loop — the -prepared path skips the per-call parse/plan cost and pins a pool -connection so isolation primitives like temp tables stay alive across -calls. - -# Inputs - -```json -{ - "db": "primary", // required; key from your `databases:` config - "sql": "SELECT id, email FROM users WHERE id > ?", // required; non-empty after trim - "params": [42], // optional; bound positionally (`?` for sqlite/mysql, `$1`/`$2`/... for postgres) - "timeout_ms": 30000 // optional; per-call cap, default 30000 -} -``` - -`db` and `sql` are required. Empty/whitespace-only `sql` is rejected at -the handler boundary with a `DRIVER_ERROR` carrying `message: "empty SQL"` -— this is uniform across all three drivers (Postgres' `tokio-postgres` -treats empty SQL as a successful no-op, sqlite/mysql parse-error it; the -worker normalizes the contract). - -`params` accepts JSON primitives (`null`, `bool`, integer, float, -string), arrays, and objects — arrays/objects bind as the driver's JSON -type. A number that fits neither `i64` nor `f64` is rejected with -`INVALID_PARAM` carrying the offending index. Placeholder syntax differs -per driver: `?` for sqlite and mysql, `$1`/`$2`/... for postgres. - -`timeout_ms` exceeded yields `QUERY_TIMEOUT` with the `db` and the -configured cap. - -# Outputs - -```json -{ - "rows": [ // row-of-objects, ordered to match the SQL - { "id": 1, "email": "a@x" }, - { "id": 2, "email": "b@x" } - ], - "row_count": 2, // == rows.length; convenience field - "columns": [ // per-column metadata in projection order - { "name": "id", "type_name": "INTEGER" }, - { "name": "email", "type_name": "TEXT" } - ] -} -``` - -- `rows` is always an array of objects keyed by `columns[i].name`. Empty - result sets return `[]`, not `null`. -- `row_count` is exactly `rows.length`; included so callers can branch on - presence without re-measuring. -- `columns[i].type_name` is the driver's native type name (e.g. - `INTEGER`, `TEXT`, `int4`, `varchar`); use it as a hint, not a - contract. -- Cell coercion follows fixed rules: - - Integer columns become JSON numbers when they fit `i64`. - - 64-bit identity columns serialize as JSON **strings** to preserve - precision past `Number.MAX_SAFE_INTEGER` in JS clients. - - `BYTEA` / `BLOB` cells are base64-encoded strings (standard alphabet, - with padding). - - `TIMESTAMP` / `TIMESTAMPTZ` cells are RFC 3339 strings in UTC - (`...Z`), seconds precision. - - `NUMERIC` / `DECIMAL` cells are JSON strings (no precision loss). - - `JSON` / `JSONB` columns pass through as the embedded value. - -# Worked example - -Read every user past a given id, with the cursor bound positionally. - -SQLite or MySQL: - -```json -{ - "db": "primary", - "sql": "SELECT id, email FROM users WHERE id > ? ORDER BY id LIMIT ?", - "params": [42, 100] -} -``` - -Postgres uses numbered placeholders for the same call: - -```json -{ - "db": "primary", - "sql": "SELECT id, email FROM users WHERE id > $1 ORDER BY id LIMIT $2", - "params": [42, 100] -} -``` - -Both return the same envelope; the `columns[i].type_name` strings will -differ (e.g. sqlite `INTEGER` vs postgres `int4`) but `rows` are -shape-compatible. - -# Related - -- `database::execute` — for the write side (INSERT/UPDATE/DELETE/DDL); returns affected-row counts instead of materialized rows. -- `database::prepareStatement` + `database::runStatement` — re-run the same SQL many times without re-parsing; also pins a pool connection so session-scoped state (temp tables, `SET LOCAL`, ...) survives across calls. -- `database::transaction` — group several statements (mixed read/write) into one atomic batch with a single `committed` flag. diff --git a/database/skills/iii-database/transaction.md b/database/skills/iii-database/transaction.md deleted file mode 100644 index 6a954fa8..00000000 --- a/database/skills/iii-database/transaction.md +++ /dev/null @@ -1,165 +0,0 @@ ---- -type: how-to -function_id: database::transaction -title: Run a sequence of statements atomically with rollback on first failure ---- - -# When to use - -Call `database::transaction` when several SQL statements must commit -or roll back together — the canonical "transfer money between accounts" -shape, plus any multi-step write that would leave the DB in an -inconsistent state if a later statement failed. The worker opens a -transaction on a freshly-acquired pool connection, runs every statement -in sequence, and commits if (and only if) every step returned without -error. - -Reach for it when: - -- You're running two or more writes that share a consistency invariant - (e.g. debit + credit, parent + child rows, denormalized counter - updates). -- You need a stronger isolation level than the engine's default — pass - `isolation: "serializable"` (or `"repeatable_read"` on Postgres/MySQL) - to upgrade just this batch. -- You want a single response that tells you *which* statement failed - when something rolls back (`failed_index`). - -Use [`database::execute`](iii://database/execute) instead for a -single write — `execute` is implicitly its own transaction and skips the -multi-statement framing cost. Use -[`database::prepareStatement` + `runStatement`](iii://database/prepared-statements) -instead when the goal is repeating one parameterized statement many -times rather than committing several different statements as a unit. - -# Inputs - -```json -{ - "db": "primary", // required; key from your `databases:` config - "statements": [ // required; array, run in order - { "sql": "INSERT INTO accounts (id, balance) VALUES (?, ?)", "params": [1, 100] }, - { "sql": "INSERT INTO accounts (id, balance) VALUES (?, ?)", "params": [2, 0] } - ], - "isolation": "serializable" // optional; one of read_committed | repeatable_read | serializable -} -``` - -`db` and `statements` are required; an empty `statements` array -commits a no-op transaction (`committed: true`, `results: []`). Each -statement carries its own `sql` (non-empty, like -[`database::query`](iii://database/query) and `execute`) plus -optional `params` with the same JSON-to-driver coercion rules. - -`isolation` is optional and accepts exactly three values: -`"read_committed"`, `"repeatable_read"`, `"serializable"`. Anything else -(including the empty string) is rejected with `INVALID_PARAM` carrying -`reason: "unknown isolation \`\`"`. Per-driver behaviour: - -- **Postgres**: maps directly to `BEGIN ISOLATION LEVEL ...`. -- **MySQL**: maps directly to `SET TRANSACTION ISOLATION LEVEL ...` - before `BEGIN`. -- **SQLite**: only supports a single isolation level. `serializable` - uses `BEGIN IMMEDIATE`; `read_committed` and `repeatable_read` log a - one-line `tracing::warn!` and fall back to `BEGIN IMMEDIATE`. The - call still succeeds; check your worker logs if you expect strict - isolation semantics on SQLite. - -Omitting `isolation` uses the driver's session default -(`READ COMMITTED` on Postgres + MySQL; serializable on SQLite by -construction). - -# Outputs - -The response shape changes based on `committed`. The two shapes never -overlap; success has no `failed_index`/`error`, failure has no -`results`. - -Success: - -```json -{ - "committed": true, - "results": [ // one entry per input statement, same order - { "affected_rows": 1, "rows": [] }, // statement 0 - { "affected_rows": 1, "rows": [] } // statement 1 - ] -} -``` - -Failure (rollback): - -```json -{ - "committed": false, - "failed_index": 1, // 0-based index of the offending statement; absent for non-step failures - "error": { // structured DbError; same shape returned by `query`/`execute` on driver failure - "code": "DRIVER_ERROR", - "driver": "sqlite", - "inner_code": null, - "message": "constraint failed: NOT NULL on accounts.id", - "failed_index": 1 - } -} -``` - -- `committed` is the truthy/falsy split. The transaction either - committed every statement or rolled back every statement; partial - commit is impossible. -- `results` is present only on success. Each entry mirrors the - `affected_rows` count `database::execute` would have produced for - that statement, plus a positional `rows` array (NOT keyed by column — - this is intentionally lighter than `query`'s row-of-objects shape; - parse with the input order in mind). -- `failed_index` is present only when the failing error carries a - per-statement index — i.e. a `DRIVER_ERROR` thrown by one of the - inputs. **Connection-level failures leave it absent**: `POOL_TIMEOUT` - on acquire, `UNKNOWN_DB`, a `BEGIN` that fails, or any error that - isn't tied to a specific input statement. Treat absence as "the - transaction never started" or "the failure spans the batch", not as - "step 0 failed". -- `error` is the same JSON-tagged `DbError` returned elsewhere on - driver-level failure (`code` is the stable discriminant, `driver` / - `inner_code` / `message` are diagnostic). - -# Worked example - -Two related INSERTs that must land together: - -```json -{ - "db": "primary", - "statements": [ - { "sql": "INSERT INTO accounts (id, balance) VALUES (?, ?)", "params": [1, 100] }, - { "sql": "UPDATE accounts SET balance = balance - ? WHERE id = ?", "params": [10, 1] }, - { "sql": "UPDATE accounts SET balance = balance + ? WHERE id = ?", "params": [10, 2] } - ], - "isolation": "serializable" -} -``` - -Returns `{ "committed": true, "results": [...] }` with `affected_rows` -populated per step. - -If the third UPDATE hits a `NOT NULL` constraint failure, the response -becomes: - -```json -{ - "committed": false, - "failed_index": 2, - "error": { "code": "DRIVER_ERROR", "driver": "sqlite", "message": "...", "failed_index": 2 } -} -``` - -The first two statements rolled back; no row in `accounts` reflects the -debit, no row reflects the credit, and the original INSERT did not -persist. Re-issue the entire transaction call (don't retry only the -failed step) once the underlying constraint condition is fixed. - -# Related - -- [`database::beginTransaction` + `transactionQuery` / `transactionExecute` + `commitTransaction` / `rollbackTransaction`](iii://database/interactive-transaction) — **stateful interactive** transaction with a configurable timeout-driven auto-rollback. Use this surface when you need to take a decision in application code *between* statements (read-your-writes across round-trips). The batch handler on this page requires every statement up-front, so it can't carry that inter-statement logic. -- `database::execute` — single-statement variant; skips the BEGIN/COMMIT framing for one-shot writes. -- `database::query` — read-only; cannot be combined with writes inside this surface but is fine to mix into the `statements` array if you only need its rows for `affected_rows`-equivalent counts. -- `database::prepareStatement` + `database::runStatement` — for repeating one parameterized statement many times; not a substitute for atomic multi-statement commit. diff --git a/database/skills/index.md b/database/skills/index.md deleted file mode 100644 index 31bdfdc6..00000000 --- a/database/skills/index.md +++ /dev/null @@ -1,36 +0,0 @@ ---- -type: index -title: database ---- - -# database - -Connect to PostgreSQL, MySQL, and SQLite from the iii engine. Run read-only -queries, write statements, atomic transactions, and prepared-statement -sequences over a managed per-database connection pool. Every callable -surface lives under the single `database::*` namespace; SQLite is the -recommended starting point because it needs no server, just a file. - -The worker resolves the driver from each database's URL scheme (`sqlite:`, -`postgres://`, `postgresql://`, `mysql://`). For the `databases:` config -block, TLS modes, error-code reference, and the per-driver compatibility -table (e.g. `returning:` is a no-op on MySQL; SQLite degrades -`read_committed` / `repeatable_read` to `serializable`), see -[the README](../README.md). - -## How-tos - -### `database::*` - -- [`database::query`](iii://database/query) — read-only SQL; returns `{ rows, row_count, columns }` for SELECT-style statements. -- [`database::execute`](iii://database/execute) — write SQL (INSERT/UPDATE/DELETE/DDL); returns `{ affected_rows, last_insert_id, returned_rows }`. -- [`database::prepareStatement` + `database::runStatement`](iii://database/prepared-statements) — prepare-once, run-many parameterized SQL; the prepare step pins a pool connection until TTL expiry, so always run before re-issuing the same statement many times. -- [`database::transaction`](iii://database/transaction) — atomic batch sequence; one call, pass every statement together, rolls back on first failure with a `failed_index` pointer at the offending step. -- [`database::beginTransaction` + `transactionQuery` / `transactionExecute` + `commitTransaction` / `rollbackTransaction`](iii://database/interactive-transaction) — stateful interactive transaction with a configurable timeout-driven auto-rollback. Use this when you need read-your-writes across several round-trips inside a single transaction. - -`database::row-change` (Postgres logical replication via `pgoutput`) is -registered as a trigger type but is **not yet functional in v1.0.0**: -`register_trigger` returns `UNSUPPORTED` while the streaming decode loop -waits on an upstream `tokio-postgres` replication API release. Operators -can pre-provision slots and publications now; see the **Triggers** section -of [the README](../README.md) for current status and cleanup commands. diff --git a/database/src/config.rs b/database/src/config.rs index 3dab0a0c..b1e703ef 100644 --- a/database/src/config.rs +++ b/database/src/config.rs @@ -1,33 +1,78 @@ //! Configuration parsing for the database worker. //! -//! The worker accepts a YAML file with a `databases:` map keyed by name. -//! Each entry has a `url` (whose scheme picks the driver) and an optional -//! `pool` block. Environment variables in the form `${NAME}` are expanded -//! against the process environment. - -use serde::Deserialize; +//! Runtime config is stored in the `configuration` worker under id `database`. +//! When no stored value and no `--config` seed exist, [`WorkerConfig::default`] +//! supplies a local SQLite pool. An optional YAML seed file (`--config`) may +//! override `initial_value` on first register. Each database entry has a `url` +//! (whose scheme picks the driver) and an optional `pool` block. The seed path +//! expands `${NAME}` against the process environment; values read from +//! `configuration::get` are already expanded by the configuration worker +//! (`${VAR:default}` syntax). + +use schemars::JsonSchema; +use schemars::schema::Schema; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; use std::collections::HashMap; -/// Top-level worker config (the contents of `config.yaml`, or the `config` -/// block of `iii-config.yaml` when running embedded). -#[derive(Debug, Clone, Deserialize)] +pub const DEFAULT_DB_NAME: &str = "primary"; +pub const DEFAULT_SQLITE_URL: &str = "sqlite:./data/iii.db"; + +/// Top-level worker config registered with the `configuration` worker. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[schemars(example = "worker_config_example")] pub struct WorkerConfig { #[serde(default)] + #[schemars(schema_with = "databases_schema")] pub databases: HashMap, } -#[derive(Debug, Clone, Deserialize)] +fn worker_config_example() -> WorkerConfig { + WorkerConfig::default() +} + +fn databases_schema(gen: &mut schemars::gen::SchemaGenerator) -> Schema { + let mut schema = gen.subschema_for::>(); + if let Schema::Object(obj) = &mut schema { + obj.metadata().description = Some( + "Named connection pools. Keys are logical database names referenced \ + by RPC handlers (for example `primary`). At least one entry is required." + .into(), + ); + obj.metadata().examples = vec![json!({ + "primary": { + "url": DEFAULT_SQLITE_URL, + "pool": { + "max": 10, + "idle_timeout_ms": 30000, + "acquire_timeout_ms": 5000 + } + } + })]; + if let Some(validation) = obj.object.as_mut() { + validation.min_properties = Some(1); + } + } + schema +} + +/// Per-database connection settings. The URL scheme selects the driver; +/// `pool` and `tls` are optional and default when omitted. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct DatabaseConfig { + /// Connection URL. Driver is inferred from the scheme: `sqlite:`, + /// `postgres://` or `postgresql://`, or `mysql://`. pub url: String, #[serde(default)] pub pool: PoolConfig, #[serde(default)] pub tls: TlsConfig, - /// Populated by [`WorkerConfig::from_yaml`] from the URL scheme. + /// Populated by [`WorkerConfig::finalize`] from the URL scheme. /// Do not construct `DatabaseConfig` directly without calling - /// `detect_driver` — the default `Sqlite` value will silently mismatch + /// `finalize` — the default `Sqlite` value will silently mismatch /// the URL. #[serde(skip)] + #[schemars(skip)] pub driver: DriverKind, } @@ -39,8 +84,9 @@ pub struct DatabaseConfig { /// (matching libpq's `sslmode=require` semantics). Use `mode: verify-full` /// to additionally verify the certificate hostname matches the URL host, /// and `mode: disable` to opt out of TLS entirely (local-dev only). -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct TlsConfig { + /// TLS mode: `disable` (plaintext), `require` (default), or `verify-full`. #[serde(default)] pub mode: TlsMode, /// Optional path to a PEM file containing one or more CA certificates. @@ -80,7 +126,7 @@ fn default_trust_native() -> bool { true } -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "kebab-case")] pub enum TlsMode { /// No TLS. Plaintext connection. Local-dev only. @@ -103,12 +149,15 @@ pub enum DriverKind { Sqlite, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct PoolConfig { + /// Maximum number of open connections in the pool. #[serde(default = "default_pool_max")] pub max: u32, + /// Close idle connections after this many milliseconds. #[serde(default = "default_idle_timeout_ms")] pub idle_timeout_ms: u64, + /// Fail pool acquisition when no connection is available within this many milliseconds. #[serde(default = "default_acquire_timeout_ms")] pub acquire_timeout_ms: u64, } @@ -133,11 +182,80 @@ fn default_acquire_timeout_ms() -> u64 { 5_000 } +impl Default for WorkerConfig { + fn default() -> Self { + Self::default_unchecked() + } +} + impl WorkerConfig { + fn default_unchecked() -> Self { + Self::finalize(WorkerConfig { + databases: HashMap::from([( + DEFAULT_DB_NAME.to_string(), + DatabaseConfig { + url: DEFAULT_SQLITE_URL.to_string(), + pool: PoolConfig::default(), + tls: TlsConfig::default(), + driver: DriverKind::default(), + }, + )]), + }) + .expect("built-in default config is valid") + } + pub fn from_yaml(yaml: &str) -> Result { let expanded = expand_env(yaml); - let mut cfg: WorkerConfig = + let cfg: WorkerConfig = serde_yml::from_str(&expanded).map_err(|e| format!("yaml parse: {e}"))?; + Self::finalize(cfg) + } + + pub fn from_json(value: &Value) -> Result { + let cfg: WorkerConfig = + serde_json::from_value(value.clone()).map_err(|e| format!("json parse: {e}"))?; + Self::finalize(cfg) + } + + pub fn from_file(path: &str) -> Result { + let raw = std::fs::read_to_string(path).map_err(|e| format!("read {path}: {e}"))?; + Self::from_yaml(&raw) + } + + pub fn to_json(&self) -> Value { + serde_json::to_value(self).expect("WorkerConfig serializes") + } + + pub fn json_schema() -> Value { + let root = schemars::schema_for!(WorkerConfig); + let mut schema = serde_json::to_value(&root.schema).expect("WorkerConfig JSON Schema serializes"); + if let Some(obj) = schema.as_object_mut() { + if !root.definitions.is_empty() { + obj.insert( + "definitions".into(), + serde_json::to_value(&root.definitions).expect("definitions serialize"), + ); + } + obj.insert( + "example".into(), + json!({ + "databases": { + DEFAULT_DB_NAME: { + "url": DEFAULT_SQLITE_URL, + "pool": { + "max": default_pool_max(), + "idle_timeout_ms": default_idle_timeout_ms(), + "acquire_timeout_ms": default_acquire_timeout_ms(), + } + } + } + }), + ); + } + schema + } + + fn finalize(mut cfg: WorkerConfig) -> Result { if cfg.databases.is_empty() { return Err("config must declare at least one database".into()); } @@ -151,11 +269,6 @@ impl WorkerConfig { } Ok(cfg) } - - pub fn from_file(path: &str) -> Result { - let raw = std::fs::read_to_string(path).map_err(|e| format!("read {path}: {e}"))?; - Self::from_yaml(&raw) - } } /// Strip the userinfo from a URL-like string for safe logging. @@ -266,6 +379,104 @@ mod tests { WorkerConfig::from_yaml(yaml).unwrap() } + #[test] + fn from_json_parses_sqlite_database() { + let json = serde_json::json!({ + "databases": { + "primary": { "url": "sqlite:./data/iii.db" } + } + }); + let c = WorkerConfig::from_json(&json).unwrap(); + assert!(matches!(c.databases["primary"].driver, DriverKind::Sqlite)); + assert_eq!(c.databases["primary"].url, "sqlite:./data/iii.db"); + } + + #[test] + fn from_json_empty_databases_errors() { + let err = WorkerConfig::from_json(&serde_json::json!({ "databases": {} })).unwrap_err(); + assert!(err.contains("at least one database"), "got: {err}"); + } + + #[test] + fn to_json_roundtrip_omits_driver() { + let yaml = "databases:\n p:\n url: \"sqlite::memory:\"\n"; + let cfg = cfg(yaml); + let json = cfg.to_json(); + assert!(json["databases"]["p"].get("driver").is_none()); + let back = WorkerConfig::from_json(&json).unwrap(); + assert!(matches!(back.databases["p"].driver, DriverKind::Sqlite)); + } + + #[test] + fn json_schema_is_object_with_databases_property() { + let schema = WorkerConfig::json_schema(); + assert!(schema.get("properties").and_then(|p| p.get("databases")).is_some()); + } + + #[test] + fn default_matches_expected_primary_sqlite() { + let cfg = WorkerConfig::default(); + assert_eq!(cfg.databases.len(), 1); + let db = &cfg.databases[DEFAULT_DB_NAME]; + assert_eq!(db.url, DEFAULT_SQLITE_URL); + assert!(matches!(db.driver, DriverKind::Sqlite)); + assert_eq!(db.pool.max, 10); + assert_eq!(db.pool.idle_timeout_ms, 30_000); + assert_eq!(db.pool.acquire_timeout_ms, 5_000); + } + + #[test] + fn default_json_roundtrips_and_omits_driver() { + let cfg = WorkerConfig::default(); + let json = cfg.to_json(); + assert!(json["databases"][DEFAULT_DB_NAME].get("driver").is_none()); + let back = WorkerConfig::from_json(&json).unwrap(); + assert_eq!(back.databases[DEFAULT_DB_NAME].url, DEFAULT_SQLITE_URL); + assert!(matches!( + back.databases[DEFAULT_DB_NAME].driver, + DriverKind::Sqlite + )); + } + + #[test] + fn json_schema_describes_url_and_requires_databases() { + let schema = WorkerConfig::json_schema(); + let databases = schema["properties"]["databases"].as_object().unwrap(); + assert!(databases.get("description").is_some()); + assert_eq!(databases["minProperties"], 1); + + let db_schema = schema["definitions"]["DatabaseConfig"].as_object().unwrap(); + let url = db_schema["properties"]["url"].as_object().unwrap(); + assert!(url.get("description").is_some()); + + let pool_schema = schema["definitions"]["PoolConfig"].as_object().unwrap(); + for field in ["max", "idle_timeout_ms", "acquire_timeout_ms"] { + assert!( + pool_schema["properties"][field].get("description").is_some(), + "missing description for pool.{field}" + ); + } + + assert!(schema.get("example").is_some()); + } + + /// Regenerate the e2e harness schema fixture when `WorkerConfig` changes: + /// `EXPORT_E2E_SCHEMA=1 cargo test -p database export_e2e_schema_fixture -- --ignored` + #[test] + #[ignore] + fn export_e2e_schema_fixture() { + if std::env::var("EXPORT_E2E_SCHEMA").is_err() { + return; + } + let schema = WorkerConfig::json_schema(); + let pretty = serde_json::to_string_pretty(&schema).expect("schema serializes"); + let path = concat!( + env!("CARGO_MANIFEST_DIR"), + "/tests/e2e/workers/harness/fixtures/database.schema.json" + ); + std::fs::write(path, pretty + "\n").expect("write schema fixture"); + } + #[test] fn parses_single_sqlite_database() { let yaml = r#" diff --git a/database/src/configuration.rs b/database/src/configuration.rs new file mode 100644 index 00000000..3e2e8650 --- /dev/null +++ b/database/src/configuration.rs @@ -0,0 +1,176 @@ +//! Integration with the `configuration` worker — register, fetch, and hot-reload +//! the `database` configuration entry. + +use crate::config::WorkerConfig; +use crate::handlers::AppState; +use crate::pool::{self, Pool}; +use iii_sdk::{ + III, IIIError, RegisterFunction, RegisterTriggerInput, TriggerRequest, +}; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::time::Duration; + +pub const CONFIG_ID: &str = "database"; +const CONFIG_FN_ID: &str = "database::on-config-change"; +const CONFIG_TIMEOUT_MS: u64 = 5_000; +const CONFIG_RETRIES: u32 = 3; + +/// Register the `database` configuration schema with the configuration worker. +/// When `seed` is present, its value is installed as `initial_value`. Otherwise, +/// built-in defaults are seeded only when no stored value exists yet. +pub async fn register_config(iii: &III, seed: Option<&WorkerConfig>) -> Result<(), String> { + let mut payload = json!({ + "id": CONFIG_ID, + "name": "Database", + "description": "Connection pools for PostgreSQL, MySQL, and SQLite.", + "schema": WorkerConfig::json_schema(), + }); + if let Some(seed) = seed { + payload["initial_value"] = seed.to_json(); + } else if should_seed_default_value(iii).await? { + payload["initial_value"] = WorkerConfig::default().to_json(); + } + trigger_with_retry(iii, "configuration::register", payload).await?; + Ok(()) +} + +/// Read the live `database` configuration (env-expanded by the configuration worker). +pub async fn fetch_config(iii: &III) -> Result { + let value = get_config_value(iii).await?; + if value.is_null() { + tracing::info!("no configuration value found; using built-in default configuration"); + return Ok(WorkerConfig::default()); + } + WorkerConfig::from_json(&value) +} + +async fn should_seed_default_value(iii: &III) -> Result { + match try_get_config_value(iii).await? { + None => Ok(true), + Some(value) if value.is_null() => Ok(true), + Some(_) => Ok(false), + } +} + +async fn get_config_value(iii: &III) -> Result { + try_get_config_value(iii) + .await? + .ok_or_else(|| format!("configuration `{CONFIG_ID}` not found")) +} + +/// Returns `Ok(None)` when the entry does not exist (`NOT_FOUND`). +async fn try_get_config_value(iii: &III) -> Result, String> { + match trigger_with_retry(iii, "configuration::get", json!({ "id": CONFIG_ID })).await { + Ok(resp) => Ok(resp.get("value").cloned()), + Err(e) if e.contains("NOT_FOUND") => Ok(None), + Err(e) => Err(e), + } +} + +/// Build connection pools for every configured database. +pub async fn build_pools(cfg: &WorkerConfig) -> Result, String> { + let mut pools = HashMap::new(); + for (name, db) in &cfg.databases { + let p = pool::build(name, db) + .await + .map_err(|e| serde_json::to_string(&e).unwrap_or_else(|_| e.to_string()))?; + tracing::info!(db = %name, driver = ?p.driver(), "pool ready"); + pools.insert(name.clone(), p); + } + Ok(pools) +} + +/// Replace in-memory pools with freshly built ones from `cfg`. +pub async fn apply_config(state: &AppState, cfg: WorkerConfig) -> Result<(), String> { + let new_pools = build_pools(&cfg).await?; + let mut guard = state.pools.write().await; + *guard = new_pools; + Ok(()) +} + +/// Register the internal config-change handler and bind a `configuration` trigger. +pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIError> { + let st = state.clone(); + iii.register_function( + CONFIG_FN_ID, + RegisterFunction::new_async(move |payload: Value| { + let st = st.clone(); + async move { + on_config_change(&st, payload).await; + Ok::(json!({ "ok": true })) + } + }) + .description( + "Internal: reload connection pools when the database configuration changes.", + ), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "configuration".to_string(), + function_id: CONFIG_FN_ID.to_string(), + config: json!({ + "configuration_id": CONFIG_ID, + "event_types": ["configuration:updated"], + }), + metadata: None, + })?; + Ok(()) +} + +async fn on_config_change(state: &AppState, payload: Value) { + let new_value = match payload.get("new_value") { + Some(v) if !v.is_null() => v.clone(), + _ => { + tracing::warn!("configuration event missing new_value; skipping pool reload"); + return; + } + }; + match WorkerConfig::from_json(&new_value) { + Ok(cfg) => match apply_config(state, cfg).await { + Ok(()) => tracing::info!("database pools reloaded after configuration change"), + Err(e) => tracing::error!( + error = %e, + "failed to rebuild pools after configuration change; keeping previous pools" + ), + }, + Err(e) => tracing::error!( + error = %e, + "invalid configuration payload; keeping previous pools" + ), + } +} + +async fn trigger_with_retry( + iii: &III, + function_id: &str, + payload: Value, +) -> Result { + let mut last_err = String::new(); + for attempt in 1..=CONFIG_RETRIES { + match iii + .trigger(TriggerRequest { + function_id: function_id.to_string(), + payload: payload.clone(), + action: None, + timeout_ms: Some(CONFIG_TIMEOUT_MS), + }) + .await + { + Ok(v) => return Ok(v), + Err(e) => { + last_err = e.to_string(); + if attempt < CONFIG_RETRIES { + tracing::warn!( + function_id, + attempt, + error = %last_err, + "configuration RPC failed; retrying" + ); + tokio::time::sleep(Duration::from_millis(250 * u64::from(attempt))).await; + } + } + } + } + Err(format!("{function_id} failed after {CONFIG_RETRIES} attempts: {last_err}")) +} diff --git a/database/src/handlers/begin_transaction.rs b/database/src/handlers/begin_transaction.rs index 43fb455c..071fb72f 100644 --- a/database/src/handlers/begin_transaction.rs +++ b/database/src/handlers/begin_transaction.rs @@ -55,7 +55,7 @@ fn parse_isolation(s: Option<&str>) -> Result, DbError> { } pub async fn handle(state: &AppState, req: BeginTxReq) -> Result { - let pool = state.pool(&req.db).map_err(err_to_str)?; + let pool = state.pool(&req.db).await.map_err(err_to_str)?; let driver = pool.driver(); let isolation = parse_isolation(req.isolation.as_deref()).map_err(err_to_str)?; let timeout_ms = req @@ -126,13 +126,14 @@ pub(crate) mod tests { use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; pub fn state() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: TxRegistry::new(), log: Logger::new(), diff --git a/database/src/handlers/commit_transaction.rs b/database/src/handlers/commit_transaction.rs index 89fe8a1a..9ea12fd0 100644 --- a/database/src/handlers/commit_transaction.rs +++ b/database/src/handlers/commit_transaction.rs @@ -156,7 +156,7 @@ mod tests { let mut pools = std::collections::HashMap::new(); pools.insert("primary".to_string(), crate::pool::Pool::Sqlite(pool)); let st = crate::handlers::AppState { - pools: std::sync::Arc::new(pools), + pools: std::sync::Arc::new(tokio::sync::RwLock::new(pools)), handles: std::sync::Arc::new(crate::handle::HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/execute.rs b/database/src/handlers/execute.rs index 0da8f6ee..0d068fe8 100644 --- a/database/src/handlers/execute.rs +++ b/database/src/handlers/execute.rs @@ -27,7 +27,7 @@ pub struct ExecuteResp { } pub async fn handle(state: &AppState, req: ExecuteReq) -> Result { - let pool = state.pool(&req.db).map_err(err_to_str)?; + let pool = state.pool(&req.db).await.map_err(err_to_str)?; // Reject empty SQL uniformly. See the matching guard in query.rs for why // this is at the handler boundary rather than per-driver: postgres' driver // accepts empty SQL as a no-op success, sqlite/mysql reject — guarding @@ -42,7 +42,7 @@ pub async fn handle(state: &AppState, req: ExecuteReq) -> Result driver::sqlite::execute(p, &req.sql, ¶ms, &req.returning).await, Pool::Postgres(p) => driver::postgres::execute(p, &req.sql, ¶ms, &req.returning).await, Pool::Mysql(p) => driver::mysql::execute(p, &req.sql, ¶ms, &req.returning).await, @@ -68,13 +68,14 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/mod.rs b/database/src/handlers/mod.rs index 0686b368..8def6d25 100644 --- a/database/src/handlers/mod.rs +++ b/database/src/handlers/mod.rs @@ -9,6 +9,7 @@ use crate::transaction::TxRegistry; use iii_observability::Logger; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::RwLock; pub mod begin_transaction; pub mod commit_transaction; @@ -26,16 +27,19 @@ pub(crate) use query::rows_to_objects as query_rows_to_objects; #[derive(Clone)] pub struct AppState { - pub pools: Arc>, + pub pools: Arc>>, pub handles: Arc, pub transactions: TxRegistry, pub log: Logger, } impl AppState { - pub fn pool(&self, db: &str) -> Result<&Pool, DbError> { + pub async fn pool(&self, db: &str) -> Result { self.pools + .read() + .await .get(db) + .cloned() .ok_or_else(|| DbError::UnknownDb { db: db.to_string() }) } } diff --git a/database/src/handlers/prepare.rs b/database/src/handlers/prepare.rs index 9c5f5fd2..44423454 100644 --- a/database/src/handlers/prepare.rs +++ b/database/src/handlers/prepare.rs @@ -29,7 +29,7 @@ const MAX_TTL_SECONDS: u64 = 86_400; pub async fn handle(state: &AppState, req: PrepareReq) -> Result { let ttl = Duration::from_secs(req.ttl_seconds.min(MAX_TTL_SECONDS)); - let pool = state.pool(&req.db).map_err(err_to_str)?; + let pool = state.pool(&req.db).await.map_err(err_to_str)?; // Reject empty SQL at the handler boundary, mirroring query.rs / execute.rs. // Without this, prepareStatement happily acquires a pool connection and // pins it under a UUID handle that can never run successfully — the @@ -77,13 +77,14 @@ mod tests { use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/query.rs b/database/src/handlers/query.rs index b2ca8b0f..2c082c8a 100644 --- a/database/src/handlers/query.rs +++ b/database/src/handlers/query.rs @@ -32,7 +32,7 @@ fn default_timeout() -> u64 { /// Returns a JSON string body suitable to wrap in IIIError on failure. pub async fn handle(state: &AppState, req: QueryReq) -> Result { - let pool = state.pool(&req.db).map_err(err_to_str)?; + let pool = state.pool(&req.db).await.map_err(err_to_str)?; // Reject empty SQL uniformly. Postgres' tokio-postgres treats `client.query("")` // as a valid no-op and returns Ok([]), but sqlite (rusqlite) and mysql // (mysql_async) reject it at parse time — without this guard the worker's @@ -47,7 +47,7 @@ pub async fn handle(state: &AppState, req: QueryReq) -> Result driver::sqlite::query(p, &req.sql, ¶ms, req.timeout_ms).await, Pool::Postgres(p) => driver::postgres::query(p, &req.sql, ¶ms, req.timeout_ms).await, Pool::Mysql(p) => driver::mysql::query(p, &req.sql, ¶ms, req.timeout_ms).await, @@ -104,13 +104,14 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), @@ -124,7 +125,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn query_returns_rows_envelope() { let st = state(); - if let Pool::Sqlite(p) = st.pool("primary").unwrap() { + if let Pool::Sqlite(p) = st.pool("primary").await.unwrap() { let c = p.acquire().await.unwrap(); tokio::task::spawn_blocking(move || { c.with(|c| c.execute_batch("CREATE TABLE t (n INT); INSERT INTO t VALUES (1),(2);")) diff --git a/database/src/handlers/rollback_transaction.rs b/database/src/handlers/rollback_transaction.rs index bb40dbb6..1dcf6ba9 100644 --- a/database/src/handlers/rollback_transaction.rs +++ b/database/src/handlers/rollback_transaction.rs @@ -129,7 +129,7 @@ mod tests { let mut pools = std::collections::HashMap::new(); pools.insert("primary".to_string(), crate::pool::Pool::Sqlite(pool)); let st = crate::handlers::AppState { - pools: std::sync::Arc::new(pools), + pools: std::sync::Arc::new(tokio::sync::RwLock::new(pools)), handles: std::sync::Arc::new(crate::handle::HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/run_statement.rs b/database/src/handlers/run_statement.rs index 30a62db8..d939639d 100644 --- a/database/src/handlers/run_statement.rs +++ b/database/src/handlers/run_statement.rs @@ -51,13 +51,14 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state_in_memory() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), @@ -73,7 +74,7 @@ mod tests { let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); let st = AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/transaction.rs b/database/src/handlers/transaction.rs index df868f4e..c68b1e4d 100644 --- a/database/src/handlers/transaction.rs +++ b/database/src/handlers/transaction.rs @@ -54,7 +54,7 @@ fn failed_index_of(e: &crate::error::DbError) -> Option { } pub async fn handle(state: &AppState, req: TxReq) -> Result { - let pool = state.pool(&req.db).map_err(err_to_str)?; + let pool = state.pool(&req.db).await.map_err(err_to_str)?; let isolation = match req.isolation.as_deref() { Some("read_committed") => Some(Isolation::ReadCommitted), @@ -75,7 +75,7 @@ pub async fn handle(state: &AppState, req: TxReq) -> Result { stmts.push(TxStatement { sql: s.sql, params }); } - let result = match pool { + let result = match &pool { Pool::Sqlite(p) => driver::sqlite::transaction(p, stmts, isolation).await, Pool::Postgres(p) => driver::postgres::transaction(p, stmts, isolation).await, Pool::Mysql(p) => driver::mysql::transaction(p, stmts, isolation).await, @@ -127,13 +127,14 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state() -> AppState { let pool = SqlitePool::new("sqlite::memory:", &PoolConfig::default()).unwrap(); let mut pools = HashMap::new(); pools.insert("primary".to_string(), Pool::Sqlite(pool)); AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/handlers/transaction_execute.rs b/database/src/handlers/transaction_execute.rs index 61c9529b..0ce970d7 100644 --- a/database/src/handlers/transaction_execute.rs +++ b/database/src/handlers/transaction_execute.rs @@ -279,7 +279,7 @@ mod tests { let mut pools = std::collections::HashMap::new(); pools.insert("primary".to_string(), crate::pool::Pool::Sqlite(pool)); let st = crate::handlers::AppState { - pools: std::sync::Arc::new(pools), + pools: std::sync::Arc::new(tokio::sync::RwLock::new(pools)), handles: std::sync::Arc::new(crate::handle::HandleRegistry::new()), transactions: crate::transaction::TxRegistry::new(), log: iii_observability::Logger::new(), diff --git a/database/src/lib.rs b/database/src/lib.rs index 8db979e2..6ae751d2 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1,6 +1,7 @@ //! database worker — public surface for the binary and tests. pub mod config; +pub mod configuration; pub(crate) mod driver; pub mod error; pub mod handle; diff --git a/database/src/main.rs b/database/src/main.rs index bfe2d44a..61925d54 100644 --- a/database/src/main.rs +++ b/database/src/main.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use clap::Parser; use database::config::WorkerConfig; +use database::configuration; use database::handle::HandleRegistry; use database::handlers::{ begin_transaction::{self, BeginTxReq}, @@ -15,13 +16,14 @@ use database::handlers::{ transaction_query::{self, TxQueryReq}, AppState, }; -use database::pool; use database::transaction::TxRegistry; use database::triggers::handler::RowChangeTrigger; -use iii_observability::Logger; -use iii_sdk::{register_worker, InitOptions, RegisterFunction, RegisterTriggerType}; -use std::collections::HashMap; +use iii_observability::{Logger, OtelConfig}; +use iii_sdk::{ + register_worker, InitOptions, RegisterFunction, RegisterTriggerType, +}; use std::sync::Arc; +use tokio::sync::RwLock; #[derive(Parser, Debug)] #[command( @@ -29,9 +31,9 @@ use std::sync::Arc; about = "database worker (PostgreSQL, MySQL, SQLite)" )] struct Cli { - /// Path to config.yaml file - #[arg(long, default_value = "./config.yaml")] - config: String, + /// Optional seed config.yaml used to populate `initial_value` on first register + #[arg(long)] + config: Option, /// WebSocket URL of the iii engine #[arg(long, default_value = "ws://127.0.0.1:49134")] @@ -50,30 +52,57 @@ async fn main() -> Result<()> { let cli = Cli::parse(); tracing::info!( name = database::worker_name(), - config = %cli.config, + seed_config = cli.config.as_deref().unwrap_or("(none)"), url = %redact_url(&cli.url), "starting" ); - let cfg = WorkerConfig::from_file(&cli.config) - .map_err(|e| anyhow::anyhow!(e)) - .with_context(|| format!("loading config from {}", cli.config))?; + let iii = register_worker( + &cli.url, + InitOptions { + otel: Some(OtelConfig::default()), + ..Default::default() + }, + ); - let mut pools = HashMap::new(); - for (name, db) in &cfg.databases { - let p = pool::build(name, db) - .await - .map_err(|e| anyhow::anyhow!(serde_json::to_string(&e).unwrap_or_default())) - .with_context(|| format!("building pool for db `{name}`"))?; - tracing::info!(db = %name, driver = ?p.driver(), "pool ready"); - pools.insert(name.clone(), p); - } + let seed = match &cli.config { + Some(path) => match WorkerConfig::from_file(path) { + Ok(cfg) => { + tracing::info!(path = %path, "loaded seed config for initial registration"); + Some(cfg) + } + Err(e) => { + tracing::warn!( + path = %path, + error = %e, + "failed to load seed config; relying on existing configuration entry" + ); + None + } + }, + None => None, + }; + + configuration::register_config(&iii, seed.as_ref()) + .await + .map_err(anyhow::Error::msg) + .context("registering database configuration schema")?; + + let cfg = configuration::fetch_config(&iii) + .await + .map_err(anyhow::Error::msg) + .context("loading database configuration")?; + + let pools = configuration::build_pools(&cfg) + .await + .map_err(anyhow::Error::msg) + .context("building initial connection pools")?; let handles = Arc::new(HandleRegistry::new()); let transactions = TxRegistry::new(); let log = Logger::new(); let state = AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: handles.clone(), transactions: transactions.clone(), log: log.clone(), @@ -82,8 +111,6 @@ async fn main() -> Result<()> { let _evictor = handles.spawn_evictor(); let _tx_watcher = transactions.spawn_timeout_watcher(log.clone()); - let iii = register_worker(&cli.url, InitOptions::default()); - { let st = state.clone(); iii.register_function( @@ -247,8 +274,11 @@ async fn main() -> Result<()> { RowChangeTrigger, )); + configuration::register_config_trigger(&iii, state.clone()) + .context("registering configuration change trigger")?; + tracing::info!( - "database worker registered 10 functions and 1 trigger type, waiting for invocations" + "database worker registered 11 functions and 1 trigger type, waiting for invocations" ); wait_for_shutdown_signal().await?; tracing::info!("database worker shutting down"); diff --git a/database/tests/e2e/README.md b/database/tests/e2e/README.md index 62f83155..a6bdb482 100644 --- a/database/tests/e2e/README.md +++ b/database/tests/e2e/README.md @@ -34,9 +34,21 @@ Runs locally and in CI (`.github/workflows/database-e2e.yml`). ``` Builds the worker (`cargo build --release --bin database`), brings up -the docker stack with `wal_level=logical`, starts the engine, and runs the +the docker stack with `wal_level=logical`, starts the engine, seeds the +`database` configuration entry, starts the database worker, and runs the selected case groups across all 3 drivers. Exits 0 on PASS, 1 on any FAIL. +### Startup order + +1. Docker compose (postgres + mysql) +2. iii engine (`config.yaml` — queue + observability only) +3. `npm run seed-config` — registers `configuration::register` for id `database` +4. Database worker binary (host process, reads config via `configuration::get`) +5. Harness test suite (`npm run dev`) + +Neither the database worker nor the harness is engine-managed; both connect +over WebSocket like external clients. + ## Flags | Flag | Effect | @@ -93,12 +105,24 @@ accepted; outside-tx COUNT=1`). |---|---| | `run-tests.sh` | Orchestrator | | `docker-compose.yml` | Postgres (wal_level=logical) + MySQL with healthchecks | -| `config.yaml` | Engine config (queue, observability, database, harness) | +| `config.yaml` | Engine infra only (queue, observability) | +| `workers/harness/src/seed-configuration.ts` | Bootstrap: `configuration::register` for id `database` | +| `workers/harness/src/database-config.ts` | E2e `databases` value (sqlite + pg + mysql) | +| `workers/harness/fixtures/database.schema.json` | JSON Schema fixture (sync with Rust via export test) | | `workers/harness/` | TypeScript smoke-test worker (runs as a host process) | | `workers/harness/src/cases-interactive-tx.ts` | Interactive-transaction lifecycle cases | | `workers/harness/src/cases-tx-control-bypass.ts` | Side-channel-finalization repros | | `reports/report.json` | Per-case results (latest run) | +### Regenerating the schema fixture + +When `WorkerConfig` changes in Rust, refresh the harness schema: + +```sh +cd ../.. # database/ crate root +EXPORT_E2E_SCHEMA=1 cargo test -p database export_e2e_schema_fixture -- --ignored +``` + ## CI The harness runs in `.github/workflows/database-e2e.yml` on any PR @@ -113,6 +137,8 @@ the same docker compose stack used locally, and shells out to test ports. Stop it, or edit `docker-compose.yml`. - **`worker binary missing`**: run without `--no-build` once. - **`iii engine binary missing`**: install with the script above. +- **Database worker did not respond**: tail `reports/database-*.log`. Common + causes: configuration seed failed, or postgres/mysql not healthy. - **Sentinel timeout**: tail `reports/harness-*.log` for the harness output. - **Docker daemon not running**: start Docker Desktop (or `colima start`) and re-run. Or use rootless podman: `COMPOSE='podman-compose' ./run-tests.sh`. diff --git a/database/tests/e2e/config.yaml b/database/tests/e2e/config.yaml index 7d87e3ed..3db29980 100644 --- a/database/tests/e2e/config.yaml +++ b/database/tests/e2e/config.yaml @@ -1,16 +1,13 @@ # iii engine configuration — passed via `iii -c config.yaml`. # -# The harness TS worker is NOT registered here. It runs as a plain host -# node process launched from run-tests.sh and connects to the engine via -# WebSocket like any external client — sidesteps the libkrun-VM-based -# managed-worker setup, which is overkill for a test harness. +# Infrastructure workers only (queue, observability). The `configuration` +# worker is enabled by default in the engine. # -# The database worker config is inlined under its worker entry. The -# engine serializes this `config:` value to /tmp/database-config.yaml -# and threads `--config ` through `iii-worker start` to the spawned -# binary (see engine/src/workers/registry_worker.rs::spawn and -# crates/iii-worker/src/cli/managed.rs::start_binary_worker on branch -# feat/registry-worker-config-delivery). +# The database worker and harness are NOT registered here. run-tests.sh: +# 1. starts this engine +# 2. runs harness seed-configuration.ts (configuration::register) +# 3. spawns the database binary as a host process +# 4. runs the harness test suite workers: - name: iii-queue @@ -25,36 +22,3 @@ workers: exporter: memory logs_console_output: true sampling_ratio: 1.0 - - - name: database - config: - databases: - sqlite_db: - url: sqlite:./data/iii.db - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 - pg_db: - url: postgres://iii:iii@127.0.0.1:55432/iii_test - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 - # Local docker postgres uses a self-signed cert that doesn't - # chain to any system CA. The worker's `tls.mode` defaults to - # `require` (chain-validated) — opt out for the test harness. - tls: - mode: disable - mysql_db: - url: mysql://iii:iii@127.0.0.1:53306/iii_test - pool: - max: 10 - idle_timeout_ms: 30000 - acquire_timeout_ms: 5000 - # Local docker mysql:8.4 ships an auto-generated self-signed cert - # that doesn't chain to any system CA. Without `tls.mode: disable` - # the worker's require-mode rustls verifier rejects it and every - # mysql RPC fails with DRIVER_ERROR. - tls: - mode: disable diff --git a/database/tests/e2e/run-tests.sh b/database/tests/e2e/run-tests.sh index e215e7a8..2d4361fb 100755 --- a/database/tests/e2e/run-tests.sh +++ b/database/tests/e2e/run-tests.sh @@ -21,6 +21,7 @@ COMPOSE="${COMPOSE:-docker compose}" REPORT_PATH="$ROOT_DIR/reports/report.json" TS=$(date +%Y%m%d-%H%M%S) ENGINE_LOG="$ROOT_DIR/reports/engine-$TS.log" +DATABASE_LOG="$ROOT_DIR/reports/database-$TS.log" HARNESS_LOG="$ROOT_DIR/reports/harness-$TS.log" SENTINEL_TIMEOUT="${HARNESS_TIMEOUT:-180}" HEALTH_TIMEOUT="${HEALTH_TIMEOUT:-60}" @@ -75,6 +76,7 @@ EOF done ENGINE_PID="" +DATABASE_PID="" HARNESS_PID="" cleanup() { local code=$? @@ -82,6 +84,10 @@ cleanup() { kill "$HARNESS_PID" 2>/dev/null || true wait "$HARNESS_PID" 2>/dev/null || true fi + if [[ -n "$DATABASE_PID" ]] && kill -0 "$DATABASE_PID" 2>/dev/null; then + kill "$DATABASE_PID" 2>/dev/null || true + wait "$DATABASE_PID" 2>/dev/null || true + fi if [[ -n "$ENGINE_PID" ]] && kill -0 "$ENGINE_PID" 2>/dev/null; then kill "$ENGINE_PID" 2>/dev/null || true wait "$ENGINE_PID" 2>/dev/null || true @@ -189,8 +195,9 @@ if [[ "$WITH_CARGO_TEST" -eq 1 ]]; then ) fi -# 6. Reset SQLite file +# 6. Reset SQLite file and any prior configuration fs state rm -f "$ROOT_DIR/data/test.sqlite" "$ROOT_DIR/data/iii.db" +rm -rf "$ROOT_DIR/data/configuration" # 7. Install harness deps if needed if [[ ! -d "$ROOT_DIR/workers/harness/node_modules" ]]; then @@ -230,7 +237,38 @@ while :; do done echo "[run-tests] engine listening" -# 10. Launch the harness as a host node process +# 10. Seed the `database` configuration entry via the harness bootstrap script. +echo "[run-tests] seeding database configuration" +( cd "$ROOT_DIR/workers/harness" && III_URL=ws://127.0.0.1:49134 npm run --silent seed-config ) + +# 11. Start the database worker as a host process (not engine-managed). +echo "[run-tests] starting database worker" +: > "$DATABASE_LOG" +( cd "$ROOT_DIR" && "$WORKER_BIN_TARGET" --url ws://127.0.0.1:49134 ) > "$DATABASE_LOG" 2>&1 & +DATABASE_PID=$! +echo "[run-tests] database worker pid=$DATABASE_PID" + +# 12. Wait for database::query to succeed (worker startup + pool build). +deadline=$(( $(date +%s) + 30 )) +while :; do + if "$III_BIN" trigger database::query db=sqlite_db sql='SELECT 1' >/dev/null 2>&1; then + break + fi + if ! kill -0 "$DATABASE_PID" 2>/dev/null; then + echo "[run-tests] FATAL: database worker exited before becoming ready; tail of database log:" >&2 + tail -40 "$DATABASE_LOG" >&2 + exit 1 + fi + if (( $(date +%s) > deadline )); then + echo "[run-tests] FATAL: database worker did not respond within 30s; tail of database log:" >&2 + tail -40 "$DATABASE_LOG" >&2 + exit 1 + fi + sleep 0.5 +done +echo "[run-tests] database worker ready" + +# 13. Launch the harness as a host node process echo "[run-tests] starting harness (mode=$MODE)" HARNESS_ENV=() if [[ -n "$FILTER" ]]; then @@ -244,7 +282,7 @@ HARNESS_ENV+=("HARNESS_MODE=$MODE") HARNESS_PID=$! echo "[run-tests] harness pid=$HARNESS_PID" -# 11. Wait for sentinel line +# 14. Wait for sentinel line sentinel="" deadline=$(( $(date +%s) + SENTINEL_TIMEOUT )) while (( $(date +%s) < deadline )); do @@ -271,7 +309,7 @@ if [[ -z "$sentinel" ]]; then exit 1 fi -# 12. Print summary +# 15. Print summary echo echo "=======================================================================" echo "$sentinel" diff --git a/database/tests/e2e/workers/harness/fixtures/database.schema.json b/database/tests/e2e/workers/harness/fixtures/database.schema.json new file mode 100644 index 00000000..f223dbaf --- /dev/null +++ b/database/tests/e2e/workers/harness/fixtures/database.schema.json @@ -0,0 +1,177 @@ +{ + "definitions": { + "DatabaseConfig": { + "description": "Per-database connection settings. The URL scheme selects the driver; `pool` and `tls` are optional and default when omitted.", + "properties": { + "pool": { + "allOf": [ + { + "$ref": "#/definitions/PoolConfig" + } + ], + "default": { + "acquire_timeout_ms": 5000, + "idle_timeout_ms": 30000, + "max": 10 + } + }, + "tls": { + "allOf": [ + { + "$ref": "#/definitions/TlsConfig" + } + ], + "default": { + "ca_cert": null, + "mode": "require", + "trust_native": true + } + }, + "url": { + "description": "Connection URL. Driver is inferred from the scheme: `sqlite:`, `postgres://` or `postgresql://`, or `mysql://`.", + "type": "string" + } + }, + "required": [ + "url" + ], + "type": "object" + }, + "PoolConfig": { + "properties": { + "acquire_timeout_ms": { + "default": 5000, + "description": "Fail pool acquisition when no connection is available within this many milliseconds.", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + }, + "idle_timeout_ms": { + "default": 30000, + "description": "Close idle connections after this many milliseconds.", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + }, + "max": { + "default": 10, + "description": "Maximum number of open connections in the pool.", + "format": "uint32", + "minimum": 0.0, + "type": "integer" + } + }, + "type": "object" + }, + "TlsConfig": { + "description": "TLS settings for a single database. Applies to postgres and mysql. Sqlite is local-file and ignores this block.\n\nDefault is `mode: require` — TLS handshake required, certificate chain validated against the system trust store, hostname verification skipped (matching libpq's `sslmode=require` semantics). Use `mode: verify-full` to additionally verify the certificate hostname matches the URL host, and `mode: disable` to opt out of TLS entirely (local-dev only).", + "properties": { + "ca_cert": { + "default": null, + "description": "Optional path to a PEM file containing one or more CA certificates. Additive by default — these certs **extend** the system trust store rather than replace it. Set `trust_native: false` for strict-isolation deployments that must only trust the operator-supplied bundle.", + "type": [ + "string", + "null" + ] + }, + "mode": { + "allOf": [ + { + "$ref": "#/definitions/TlsMode" + } + ], + "default": "require", + "description": "TLS mode: `disable` (plaintext), `require` (default), or `verify-full`." + }, + "trust_native": { + "default": true, + "description": "When true (default), the system/native trust store is loaded in addition to any `ca_cert` bundle. Set to `false` to trust only the `ca_cert` certificates — useful when an operator wants to pin trust to a private CA and explicitly *not* accept the public web PKI.\n\nEffective for postgres. MySQL is forced-additive: `mysql_async`'s rustls path always loads the Mozilla `webpki_roots` bundle and extends it with `ca_cert` — there is no upstream knob to suppress the bundled roots, so `trust_native: false` only affects postgres.\n\nNote: with both `trust_native: false` *and* `ca_cert: None` on postgres, no trust roots are available; pool construction fails with `CONFIG_ERROR`.", + "type": "boolean" + } + }, + "type": "object" + }, + "TlsMode": { + "oneOf": [ + { + "description": "No TLS. Plaintext connection. Local-dev only.", + "enum": [ + "disable" + ], + "type": "string" + }, + { + "description": "TLS handshake required; certificate chain validated; hostname NOT verified. Matches libpq's `sslmode=require`. The default.", + "enum": [ + "require" + ], + "type": "string" + }, + { + "description": "TLS handshake required; certificate chain validated; certificate hostname must match the URL host. Matches libpq's `sslmode=verify-full`.", + "enum": [ + "verify-full" + ], + "type": "string" + } + ] + } + }, + "description": "Top-level worker config registered with the `configuration` worker.", + "example": { + "databases": { + "primary": { + "pool": { + "acquire_timeout_ms": 5000, + "idle_timeout_ms": 30000, + "max": 10 + }, + "url": "sqlite:./data/iii.db" + } + } + }, + "examples": [ + { + "databases": { + "primary": { + "pool": { + "acquire_timeout_ms": 5000, + "idle_timeout_ms": 30000, + "max": 10 + }, + "tls": { + "ca_cert": null, + "mode": "require", + "trust_native": true + }, + "url": "sqlite:./data/iii.db" + } + } + } + ], + "properties": { + "databases": { + "additionalProperties": { + "$ref": "#/definitions/DatabaseConfig" + }, + "default": {}, + "description": "Named connection pools. Keys are logical database names referenced by RPC handlers (for example `primary`). At least one entry is required.", + "examples": [ + { + "primary": { + "pool": { + "acquire_timeout_ms": 5000, + "idle_timeout_ms": 30000, + "max": 10 + }, + "url": "sqlite:./data/iii.db" + } + } + ], + "minProperties": 1, + "type": "object" + } + }, + "title": "WorkerConfig", + "type": "object" +} diff --git a/database/tests/e2e/workers/harness/package.json b/database/tests/e2e/workers/harness/package.json index c7032dce..598d5a71 100644 --- a/database/tests/e2e/workers/harness/package.json +++ b/database/tests/e2e/workers/harness/package.json @@ -6,6 +6,7 @@ "description": "Self-asserting smoke harness for the iii database worker (SQLite, Postgres, MySQL).", "scripts": { "dev": "tsx src/worker.ts", + "seed-config": "tsx src/seed-configuration.ts", "build": "tsc" }, "dependencies": { diff --git a/database/tests/e2e/workers/harness/src/database-config.ts b/database/tests/e2e/workers/harness/src/database-config.ts new file mode 100644 index 00000000..510321a4 --- /dev/null +++ b/database/tests/e2e/workers/harness/src/database-config.ts @@ -0,0 +1,34 @@ +/** + * E2e `database` configuration value — single source of truth for the + * configuration worker entry seeded before the database worker starts. + */ + +const DEFAULT_POOL = { + max: 10, + idle_timeout_ms: 30_000, + acquire_timeout_ms: 5_000, +} as const; + +export const DATABASE_CONFIG_ID = 'database'; + +export const DATABASE_CONFIG_VALUE = { + databases: { + sqlite_db: { + url: 'sqlite:./data/iii.db', + pool: { ...DEFAULT_POOL }, + }, + pg_db: { + url: 'postgres://iii:iii@127.0.0.1:55432/iii_test', + pool: { ...DEFAULT_POOL }, + // Local docker postgres uses a self-signed cert that doesn't chain to + // any system CA. The worker's tls.mode defaults to require. + tls: { mode: 'disable' as const }, + }, + mysql_db: { + url: 'mysql://iii:iii@127.0.0.1:53306/iii_test', + pool: { ...DEFAULT_POOL }, + // Local docker mysql:8.4 ships an auto-generated self-signed cert. + tls: { mode: 'disable' as const }, + }, + }, +} as const; diff --git a/database/tests/e2e/workers/harness/src/seed-configuration.ts b/database/tests/e2e/workers/harness/src/seed-configuration.ts new file mode 100644 index 00000000..88b59c5a --- /dev/null +++ b/database/tests/e2e/workers/harness/src/seed-configuration.ts @@ -0,0 +1,88 @@ +/** + * Bootstrap: register the `database` configuration entry before the database + * worker binary starts. Invoked by run-tests.sh via `npm run seed-config`. + */ + +import { readFileSync } from 'node:fs'; +import { resolve, dirname } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { registerWorker } from 'iii-sdk'; +import { + DATABASE_CONFIG_ID, + DATABASE_CONFIG_VALUE, +} from './database-config.ts'; + +const URL = process.env.III_URL ?? 'ws://127.0.0.1:49134'; +const CONFIG_RETRIES = 3; +const CONFIG_TIMEOUT_MS = 5_000; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const SCHEMA_PATH = resolve(__dirname, '../fixtures/database.schema.json'); + +function loadSchema(): unknown { + return JSON.parse(readFileSync(SCHEMA_PATH, 'utf8')); +} + +async function triggerWithRetry( + iii: ReturnType, + functionId: string, + payload: unknown, +): Promise { + let lastErr: unknown; + for (let attempt = 1; attempt <= CONFIG_RETRIES; attempt++) { + try { + return await iii.trigger({ + function_id: functionId, + payload, + timeout_ms: CONFIG_TIMEOUT_MS, + }); + } catch (e) { + lastErr = e; + if (attempt < CONFIG_RETRIES) { + console.warn( + `[seed-config] ${functionId} failed (attempt ${attempt}/${CONFIG_RETRIES}):`, + e, + ); + await new Promise((r) => setTimeout(r, 250 * attempt)); + } + } + } + throw lastErr; +} + +async function main(): Promise { + const iii = registerWorker(URL); + const schema = loadSchema(); + + console.log('[seed-config] registering configuration entry', { + id: DATABASE_CONFIG_ID, + url: URL, + }); + + await triggerWithRetry(iii, 'configuration::register', { + id: DATABASE_CONFIG_ID, + name: 'Database', + description: 'Connection pools for PostgreSQL, MySQL, and SQLite.', + schema, + initial_value: DATABASE_CONFIG_VALUE, + }); + + const got = (await triggerWithRetry(iii, 'configuration::get', { + id: DATABASE_CONFIG_ID, + })) as { id?: string; value?: unknown }; + + if (got?.id !== DATABASE_CONFIG_ID || got.value == null) { + throw new Error( + `configuration::get verification failed: ${JSON.stringify(got)}`, + ); + } + + console.log('[seed-config] configuration entry ready'); + await new Promise((r) => setTimeout(r, 200)); + await iii.shutdown(); +} + +main().catch((e) => { + console.error('[seed-config] fatal:', e?.stack ?? e); + process.exit(1); +}); diff --git a/database/tests/integration.rs b/database/tests/integration.rs index 5a0b1eda..79f94a11 100644 --- a/database/tests/integration.rs +++ b/database/tests/integration.rs @@ -15,6 +15,7 @@ use iii_observability::Logger; use serde_json::json; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::RwLock; async fn build_state() -> AppState { let yaml = "databases:\n primary:\n url: \"sqlite::memory:\"\n"; @@ -25,13 +26,25 @@ async fn build_state() -> AppState { pools.insert(name.clone(), p); } AppState { - pools: Arc::new(pools), + pools: Arc::new(RwLock::new(pools)), handles: Arc::new(HandleRegistry::new()), transactions: TxRegistry::new(), log: Logger::new(), } } +#[test] +fn from_json_parity_with_yaml_seed_shape() { + let yaml = "databases:\n primary:\n url: \"sqlite::memory:\"\n"; + let from_yaml = WorkerConfig::from_yaml(yaml).unwrap(); + let from_json = WorkerConfig::from_json(&from_yaml.to_json()).unwrap(); + assert_eq!(from_yaml.databases["primary"].url, from_json.databases["primary"].url); + assert_eq!( + from_yaml.databases["primary"].driver, + from_json.databases["primary"].driver + ); +} + #[tokio::test(flavor = "multi_thread")] async fn end_to_end_query_execute_prepare_run_transaction() { let st = build_state().await; From 212b7b549f853987078a8105703e48c759c5deb5 Mon Sep 17 00:00:00 2001 From: Sergio Marcelino Date: Wed, 27 May 2026 17:34:53 -0300 Subject: [PATCH 2/3] chore: making iii next in database --- .github/workflows/database-e2e.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/database-e2e.yml b/.github/workflows/database-e2e.yml index 2b09fca3..076f772d 100644 --- a/.github/workflows/database-e2e.yml +++ b/.github/workflows/database-e2e.yml @@ -43,10 +43,10 @@ jobs: # GHA `services:` blocks can't pass `-c wal_level=logical` to postgres, # which the row-change tests require. Reuse the same docker-compose # stack the harness uses locally for dev/CI parity. - - name: Install iii engine (latest from main) + - name: Install iii engine (next) run: | curl -fsSL --retry 3 --retry-connrefused --retry-delay 5 \ - https://install.iii.dev/iii/main/install.sh | sh + https://install.iii.dev/iii/main/install.sh | sh -s -- --next echo "$HOME/.local/bin" >> "$GITHUB_PATH" - name: Verify engine From 649995cedb020a595acc2f5790ac2f4fbbd62c87 Mon Sep 17 00:00:00 2001 From: Sergio Marcelino Date: Wed, 27 May 2026 18:00:20 -0300 Subject: [PATCH 3/3] chore: fmt --- database/src/config.rs | 14 ++++++++++---- database/src/configuration.rs | 18 ++++++------------ database/src/main.rs | 4 +--- database/tests/integration.rs | 5 ++++- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/database/src/config.rs b/database/src/config.rs index b1e703ef..b0b969a4 100644 --- a/database/src/config.rs +++ b/database/src/config.rs @@ -9,8 +9,8 @@ //! `configuration::get` are already expanded by the configuration worker //! (`${VAR:default}` syntax). -use schemars::JsonSchema; use schemars::schema::Schema; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; @@ -228,7 +228,8 @@ impl WorkerConfig { pub fn json_schema() -> Value { let root = schemars::schema_for!(WorkerConfig); - let mut schema = serde_json::to_value(&root.schema).expect("WorkerConfig JSON Schema serializes"); + let mut schema = + serde_json::to_value(&root.schema).expect("WorkerConfig JSON Schema serializes"); if let Some(obj) = schema.as_object_mut() { if !root.definitions.is_empty() { obj.insert( @@ -410,7 +411,10 @@ mod tests { #[test] fn json_schema_is_object_with_databases_property() { let schema = WorkerConfig::json_schema(); - assert!(schema.get("properties").and_then(|p| p.get("databases")).is_some()); + assert!(schema + .get("properties") + .and_then(|p| p.get("databases")) + .is_some()); } #[test] @@ -452,7 +456,9 @@ mod tests { let pool_schema = schema["definitions"]["PoolConfig"].as_object().unwrap(); for field in ["max", "idle_timeout_ms", "acquire_timeout_ms"] { assert!( - pool_schema["properties"][field].get("description").is_some(), + pool_schema["properties"][field] + .get("description") + .is_some(), "missing description for pool.{field}" ); } diff --git a/database/src/configuration.rs b/database/src/configuration.rs index 3e2e8650..2fd3ff0d 100644 --- a/database/src/configuration.rs +++ b/database/src/configuration.rs @@ -4,9 +4,7 @@ use crate::config::WorkerConfig; use crate::handlers::AppState; use crate::pool::{self, Pool}; -use iii_sdk::{ - III, IIIError, RegisterFunction, RegisterTriggerInput, TriggerRequest, -}; +use iii_sdk::{IIIError, RegisterFunction, RegisterTriggerInput, TriggerRequest, III}; use serde_json::{json, Value}; use std::collections::HashMap; use std::time::Duration; @@ -101,9 +99,7 @@ pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIErro Ok::(json!({ "ok": true })) } }) - .description( - "Internal: reload connection pools when the database configuration changes.", - ), + .description("Internal: reload connection pools when the database configuration changes."), ); iii.register_trigger(RegisterTriggerInput { @@ -141,11 +137,7 @@ async fn on_config_change(state: &AppState, payload: Value) { } } -async fn trigger_with_retry( - iii: &III, - function_id: &str, - payload: Value, -) -> Result { +async fn trigger_with_retry(iii: &III, function_id: &str, payload: Value) -> Result { let mut last_err = String::new(); for attempt in 1..=CONFIG_RETRIES { match iii @@ -172,5 +164,7 @@ async fn trigger_with_retry( } } } - Err(format!("{function_id} failed after {CONFIG_RETRIES} attempts: {last_err}")) + Err(format!( + "{function_id} failed after {CONFIG_RETRIES} attempts: {last_err}" + )) } diff --git a/database/src/main.rs b/database/src/main.rs index 61925d54..5c8183af 100644 --- a/database/src/main.rs +++ b/database/src/main.rs @@ -19,9 +19,7 @@ use database::handlers::{ use database::transaction::TxRegistry; use database::triggers::handler::RowChangeTrigger; use iii_observability::{Logger, OtelConfig}; -use iii_sdk::{ - register_worker, InitOptions, RegisterFunction, RegisterTriggerType, -}; +use iii_sdk::{register_worker, InitOptions, RegisterFunction, RegisterTriggerType}; use std::sync::Arc; use tokio::sync::RwLock; diff --git a/database/tests/integration.rs b/database/tests/integration.rs index 79f94a11..76b5576e 100644 --- a/database/tests/integration.rs +++ b/database/tests/integration.rs @@ -38,7 +38,10 @@ fn from_json_parity_with_yaml_seed_shape() { let yaml = "databases:\n primary:\n url: \"sqlite::memory:\"\n"; let from_yaml = WorkerConfig::from_yaml(yaml).unwrap(); let from_json = WorkerConfig::from_json(&from_yaml.to_json()).unwrap(); - assert_eq!(from_yaml.databases["primary"].url, from_json.databases["primary"].url); + assert_eq!( + from_yaml.databases["primary"].url, + from_json.databases["primary"].url + ); assert_eq!( from_yaml.databases["primary"].driver, from_json.databases["primary"].driver