42 changes: 25 additions & 17 deletions database/src/configuration.rs
@@ -95,16 +95,20 @@ pub async fn apply_config(state: &AppState, cfg: WorkerConfig) -> Result<(), Str
 /// Register the internal config-change handler and bind a `configuration` trigger.
 pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIError> {
     let st = state.clone();
+    let engine = iii.clone();
     iii.register_function(
         CONFIG_FN_ID,
-        RegisterFunction::new_async(move |payload: Value| {
+        RegisterFunction::new_async(move |_payload: Value| {
             let st = st.clone();
+            let engine = engine.clone();
             async move {
-                on_config_change(&st, payload).await;
+                on_config_change(&engine, &st).await;
                 Ok::<Value, IIIError>(json!({ "ok": true }))
             }
         })
-        .description("Internal: reload connection pools when the database configuration changes."),
+        .description(
+            "Internal: reload connection pools from the authoritative configuration when it changes.",
+        ),
     );

     iii.register_trigger(RegisterTriggerInput {
@@ -119,25 +123,29 @@ pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIErro
     Ok(())
 }

-async fn on_config_change(state: &AppState, payload: Value) {
-    let new_value = match payload.get("new_value") {
-        Some(v) if !v.is_null() => v.clone(),
-        _ => {
-            tracing::warn!("configuration event missing new_value; skipping pool reload");
+/// Reload pools from the AUTHORITATIVE configuration.
+///
+/// The caller-supplied trigger payload is intentionally ignored:
+/// `database::on-config-change` is a discoverable bus function, so trusting
+/// `payload.new_value` would let any caller replace the live connection pools
+/// (e.g. point them at an attacker-controlled database) without updating
+/// persisted state. Re-fetch the stored value via `configuration::get` instead.
+async fn on_config_change(iii: &III, state: &AppState) {
+    let cfg = match fetch_config(iii).await {
+        Ok(cfg) => cfg,
+        Err(e) => {
+            tracing::error!(
+                error = %e,
+                "config-change: failed to fetch authoritative configuration; keeping previous pools"
+            );
             return;
         }
     };
-    match WorkerConfig::from_json(&new_value) {
-        Ok(cfg) => match apply_config(state, cfg).await {
-            Ok(()) => tracing::info!("database pools reloaded after configuration change"),
-            Err(e) => tracing::error!(
-                error = %e,
-                "failed to rebuild pools after configuration change; keeping previous pools"
-            ),
-        },
+    match apply_config(state, cfg).await {
+        Ok(()) => tracing::info!("database pools reloaded after configuration change"),
         Err(e) => tracing::error!(
             error = %e,
-            "invalid configuration payload; keeping previous pools"
+            "failed to rebuild pools after configuration change; keeping previous pools"
         ),
     }
 }
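The core idea of this change, ignoring the caller-supplied payload and re-reading the stored value, can be shown in isolation. The following is a minimal, synchronous, std-only sketch and not the worker's actual code: `ConfigStore`, `Pools`, and `on_config_change_sketch` are hypothetical names invented for illustration, and the real handler is async and goes through `fetch_config` and `apply_config`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the values persisted by the `configuration` worker.
pub struct ConfigStore {
    values: HashMap<String, String>,
}

impl ConfigStore {
    pub fn new() -> Self {
        Self { values: HashMap::new() }
    }

    /// Persist a value (assumed to be the only legitimate way to change config).
    pub fn set(&mut self, id: &str, dsn: &str) {
        self.values.insert(id.to_string(), dsn.to_string());
    }

    /// Models `configuration::get`: return the stored value or an error.
    pub fn get(&self, id: &str) -> Result<String, String> {
        self.values
            .get(id)
            .cloned()
            .ok_or_else(|| format!("no stored config for `{id}`"))
    }
}

/// Hypothetical stand-in for the live connection pools.
#[derive(Debug, PartialEq)]
pub struct Pools {
    pub dsn: String,
}

/// Reload from the store. `_payload` is accepted but never read, so a caller
/// cannot inject a configuration by invoking the handler directly.
pub fn on_config_change_sketch(store: &ConfigStore, pools: &mut Pools, _payload: &str) {
    match store.get("database") {
        Ok(dsn) => pools.dsn = dsn,
        // On a failed fetch the previous pools stay in place.
        Err(e) => eprintln!("keeping previous pools: {e}"),
    }
}
```

Calling `on_config_change_sketch` with a payload that names a different database leaves `pools.dsn` equal to whatever the store holds, and a store with no value leaves the pools untouched.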
7 changes: 7 additions & 0 deletions iii-permissions.yaml
@@ -32,6 +32,13 @@ rules:
 - '!oauth::openai-codex::login'
 - '!run::start'
 - '!router::stream_assistant'
+# Internal lifecycle hooks — agents must never force a config reload directly.
+# The handlers already re-fetch from configuration::get (ignoring the event
+# payload), so a direct call cannot inject config. These denies are
+# defense-in-depth: they close the agent-callable surface so a future handler
+# change cannot inadvertently re-open the injection path.
+- '!storage::on-config-change'
+- '!database::on-config-change'

 # Read-only / introspection (extend below for your tools).
 - state::get
74 changes: 5 additions & 69 deletions storage/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion storage/Cargo.toml
@@ -14,7 +14,7 @@ path = "src/main.rs"
 path = "src/lib.rs"

 [dependencies]
-iii-sdk = "=0.16.0-next.2"
+iii-sdk = "=0.19.0"
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "signal", "time", "process", "io-util", "net"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
115 changes: 73 additions & 42 deletions storage/README.md
@@ -86,42 +86,71 @@ iii.trigger(TriggerRequest {

 ## Configuration

-`storage` reads one `config.yaml` describing one or more buckets. Each
-bucket pins a `provider` (`s3` | `gcs` | `r2` | `local`) and the
-credentials for that provider. Buckets without `notifications:` work
-fine for RPCs; they just don't fire triggers.
+The storage worker gets its live configuration from the `configuration` worker,
+not from a local file. On startup it:
+
+1. Registers its config schema with the `configuration` worker
+   (`configuration::register`, id `storage`).
+2. Fetches the live, env-expanded config (`configuration::get`).
+3. Subscribes to `configuration:updated` events and hot-reloads.
+
+`--config <path>` is an **optional seed**: when given, the file is loaded and
+sent as `initial_value` the first time the schema is registered (no stored value
+yet). It is not the live source of truth — once a value exists in the
+`configuration` worker, that value wins.
+
+### Hot-reload scope
+
+On a `configuration:updated` event the worker re-fetches the authoritative
+config from the `configuration` worker (it does **not** trust the event
+payload). It then rebuilds the in-memory **backend map only**, and only when the
+bucket/notification **topology is unchanged** — i.e. when just backend
+connection settings changed (credentials, endpoint, path-style).
+
+Any change to the bucket set, a bucket's provider or underlying name, a
+notification source, or the local rustfs data dir is **refused**: the worker
+keeps the previously-running backends and logs that a restart is required.
+This avoids a split-brain where RPC reads/writes move to a new backend while
+the notification pollers/webhook stay wired to the old topology. A failed
+rebuild likewise keeps the previous backends.
+
+A fresh install with no configured buckets runs with zero backends until a
+bucket is configured.
+
+### Config shape
+
+Each bucket pins a `provider` (`s3` | `gcs` | `r2` | `local`) and the
+credentials for that provider. Buckets without `notifications:` work fine for
+RPCs; they just don't fire triggers.

 ```yaml
-workers:
-  - name: storage
-    config:
-      providers:
-        local:
-          data_dir: ./data/storage # rustfs sidecar root
-
-      buckets:
-        uploads:
-          provider: s3
-          bucket: my-app-uploads # underlying cloud bucket
-          region: us-east-1
-          notifications:
-            sqs_queue_url: https://sqs.us-east-1.amazonaws.com/123/my-app-uploads-events
-
-        documents:
-          provider: gcs
-          bucket: my-app-documents
-          # credentials_file: /etc/iii/gcs-sa.json # required for presignUrl
-
-        avatars:
-          provider: r2
-          bucket: avatars
-          account_id: ${R2_ACCOUNT_ID}
-          access_key_id: ${R2_ACCESS_KEY_ID}
-          secret_access_key: ${R2_SECRET_ACCESS_KEY}
-
-        scratch:
-          provider: local
-          bucket: scratch
+providers:
+  local:
+    data_dir: ./data/storage # rustfs sidecar root
+
+buckets:
+  uploads:
+    provider: s3
+    bucket: my-app-uploads # underlying cloud bucket
+    region: us-east-1
+    notifications:
+      sqs_queue_url: https://sqs.us-east-1.amazonaws.com/123/my-app-uploads-events
+
+  documents:
+    provider: gcs
+    bucket: my-app-documents
+    # credentials_file: /etc/iii/gcs-sa.json # required for presignUrl
+
+  avatars:
+    provider: r2
+    bucket: avatars
+    account_id: ${R2_ACCOUNT_ID}
+    access_key_id: ${R2_ACCESS_KEY_ID}
+    secret_access_key: ${R2_SECRET_ACCESS_KEY}
+
+  scratch:
+    provider: local
+    bucket: scratch
 ```

 The map key (`uploads`) is the worker-facing bucket name handlers
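The hot-reload rule described in the README hunk above (rebuild backends only when the bucket/notification topology is unchanged, otherwise refuse and require a restart) might be modelled roughly as follows. This is a sketch under stated assumptions, not the storage worker's implementation: `BucketSketch`, `StorageConfigSketch`, `ReloadDecision`, and `decide_reload` are hypothetical names, and `endpoint` stands in for all connection-only settings (credentials, endpoint, path-style).

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified view of one configured bucket.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BucketSketch {
    pub provider: String,                    // topology
    pub bucket: String,                      // topology: underlying bucket name
    pub notification_source: Option<String>, // topology
    pub endpoint: Option<String>,            // connection setting, safe to hot-reload
}

/// Hypothetical, simplified view of the whole storage config.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StorageConfigSketch {
    pub local_data_dir: Option<String>, // topology: rustfs data dir
    pub buckets: BTreeMap<String, BucketSketch>,
}

/// Per-bucket topology: (map key, provider, underlying name, notification source).
type BucketTopology<'a> = (&'a str, &'a str, &'a str, Option<&'a str>);

/// Extract only the fields that must not change under a hot reload.
fn topology(cfg: &StorageConfigSketch) -> (Option<&str>, Vec<BucketTopology<'_>>) {
    let buckets = cfg
        .buckets
        .iter()
        .map(|(name, b)| {
            (
                name.as_str(),
                b.provider.as_str(),
                b.bucket.as_str(),
                b.notification_source.as_deref(),
            )
        })
        .collect();
    (cfg.local_data_dir.as_deref(), buckets)
}

#[derive(Debug, PartialEq, Eq)]
pub enum ReloadDecision {
    /// Topology is identical: rebuilding the backend map is safe.
    RebuildBackends,
    /// Topology differs: keep the running backends and ask for a restart.
    RestartRequired,
}

pub fn decide_reload(old: &StorageConfigSketch, new: &StorageConfigSketch) -> ReloadDecision {
    if topology(old) == topology(new) {
        ReloadDecision::RebuildBackends
    } else {
        ReloadDecision::RestartRequired
    }
}
```

Comparing a derived "topology" value rather than the full config keeps the refusal rule in one place: any field added to the tuple automatically becomes restart-only, while fields left out of it remain hot-reloadable.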
@@ -214,21 +243,23 @@ triggers:
 ## Local development & testing

 The committed `config.yaml` declares a single `scratch` bucket served by the
-bundled rustfs sidecar, so you can run the worker against a local engine with
-zero credentials.
+bundled rustfs sidecar. Pass it as a seed so the `configuration` worker picks it
+up on first boot — zero cloud credentials required.

 ```bash
-# In one terminal: start the engine
+# In one terminal: start the engine (must include the configuration worker)
 iii start

-# In another: build & run the worker
+# In another: build & run the worker, seeding config.yaml on first registration
 cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml
 ```

-The worker spawns a rustfs process on a random port, waits for it to become
-healthy, then registers `storage::putObject`, `storage::getObject`,
-`storage::deleteObject`, and `storage::presignUrl`. Files land under
-`./data/storage/` (configurable via `providers.local.data_dir`).
+The worker registers its schema with the `configuration` worker (seeding
+`config.yaml` if no stored value exists), fetches the live config, then spawns a
+rustfs process on a random port, waits for it to become healthy, and registers
+`storage::putObject`, `storage::getObject`, `storage::deleteObject`, and
+`storage::presignUrl`. Files land under `./data/storage/` (configurable via
+`providers.local.data_dir`).

 Running `--manifest` prints the registry-publish JSON without touching the
 engine — useful when testing CI flows:
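The seeding behaviour the README changes describe (the `--config` file is only used as `initial_value` when no stored value exists, and a stored value always wins) can be illustrated with a small model. This is an assumption-laden sketch, not the `configuration` worker's API: `register_with_seed` and `ConfigOrigin` are hypothetical, and the stored value is modelled as a plain `Option<String>`.

```rust
/// Where the live configuration ended up coming from (hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub enum ConfigOrigin {
    /// A value was already stored; the seed file was ignored.
    Stored,
    /// Nothing was stored, so the seed became the stored value.
    Seeded,
    /// Nothing stored and no seed given: the worker would run with zero backends.
    Empty,
}

/// Model of first-time registration: the seed only applies when `stored` is empty.
pub fn register_with_seed(stored: &mut Option<String>, seed: Option<String>) -> ConfigOrigin {
    if stored.is_some() {
        return ConfigOrigin::Stored;
    }
    match seed {
        Some(value) => {
            *stored = Some(value);
            ConfigOrigin::Seeded
        }
        None => ConfigOrigin::Empty,
    }
}
```

Under this model, editing `config.yaml` after the first boot has no effect on a later run, which matches the statement that the file is a seed rather than the live source of truth.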
1 change: 1 addition & 0 deletions storage/src/backend/factory.rs
@@ -8,6 +8,7 @@ use std::sync::Arc;

 /// Runtime context for the local backend, supplied by main once the rustfs
 /// sidecar has been spawned. S3/GCS/R2 arms ignore this.
+#[derive(Clone)]
 pub struct LocalBackendCtx {
     pub port: u16,
     pub access_key_id: String,