From 8f70c33469512dbcd87f9eaa796d8d4776240b55 Mon Sep 17 00:00:00 2001 From: Anderson Leal Date: Mon, 8 Jun 2026 18:17:45 -0300 Subject: [PATCH] feat(storage): migrate to iii-sdk 0.19 with config-worker integration + hardened reload Bump storage to iii-sdk 0.19.0 and adopt the configuration-worker pattern (mirroring the database worker): register a config schema, fetch live config over RPC, hot-reload backends on change, and register the storage::* functions inline. --config becomes an optional seed; manifest.rs and the rustfs sidecar / webhook receiver / SQS-PubSub-CF pollers are preserved. Config-reload security hardening (storage + database), fixing two adversarial -review findings: - on-config-change re-fetches the authoritative config via configuration::get and never trusts the trigger payload, closing an unauthenticated reconfiguration vector on a discoverable bus function. - storage refuses hot-reloads that change bucket/notification topology (WorkerConfig::topology signature compared in reloadable()), preventing a backend/notification split-brain; only backend-connection changes (credentials, endpoint, path-style) hot-apply. - iii-permissions.yaml denies agent invocation of the internal *::on-config-change functions as defense-in-depth. Verified: storage 91 lib + integration + clippy -D warnings; database 198 lib + 6 integration + clippy -D warnings, all green. --- database/src/configuration.rs | 42 ++-- iii-permissions.yaml | 7 + storage/Cargo.lock | 74 +----- storage/Cargo.toml | 2 +- storage/README.md | 115 ++++++---- storage/src/backend/factory.rs | 1 + storage/src/config.rs | 313 ++++++++++++++++++++++++-- storage/src/configuration.rs | 250 ++++++++++++++++++++ storage/src/handlers/delete_object.rs | 6 +- storage/src/handlers/get_object.rs | 6 +- storage/src/handlers/mod.rs | 85 ++----- storage/src/handlers/presign_url.rs | 6 +- storage/src/handlers/put_object.rs | 6 +- storage/src/lib.rs | 1 + storage/src/main.rs | 158 +++++++++---- 15 files changed, 804 insertions(+), 268 deletions(-) create mode 100644 storage/src/configuration.rs diff --git a/database/src/configuration.rs b/database/src/configuration.rs index c7e66787..6f373c0d 100644 --- a/database/src/configuration.rs +++ b/database/src/configuration.rs @@ -95,16 +95,20 @@ pub async fn apply_config(state: &AppState, cfg: WorkerConfig) -> Result<(), Str /// Register the internal config-change handler and bind a `configuration` trigger. pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIError> { let st = state.clone(); + let engine = iii.clone(); iii.register_function( CONFIG_FN_ID, - RegisterFunction::new_async(move |payload: Value| { + RegisterFunction::new_async(move |_payload: Value| { let st = st.clone(); + let engine = engine.clone(); async move { - on_config_change(&st, payload).await; + on_config_change(&engine, &st).await; Ok::(json!({ "ok": true })) } }) - .description("Internal: reload connection pools when the database configuration changes."), + .description( + "Internal: reload connection pools from the authoritative configuration when it changes.", + ), ); iii.register_trigger(RegisterTriggerInput { @@ -119,25 +123,29 @@ pub fn register_config_trigger(iii: &III, state: AppState) -> Result<(), IIIErro Ok(()) } -async fn on_config_change(state: &AppState, payload: Value) { - let new_value = match payload.get("new_value") { - Some(v) if !v.is_null() => v.clone(), - _ => { - tracing::warn!("configuration event missing new_value; skipping pool reload"); +/// Reload pools from the AUTHORITATIVE configuration. +/// +/// The caller-supplied trigger payload is intentionally ignored: +/// `database::on-config-change` is a discoverable bus function, so trusting +/// `payload.new_value` would let any caller replace the live connection pools +/// (e.g. point them at an attacker-controlled database) without updating +/// persisted state. Re-fetch the stored value via `configuration::get` instead. +async fn on_config_change(iii: &III, state: &AppState) { + let cfg = match fetch_config(iii).await { + Ok(cfg) => cfg, + Err(e) => { + tracing::error!( + error = %e, + "config-change: failed to fetch authoritative configuration; keeping previous pools" + ); return; } }; - match WorkerConfig::from_json(&new_value) { - Ok(cfg) => match apply_config(state, cfg).await { - Ok(()) => tracing::info!("database pools reloaded after configuration change"), - Err(e) => tracing::error!( - error = %e, - "failed to rebuild pools after configuration change; keeping previous pools" - ), - }, + match apply_config(state, cfg).await { + Ok(()) => tracing::info!("database pools reloaded after configuration change"), Err(e) => tracing::error!( error = %e, - "invalid configuration payload; keeping previous pools" + "failed to rebuild pools after configuration change; keeping previous pools" ), } } diff --git a/iii-permissions.yaml b/iii-permissions.yaml index a12c900d..a023d759 100644 --- a/iii-permissions.yaml +++ b/iii-permissions.yaml @@ -32,6 +32,13 @@ rules: - '!oauth::openai-codex::login' - '!run::start' - '!router::stream_assistant' + # Internal lifecycle hooks — agents must never force a config reload directly. + # The handlers already re-fetch from configuration::get (ignoring the event + # payload), so a direct call cannot inject config. These denies are + # defense-in-depth: they close the agent-callable surface so a future handler + # change cannot inadvertently re-open the injection path. + - '!storage::on-config-change' + - '!database::on-config-change' # Read-only / introspection (extend below for your tools). - state::get diff --git a/storage/Cargo.lock b/storage/Cargo.lock index 65398ac3..feaa4f8e 100644 --- a/storage/Cargo.lock +++ b/storage/Cargo.lock @@ -871,18 +871,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "const-hex" -version = "1.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20d9a563d167a9cce0f94153382b33cb6eded6dfabff03c69ad65a28ea1514e0" -dependencies = [ - "cfg-if", - "cpufeatures 0.2.17", - "proptest", - "serde_core", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -1934,22 +1922,17 @@ dependencies = [ [[package]] name = "iii-observability" -version = "0.16.0-next.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00ee84dacaf7e14750ebdc229523e52029441c4ae1f72d25972bf1f2e1edeb99" +checksum = "35ddb7c5dc5a7cac2da5250d623a960b1dbd7de956caa57c986f6895fe905c22" dependencies = [ - "async-trait", "futures-util", "opentelemetry", "opentelemetry-http", - "opentelemetry-proto", "opentelemetry_sdk", - "prost", "reqwest 0.12.28", - "serde", "serde_json", "sysinfo", - "thiserror 2.0.18", "tokio", "tokio-tungstenite", "tracing", @@ -1958,9 +1941,9 @@ dependencies = [ [[package]] name = "iii-sdk" -version = "0.16.0-next.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53c18b68b2aa09e18c68fb023c78316fe7ccf42181874eab584d66f40119b47e" +checksum = "fdad3fb4a9abff08e1a384db2b8dd75a780c5cc25b07e3fc4fd3179e55439d28" dependencies = [ "async-trait", "futures-util", @@ -2354,23 +2337,6 @@ dependencies = [ "reqwest 0.12.28", ] -[[package]] -name = "opentelemetry-proto" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" -dependencies = [ - "base64", - "const-hex", - "opentelemetry", - "opentelemetry_sdk", - "prost", - "serde", - "serde_json", - "tonic", - "tonic-prost", -] - [[package]] name = "opentelemetry_sdk" version = "0.31.0" @@ -2554,21 +2520,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "proptest" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" -dependencies = [ - "bitflags", - "num-traits", - "rand 0.9.4", - "rand_chacha 0.9.0", - "rand_xorshift", - "regex-syntax", - "unarray", -] - [[package]] name = "prost" version = "0.14.3" @@ -2737,15 +2688,6 @@ dependencies = [ "getrandom 0.3.4", ] -[[package]] -name = "rand_xorshift" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a" -dependencies = [ - "rand_core 0.9.5", -] - [[package]] name = "redox_syscall" version = "0.5.18" @@ -3444,7 +3386,7 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "storage" -version = "0.1.1" +version = "0.1.3" dependencies = [ "anyhow", "async-trait", @@ -3948,12 +3890,6 @@ version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" -[[package]] -name = "unarray" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" - [[package]] name = "unicase" version = "2.9.0" diff --git a/storage/Cargo.toml b/storage/Cargo.toml index e45bf8c6..d473f7fa 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -14,7 +14,7 @@ path = "src/main.rs" path = "src/lib.rs" [dependencies] -iii-sdk = "=0.16.0-next.2" +iii-sdk = "=0.19.0" tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "signal", "time", "process", "io-util", "net"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/storage/README.md b/storage/README.md index 71d14dc4..f54d7a1c 100644 --- a/storage/README.md +++ b/storage/README.md @@ -86,42 +86,71 @@ iii.trigger(TriggerRequest { ## Configuration -`storage` reads one `config.yaml` describing one or more buckets. Each -bucket pins a `provider` (`s3` | `gcs` | `r2` | `local`) and the -credentials for that provider. Buckets without `notifications:` work -fine for RPCs; they just don't fire triggers. +The storage worker gets its live configuration from the `configuration` worker, +not from a local file. On startup it: + +1. Registers its config schema with the `configuration` worker + (`configuration::register`, id `storage`). +2. Fetches the live, env-expanded config (`configuration::get`). +3. Subscribes to `configuration:updated` events and hot-reloads. + +`--config ` is an **optional seed**: when given, the file is loaded and +sent as `initial_value` the first time the schema is registered (no stored value +yet). It is not the live source of truth — once a value exists in the +`configuration` worker, that value wins. + +### Hot-reload scope + +On a `configuration:updated` event the worker re-fetches the authoritative +config from the `configuration` worker (it does **not** trust the event +payload). It then rebuilds the in-memory **backend map only**, and only when the +bucket/notification **topology is unchanged** — i.e. when just backend +connection settings changed (credentials, endpoint, path-style). + +Any change to the bucket set, a bucket's provider or underlying name, a +notification source, or the local rustfs data dir is **refused**: the worker +keeps the previously-running backends and logs that a restart is required. +This avoids a split-brain where RPC reads/writes move to a new backend while +the notification pollers/webhook stay wired to the old topology. A failed +rebuild likewise keeps the previous backends. + +A fresh install with no configured buckets runs with zero backends until a +bucket is configured. + +### Config shape + +Each bucket pins a `provider` (`s3` | `gcs` | `r2` | `local`) and the +credentials for that provider. Buckets without `notifications:` work fine for +RPCs; they just don't fire triggers. ```yaml -workers: - - name: storage - config: - providers: - local: - data_dir: ./data/storage # rustfs sidecar root - - buckets: - uploads: - provider: s3 - bucket: my-app-uploads # underlying cloud bucket - region: us-east-1 - notifications: - sqs_queue_url: https://sqs.us-east-1.amazonaws.com/123/my-app-uploads-events - - documents: - provider: gcs - bucket: my-app-documents - # credentials_file: /etc/iii/gcs-sa.json # required for presignUrl - - avatars: - provider: r2 - bucket: avatars - account_id: ${R2_ACCOUNT_ID} - access_key_id: ${R2_ACCESS_KEY_ID} - secret_access_key: ${R2_SECRET_ACCESS_KEY} - - scratch: - provider: local - bucket: scratch +providers: + local: + data_dir: ./data/storage # rustfs sidecar root + +buckets: + uploads: + provider: s3 + bucket: my-app-uploads # underlying cloud bucket + region: us-east-1 + notifications: + sqs_queue_url: https://sqs.us-east-1.amazonaws.com/123/my-app-uploads-events + + documents: + provider: gcs + bucket: my-app-documents + # credentials_file: /etc/iii/gcs-sa.json # required for presignUrl + + avatars: + provider: r2 + bucket: avatars + account_id: ${R2_ACCOUNT_ID} + access_key_id: ${R2_ACCESS_KEY_ID} + secret_access_key: ${R2_SECRET_ACCESS_KEY} + + scratch: + provider: local + bucket: scratch ``` The map key (`uploads`) is the worker-facing bucket name handlers @@ -214,21 +243,23 @@ triggers: ## Local development & testing The committed `config.yaml` declares a single `scratch` bucket served by the -bundled rustfs sidecar, so you can run the worker against a local engine with -zero credentials. +bundled rustfs sidecar. Pass it as a seed so the `configuration` worker picks it +up on first boot — zero cloud credentials required. ```bash -# In one terminal: start the engine +# In one terminal: start the engine (must include the configuration worker) iii start -# In another: build & run the worker +# In another: build & run the worker, seeding config.yaml on first registration cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml ``` -The worker spawns a rustfs process on a random port, waits for it to become -healthy, then registers `storage::putObject`, `storage::getObject`, -`storage::deleteObject`, and `storage::presignUrl`. Files land under -`./data/storage/` (configurable via `providers.local.data_dir`). +The worker registers its schema with the `configuration` worker (seeding +`config.yaml` if no stored value exists), fetches the live config, then spawns a +rustfs process on a random port, waits for it to become healthy, and registers +`storage::putObject`, `storage::getObject`, `storage::deleteObject`, and +`storage::presignUrl`. Files land under `./data/storage/` (configurable via +`providers.local.data_dir`). Running `--manifest` prints the registry-publish JSON without touching the engine — useful when testing CI flows: diff --git a/storage/src/backend/factory.rs b/storage/src/backend/factory.rs index 1314796a..be9a9fe5 100644 --- a/storage/src/backend/factory.rs +++ b/storage/src/backend/factory.rs @@ -8,6 +8,7 @@ use std::sync::Arc; /// Runtime context for the local backend, supplied by main once the rustfs /// sidecar has been spawned. S3/GCS/R2 arms ignore this. +#[derive(Clone)] pub struct LocalBackendCtx { pub port: u16, pub access_key_id: String, diff --git a/storage/src/config.rs b/storage/src/config.rs index 557e4f46..9f81e749 100644 --- a/storage/src/config.rs +++ b/storage/src/config.rs @@ -1,9 +1,57 @@ //! Configuration parsing for the storage worker. -use serde::Deserialize; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::BTreeMap; use std::collections::HashMap; -#[derive(Debug, Clone, Default, Deserialize)] +/// A signature of everything the boot-time notification/sidecar wiring depends +/// on. See [`WorkerConfig::topology`]. Two configs with equal topology differ +/// only in backend-connection settings that can be hot-applied; any other +/// difference requires a worker restart. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct Topology { + /// The configured rustfs data dir (may be relative) captured only when at + /// least one `provider: local` bucket exists; `None` otherwise. + pub local_data_dir: Option, + pub buckets: BTreeMap, +} + +/// Topology projection of one bucket — provider, underlying name, and +/// notification source. Compared by value; backend-connection fields like +/// credentials/endpoint are intentionally excluded. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketTopology { + /// One of "s3", "gcs", "r2", or "local". + pub provider: &'static str, + /// The underlying object-store bucket name override, if set. + pub underlying: Option, + /// The bucket's notification source identity; `None` when the bucket has no + /// notifications. + pub notifications: Option, +} + +/// Canonical identity of a bucket's notification source — exactly what the +/// boot-time pollers/webhook key on. Compared by value; never logged. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NotificationKey { + Sqs { + queue_url: String, + region: String, + }, + Pubsub { + subscription: String, + }, + CfQueue { + account_id: String, + queue_id: String, + api_token: String, + }, + RustfsWebhook, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)] pub struct WorkerConfig { #[serde(default)] pub providers: ProvidersConfig, @@ -11,12 +59,12 @@ pub struct WorkerConfig { pub buckets: HashMap, } -#[derive(Debug, Clone, Default, Deserialize)] +#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)] pub struct ProvidersConfig { pub local: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct LocalProviderConfig { #[serde(default = "default_local_data_dir")] pub data_dir: String, @@ -26,7 +74,7 @@ fn default_local_data_dir() -> String { "./data/storage".to_string() } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] #[serde(tag = "provider", rename_all = "lowercase")] pub enum BucketConfig { S3(S3BucketConfig), @@ -35,7 +83,7 @@ pub enum BucketConfig { Local(LocalBucketConfig), } -#[derive(Clone, Deserialize)] +#[derive(Clone, Deserialize, Serialize, JsonSchema)] pub struct S3BucketConfig { pub bucket: Option, pub region: String, @@ -75,12 +123,12 @@ impl std::fmt::Debug for S3BucketConfig { } } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct S3Notifications { pub sqs_queue_url: String, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct GcsBucketConfig { pub bucket: Option, pub credentials_file: Option, @@ -91,12 +139,12 @@ pub struct GcsBucketConfig { pub notifications: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct GcsNotifications { pub pubsub_subscription: String, } -#[derive(Clone, Deserialize)] +#[derive(Clone, Deserialize, Serialize, JsonSchema)] pub struct R2BucketConfig { pub bucket: Option, pub account_id: String, @@ -123,7 +171,7 @@ impl std::fmt::Debug for R2BucketConfig { } } -#[derive(Clone, Deserialize)] +#[derive(Clone, Deserialize, Serialize, JsonSchema)] pub struct R2Notifications { pub queue_id: String, pub api_token: String, @@ -138,7 +186,7 @@ impl std::fmt::Debug for R2Notifications { } } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub struct LocalBucketConfig { pub bucket: Option, } @@ -161,16 +209,108 @@ impl WorkerConfig { if self.buckets.is_empty() { return Err("config must declare at least one bucket".into()); } - // Note: a `provider: local` bucket without `providers.local` is valid; - // the default data_dir is materialised at backend init time, not here. - // Trigger ↔ bucket cross-validation runs at trigger registration time - // (handler.rs) — the worker config never sees the trigger spec since - // triggers are registered dynamically via the SDK. + self.validate_bucket_names() + } + + /// Per-bucket-name validation only — no "at least one bucket" requirement. + /// The live configuration (fetched from the configuration worker) and the + /// built-in default may legitimately declare zero buckets on a fresh + /// install, in which case the worker runs with no backends until the + /// operator configures one. + fn validate_bucket_names(&self) -> Result<(), String> { for name in self.buckets.keys() { validate_bucket_name(name).map_err(|e| format!("bucket `{name}`: {e}"))?; } Ok(()) } + + /// Parse a config from a JSON value already env-expanded by the + /// configuration worker. Unlike [`from_yaml`], zero buckets is allowed. + pub fn from_json(value: &Value) -> Result { + let cfg: WorkerConfig = + serde_json::from_value(value.clone()).map_err(|e| format!("json parse: {e}"))?; + cfg.validate_bucket_names()?; + Ok(cfg) + } + + pub fn to_json(&self) -> Value { + serde_json::to_value(self).expect("WorkerConfig serializes") + } + + /// Build the topology signature used to decide whether a live config update + /// can be hot-applied (backends only) or requires a restart. Captures the + /// bucket set, each bucket's provider + underlying name + notification + /// source, and the rustfs data dir — i.e. exactly what `main.rs` reads once + /// at startup to wire `wired_buckets`, the webhook receiver, and the + /// SQS/Pub-Sub/CF-Queue pollers. + pub fn topology(&self) -> Topology { + let needs_local = self + .buckets + .values() + .any(|b| matches!(b, BucketConfig::Local(_))); + let local_data_dir = needs_local.then(|| { + self.providers + .local + .as_ref() + .map(|l| l.data_dir.clone()) + .unwrap_or_else(default_local_data_dir) + }); + let mut buckets = BTreeMap::new(); + for (name, bc) in &self.buckets { + let entry = match bc { + BucketConfig::S3(s) => BucketTopology { + provider: "s3", + underlying: s.bucket.clone(), + notifications: s.notifications.as_ref().map(|n| NotificationKey::Sqs { + queue_url: n.sqs_queue_url.clone(), + region: s.region.clone(), + }), + }, + BucketConfig::Gcs(g) => BucketTopology { + provider: "gcs", + underlying: g.bucket.clone(), + notifications: g.notifications.as_ref().map(|n| NotificationKey::Pubsub { + subscription: n.pubsub_subscription.clone(), + }), + }, + BucketConfig::R2(r) => BucketTopology { + provider: "r2", + underlying: r.bucket.clone(), + notifications: r.notifications.as_ref().map(|n| NotificationKey::CfQueue { + account_id: r.account_id.clone(), + queue_id: n.queue_id.clone(), + api_token: n.api_token.clone(), + }), + }, + BucketConfig::Local(l) => BucketTopology { + provider: "local", + underlying: l.bucket.clone(), + // Local buckets are always wired to the rustfs webhook. + notifications: Some(NotificationKey::RustfsWebhook), + }, + }; + buckets.insert(name.clone(), entry); + } + Topology { + local_data_dir, + buckets, + } + } + + pub fn json_schema() -> Value { + let root = schemars::schema_for!(WorkerConfig); + let mut schema = + serde_json::to_value(&root.schema).expect("WorkerConfig JSON Schema serializes"); + if let Some(obj) = schema.as_object_mut() { + if !root.definitions.is_empty() { + obj.insert( + "definitions".into(), + serde_json::to_value(&root.definitions).expect("definitions serialize"), + ); + } + } + schema + } } /// Validate a worker-facing bucket name. Matches a tightened S3 bucket naming @@ -523,4 +663,143 @@ buckets: assert_eq!(redact_secret("AKIAIOSFODNN7EXAMPLE"), "***[20]***"); assert_eq!(redact_secret(""), "***[0]***"); } + + #[test] + fn to_json_from_json_roundtrips() { + let yaml = + "buckets:\n uploads:\n provider: s3\n bucket: my-app\n region: us-east-1\n"; + let cfg = WorkerConfig::from_yaml(yaml).unwrap(); + let json = cfg.to_json(); + let back = WorkerConfig::from_json(&json).unwrap(); + match &back.buckets["uploads"] { + BucketConfig::S3(s) => { + assert_eq!(s.region, "us-east-1"); + assert_eq!(s.bucket.as_deref(), Some("my-app")); + } + other => panic!("expected S3, got {other:?}"), + } + } + + #[test] + fn from_json_tolerates_zero_buckets() { + let back = WorkerConfig::from_json(&serde_json::json!({ "buckets": {} })).unwrap(); + assert!(back.buckets.is_empty()); + } + + #[test] + fn default_serializes_and_reparses_as_empty() { + let json = WorkerConfig::default().to_json(); + let back = WorkerConfig::from_json(&json).unwrap(); + assert!(back.buckets.is_empty()); + } + + #[test] + fn from_json_still_validates_bucket_names() { + let err = WorkerConfig::from_json(&serde_json::json!({ + "buckets": { "Bad Name": { "provider": "local" } } + })) + .unwrap_err(); + assert!(err.contains("Bad Name"), "got: {err}"); + } + + #[test] + fn from_yaml_still_requires_at_least_one_bucket() { + let err = WorkerConfig::from_yaml("buckets: {}\n").unwrap_err(); + assert!(err.contains("at least one bucket"), "got: {err}"); + } + + #[test] + fn json_schema_has_buckets_property() { + let schema = WorkerConfig::json_schema(); + assert!(schema + .get("properties") + .and_then(|p| p.get("buckets")) + .is_some()); + } + + #[test] + fn topology_ignores_credential_changes() { + let a = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: r2\n account_id: acc\n access_key_id: k1\n secret_access_key: s1\n", + ) + .unwrap(); + let b = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: r2\n account_id: acc\n access_key_id: k2\n secret_access_key: s2\n", + ) + .unwrap(); + assert_eq!(a.topology(), b.topology()); + } + + #[test] + fn topology_ignores_s3_endpoint_change() { + let a = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n bucket: b\n", + ) + .unwrap(); + let b = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n bucket: b\n endpoint_url: http://minio:9000\n", + ) + .unwrap(); + assert_eq!(a.topology(), b.topology()); + } + + #[test] + fn topology_changes_when_bucket_added() { + let a = + WorkerConfig::from_yaml("buckets:\n up:\n provider: s3\n region: us-east-1\n") + .unwrap(); + let b = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n extra:\n provider: s3\n region: us-east-1\n", + ) + .unwrap(); + assert_ne!(a.topology(), b.topology()); + } + + #[test] + fn topology_changes_when_notification_source_changes() { + let a = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n notifications:\n sqs_queue_url: https://sqs/old\n", + ) + .unwrap(); + let b = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n notifications:\n sqs_queue_url: https://sqs/new\n", + ) + .unwrap(); + assert_ne!(a.topology(), b.topology()); + } + + #[test] + fn topology_changes_when_s3_region_changes_with_notifications() { + let a = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: us-east-1\n notifications:\n sqs_queue_url: https://sqs/q\n", + ) + .unwrap(); + let b = WorkerConfig::from_yaml( + "buckets:\n up:\n provider: s3\n region: eu-west-1\n notifications:\n sqs_queue_url: https://sqs/q\n", + ) + .unwrap(); + assert_ne!(a.topology(), b.topology()); + } + + #[test] + fn topology_changes_when_local_data_dir_changes() { + let a = WorkerConfig::from_yaml( + "providers:\n local:\n data_dir: /a\nbuckets:\n up:\n provider: local\n", + ) + .unwrap(); + let b = WorkerConfig::from_yaml( + "providers:\n local:\n data_dir: /b\nbuckets:\n up:\n provider: local\n", + ) + .unwrap(); + assert_ne!(a.topology(), b.topology()); + } + + #[test] + fn topology_changes_when_provider_changes() { + let a = + WorkerConfig::from_yaml("buckets:\n up:\n provider: s3\n region: us-east-1\n") + .unwrap(); + let b = WorkerConfig::from_yaml("buckets:\n up:\n provider: local\n").unwrap(); + assert_ne!(a.topology(), b.topology()); + } } diff --git a/storage/src/configuration.rs b/storage/src/configuration.rs new file mode 100644 index 00000000..ddac0c1f --- /dev/null +++ b/storage/src/configuration.rs @@ -0,0 +1,250 @@ +//! Integration with the `configuration` worker — register, fetch, and hot-reload +//! the `storage` configuration entry. + +use crate::backend::factory::{self, LocalBackendCtx}; +use crate::backend::Backend; +use crate::config::{Topology, WorkerConfig}; +use crate::handlers::AppState; +use iii_sdk::{IIIError, RegisterFunction, RegisterTriggerInput, TriggerRequest, III}; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +pub const CONFIG_ID: &str = "storage"; +const CONFIG_FN_ID: &str = "storage::on-config-change"; +const CONFIG_TIMEOUT_MS: u64 = 5_000; +const CONFIG_RETRIES: u32 = 3; + +/// Register the `storage` configuration schema with the configuration worker. +/// When `seed` is present, its value is installed as `initial_value`. Otherwise, +/// the built-in default is seeded only when no stored value exists yet. +pub async fn register_config(iii: &III, seed: Option<&WorkerConfig>) -> Result<(), String> { + let mut payload = json!({ + "id": CONFIG_ID, + "name": "Storage", + "description": "Object storage buckets across AWS S3, GCS, Cloudflare R2, and a managed local rustfs backend.", + "schema": WorkerConfig::json_schema(), + }); + if let Some(seed) = seed { + payload["initial_value"] = seed.to_json(); + } else if should_seed_default_value(iii).await? { + payload["initial_value"] = WorkerConfig::default().to_json(); + } + trigger_with_retry(iii, "configuration::register", payload).await?; + Ok(()) +} + +/// Read the live `storage` configuration (env-expanded by the configuration worker). +pub async fn fetch_config(iii: &III) -> Result { + let value = get_config_value(iii).await?; + if value.is_null() { + tracing::info!("no configuration value found; using built-in default configuration"); + return Ok(WorkerConfig::default()); + } + WorkerConfig::from_json(&value) +} + +async fn should_seed_default_value(iii: &III) -> Result { + match try_get_config_value(iii).await? { + None => Ok(true), + Some(value) if value.is_null() => Ok(true), + Some(_) => Ok(false), + } +} + +async fn get_config_value(iii: &III) -> Result { + try_get_config_value(iii) + .await? + .ok_or_else(|| format!("configuration `{CONFIG_ID}` not found")) +} + +/// Returns `Ok(None)` when the entry does not exist (`NOT_FOUND`). +async fn try_get_config_value(iii: &III) -> Result, String> { + match trigger_with_retry(iii, "configuration::get", json!({ "id": CONFIG_ID })).await { + Ok(resp) => Ok(resp.get("value").cloned()), + Err(e) if e.contains("NOT_FOUND") => Ok(None), + Err(e) => Err(e), + } +} + +/// Build object-storage backends for every configured bucket. `local_ctx` is the +/// running rustfs sidecar context (port + ephemeral credentials); it must be +/// `Some` whenever the config declares a `provider: local` bucket, otherwise the +/// local backend build fails. On hot-reload the boot-time `local_ctx` is reused — +/// the sidecar is never respawned. +pub async fn build_backends( + cfg: &WorkerConfig, + local_ctx: Option<&LocalBackendCtx>, +) -> Result>, String> { + let mut backends = HashMap::new(); + for (name, bucket_cfg) in &cfg.buckets { + let b = factory::build(name, bucket_cfg, &cfg.providers, local_ctx) + .await + .map_err(|e| format!("building backend `{name}`: {e}"))?; + tracing::info!(bucket = %name, provider = b.provider(), "backend ready"); + backends.insert(name.clone(), b); + } + Ok(backends) +} + +/// Decide whether a freshly-fetched config can be hot-applied. Returns the +/// config when its topology matches the boot-time topology (only +/// backend-connection settings changed), or an error describing the +/// topology change that requires a worker restart. +fn reloadable(cfg: WorkerConfig, boot_topology: &Topology) -> Result { + if cfg.topology() != *boot_topology { + return Err( + "configuration change alters bucket/notification topology (bucket add/remove, \ + provider, underlying-bucket, notification-source, or local data-dir)" + .to_string(), + ); + } + Ok(cfg) +} + +/// Rebuild the backend map from `cfg` and hot-swap it under the write lock. The +/// rustfs sidecar, webhook receiver, and notification pollers are NOT touched — +/// adding/removing a local bucket or changing a notifications source requires a +/// worker restart. A failed rebuild leaves the running backends untouched. +pub async fn apply_config(state: &AppState, cfg: WorkerConfig) -> Result<(), String> { + let new_backends = build_backends(&cfg, state.local_ctx.as_ref()).await?; + let mut backends_guard = state.backends.write().await; + *backends_guard = new_backends; + Ok(()) +} + +/// Register the internal config-change handler and bind a `configuration` trigger. +/// +/// `boot_topology` is the bucket/notification topology captured at startup; any +/// reload that would change it is refused (those require a worker restart). +pub fn register_config_trigger( + iii: &III, + state: AppState, + boot_topology: Topology, +) -> Result<(), IIIError> { + let st = state.clone(); + let engine = iii.clone(); + iii.register_function( + CONFIG_FN_ID, + RegisterFunction::new_async(move |_payload: Value| { + let st = st.clone(); + let engine = engine.clone(); + let boot_topology = boot_topology.clone(); + async move { + on_config_change(&engine, &st, &boot_topology).await; + Ok::(json!({ "ok": true })) + } + }) + .description( + "Internal: rebuild storage backends from the authoritative configuration when it changes.", + ), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "configuration".to_string(), + function_id: CONFIG_FN_ID.to_string(), + config: json!({ + "configuration_id": CONFIG_ID, + "event_types": ["configuration:updated"], + }), + metadata: None, + })?; + Ok(()) +} + +/// Reload backends from the AUTHORITATIVE configuration. +/// +/// The caller-supplied trigger payload is intentionally ignored: +/// `storage::on-config-change` is a discoverable bus function, so trusting +/// `payload.new_value` would let any caller inject arbitrary backend config +/// (redirecting writes or wiping backends) without updating persisted state. +/// Instead we re-fetch the stored value via `configuration::get`. A +/// topology-changing update is refused (it requires a restart). +async fn on_config_change(iii: &III, state: &AppState, boot_topology: &Topology) { + let cfg = match fetch_config(iii).await { + Ok(cfg) => cfg, + Err(e) => { + tracing::error!( + error = %e, + "config-change: failed to fetch authoritative configuration; keeping previous backends" + ); + return; + } + }; + let cfg = match reloadable(cfg, boot_topology) { + Ok(cfg) => cfg, + Err(reason) => { + tracing::warn!( + reason = %reason, + "config-change refused: topology change requires a worker restart; keeping previous backends" + ); + return; + } + }; + match apply_config(state, cfg).await { + Ok(()) => tracing::info!( + "storage backends reloaded after configuration change (topology unchanged; only backend connection settings updated)" + ), + Err(e) => tracing::error!( + error = %e, + "failed to rebuild backends after configuration change; keeping previous backends" + ), + } +} + +async fn trigger_with_retry(iii: &III, function_id: &str, payload: Value) -> Result { + let mut last_err = String::new(); + for attempt in 1..=CONFIG_RETRIES { + match iii + .trigger(TriggerRequest { + function_id: function_id.to_string(), + payload: payload.clone(), + action: None, + timeout_ms: Some(CONFIG_TIMEOUT_MS), + }) + .await + { + Ok(v) => return Ok(v), + Err(e) => { + last_err = e.to_string(); + if attempt < CONFIG_RETRIES { + tracing::warn!( + function_id, + attempt, + error = %last_err, + "configuration RPC failed; retrying" + ); + tokio::time::sleep(Duration::from_millis(250 * u64::from(attempt))).await; + } + } + } + } + Err(format!( + "{function_id} failed after {CONFIG_RETRIES} attempts: {last_err}" + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::WorkerConfig; + + fn cfg(yaml: &str) -> WorkerConfig { + WorkerConfig::from_yaml(yaml).unwrap() + } + + #[test] + fn reloadable_allows_credential_only_change() { + let boot = cfg("buckets:\n up:\n provider: r2\n account_id: a\n access_key_id: k1\n secret_access_key: s1\n"); + let next = cfg("buckets:\n up:\n provider: r2\n account_id: a\n access_key_id: k2\n secret_access_key: s2\n"); + assert!(reloadable(next, &boot.topology()).is_ok()); + } + + #[test] + fn reloadable_refuses_topology_change() { + let boot = cfg("buckets:\n up:\n provider: s3\n region: us-east-1\n"); + let next = cfg("buckets:\n up:\n provider: s3\n region: us-east-1\n added:\n provider: s3\n region: us-east-1\n"); + assert!(reloadable(next, &boot.topology()).is_err()); + } +} diff --git a/storage/src/handlers/delete_object.rs b/storage/src/handlers/delete_object.rs index 461eda07..5663ddfb 100644 --- a/storage/src/handlers/delete_object.rs +++ b/storage/src/handlers/delete_object.rs @@ -24,7 +24,7 @@ pub async fn handle(state: &AppState, req: DeleteReq) -> Result>) -> AppState { let m = Arc::new(MockBackend::default()); @@ -57,7 +58,8 @@ mod tests { let mut map = HashMap::new(); map.insert("uploads".to_string(), m as Arc); AppState { - backends: Arc::new(map), + backends: Arc::new(RwLock::new(map)), + local_ctx: None, } } diff --git a/storage/src/handlers/get_object.rs b/storage/src/handlers/get_object.rs index a4e143f4..8b28d5f5 100644 --- a/storage/src/handlers/get_object.rs +++ b/storage/src/handlers/get_object.rs @@ -31,7 +31,7 @@ pub async fn handle(state: &AppState, req: GetReq) -> Result { message: "key must not be empty".into(), })); } - let backend = state.backend(&req.bucket).map_err(err_to_str)?; + let backend = state.backend(&req.bucket).await.map_err(err_to_str)?; let resp = backend .get(BackendGetReq { key: req.key.clone(), @@ -67,6 +67,7 @@ mod tests { use serde_json::json; use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; fn state_with( resp: Option>, @@ -77,7 +78,8 @@ mod tests { map.insert("uploads".to_string(), m.clone() as Arc); ( AppState { - backends: Arc::new(map), + backends: Arc::new(RwLock::new(map)), + local_ctx: None, }, m, ) diff --git a/storage/src/handlers/mod.rs b/storage/src/handlers/mod.rs index e762548b..a24b5622 100644 --- a/storage/src/handlers/mod.rs +++ b/storage/src/handlers/mod.rs @@ -1,10 +1,11 @@ //! RPC handlers for `storage::*` functions. +use crate::backend::factory::LocalBackendCtx; use crate::backend::Backend; use crate::error::StorageError; -use iii_sdk::{IIIError, RegisterFunction, III}; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::RwLock; pub mod delete_object; pub mod get_object; @@ -17,14 +18,22 @@ pub const INLINE_BODY_CAP: u64 = 10 * 1024 * 1024; #[derive(Clone)] pub struct AppState { - /// Keyed by the **worker-facing bucket name** (config map key). - pub backends: Arc>>, + /// Keyed by the **worker-facing bucket name** (config map key). Wrapped in + /// an `RwLock` so the configuration-change handler can hot-swap the whole + /// map without restarting the worker. + pub backends: Arc>>>, + /// Boot-time rustfs sidecar context, reused when rebuilding local backends + /// on a config change (the sidecar is never respawned). `None` when no + /// `provider: local` bucket was configured at boot. + pub local_ctx: Option, } impl AppState { - pub fn backend(&self, bucket: &str) -> Result<&Arc, StorageError> { - self.backends + pub async fn backend(&self, bucket: &str) -> Result, StorageError> { + let backends = self.backends.read().await; + backends .get(bucket) + .cloned() .ok_or_else(|| StorageError::UnknownBucket { bucket: bucket.to_string(), }) @@ -35,69 +44,3 @@ impl AppState { pub fn err_to_str(e: StorageError) -> String { e.to_wire_string() } - -/// Register every `storage::*` RPC function with the engine. -/// -/// Spec contract (`workers/binary-worker.md` §7): a worker exposes a single -/// `register_all` per-folder so `main.rs` stays a thin entry point. -pub fn register_all(iii: &III, state: &AppState) { - register_put_object(iii, state); - register_get_object(iii, state); - register_delete_object(iii, state); - register_presign_url(iii, state); - tracing::info!("storage handlers registered"); -} - -fn register_put_object(iii: &III, state: &AppState) { - let st = state.clone(); - iii.register_function( - "storage::putObject", - RegisterFunction::new_async(move |req: put_object::PutReq| { - let st = st.clone(); - async move { put_object::handle(&st, req).await.map_err(IIIError::from) } - }) - .description("Write an object to a configured bucket. Body is base64; max 10MB inline."), - ); -} - -fn register_get_object(iii: &III, state: &AppState) { - let st = state.clone(); - iii.register_function( - "storage::getObject", - RegisterFunction::new_async(move |req: get_object::GetReq| { - let st = st.clone(); - async move { get_object::handle(&st, req).await.map_err(IIIError::from) } - }) - .description("Read an object. Body is base64; for large objects use presignUrl."), - ); -} - -fn register_delete_object(iii: &III, state: &AppState) { - let st = state.clone(); - iii.register_function( - "storage::deleteObject", - RegisterFunction::new_async(move |req: delete_object::DeleteReq| { - let st = st.clone(); - async move { - delete_object::handle(&st, req) - .await - .map_err(IIIError::from) - } - }) - .description("Delete an object. No-op when the object does not exist."), - ); -} - -fn register_presign_url(iii: &III, state: &AppState) { - let st = state.clone(); - iii.register_function( - "storage::presignUrl", - RegisterFunction::new_async(move |req: presign_url::PresignReq| { - let st = st.clone(); - async move { presign_url::handle(&st, req).await.map_err(IIIError::from) } - }) - .description( - "Issue a short-lived URL the browser can hit directly to PUT or GET an object.", - ), - ); -} diff --git a/storage/src/handlers/presign_url.rs b/storage/src/handlers/presign_url.rs index 532517e8..6db01302 100644 --- a/storage/src/handlers/presign_url.rs +++ b/storage/src/handlers/presign_url.rs @@ -60,7 +60,7 @@ pub async fn handle(state: &AppState, req: PresignReq) -> Result AppState { let m = Arc::new(MockBackend::default()); let mut map = HashMap::new(); map.insert("uploads".to_string(), m as Arc); AppState { - backends: Arc::new(map), + backends: Arc::new(RwLock::new(map)), + local_ctx: None, } } diff --git a/storage/src/handlers/put_object.rs b/storage/src/handlers/put_object.rs index 21a383f3..82a586f2 100644 --- a/storage/src/handlers/put_object.rs +++ b/storage/src/handlers/put_object.rs @@ -37,7 +37,7 @@ pub async fn handle(state: &AppState, req: PutReq) -> Result { message: "key must not be empty".into(), })); } - let backend = state.backend(&req.bucket).map_err(err_to_str)?; + let backend = state.backend(&req.bucket).await.map_err(err_to_str)?; // Reject before allocating: a base64 string of length L decodes to at // most (L*3)/4 bytes. If even the upper bound exceeds the cap, refuse // without ever allocating the decode buffer. @@ -93,6 +93,7 @@ mod tests { use crate::backend::Backend; use serde_json::json; use std::sync::Arc; + use tokio::sync::RwLock; /// Returns `(AppState, Arc)` — keep the typed Arc for /// inspection alongside the trait-object stored in AppState. @@ -102,7 +103,8 @@ mod tests { map.insert("uploads".to_string(), m.clone() as Arc); ( AppState { - backends: Arc::new(map), + backends: Arc::new(RwLock::new(map)), + local_ctx: None, }, m, ) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index f7bfc58f..de334fbb 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -2,6 +2,7 @@ pub mod backend; pub mod config; +pub mod configuration; pub mod error; pub mod handlers; pub mod manifest; diff --git a/storage/src/main.rs b/storage/src/main.rs index 16b0f8ca..5fb5df1d 100644 --- a/storage/src/main.rs +++ b/storage/src/main.rs @@ -1,25 +1,30 @@ use anyhow::{Context, Result}; use clap::Parser; -use iii_sdk::{register_worker, InitOptions, RegisterTriggerType, WorkerMetadata}; +use iii_sdk::{ + register_worker, IIIError, InitOptions, RegisterFunction, RegisterTriggerType, WorkerMetadata, +}; use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use storage::backend::factory::{self, LocalBackendCtx}; +use storage::backend::factory::LocalBackendCtx; use storage::backend::local; use storage::config::{redact_url, BucketConfig as CfgBucket, WorkerConfig}; -use storage::handlers::AppState; +use storage::configuration; +use storage::handlers::{delete_object, get_object, presign_url, put_object, AppState}; use storage::rustfs::{health, spawn}; use storage::triggers::dispatcher::EngineDispatcher; use storage::triggers::handler::{ObjectCreatedHandler, ObjectDeletedHandler}; use storage::triggers::pollers::{cf_queue, pubsub, rustfs_webhook, sqs}; use storage::triggers::registry::TriggerRegistry; +use tokio::sync::RwLock; #[derive(Parser, Debug)] #[command(name = "storage", about = "storage worker")] struct Cli { - #[arg(long, default_value = "./config.yaml")] - config: String, + /// Optional seed config.yaml used to populate `initial_value` on first register. + #[arg(long)] + config: Option, #[arg(long, default_value = "ws://127.0.0.1:49134")] url: String, /// Print the registry publish manifest as JSON and exit. No engine connection. @@ -49,23 +54,55 @@ async fn main() -> Result<()> { tracing::info!( name = storage::worker_name(), - config = %cli.config, + seed_config = cli.config.as_deref().unwrap_or("(none)"), url = %redact_url(&cli.url), "starting" ); - let cfg = match WorkerConfig::from_file(&cli.config) { - Ok(c) => c, - Err(e) => { - tracing::warn!( - error = %e, - path = %cli.config, - "failed to load config, using defaults" - ); - WorkerConfig::default() - } + let iii = register_worker( + &cli.url, + InitOptions { + metadata: Some(WorkerMetadata { + runtime: "rust".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + name: storage::worker_name().to_string(), + os: std::env::consts::OS.to_string(), + pid: Some(std::process::id()), + telemetry: None, + ..WorkerMetadata::default() + }), + ..Default::default() + }, + ); + + let seed = match &cli.config { + Some(path) => match WorkerConfig::from_file(path) { + Ok(cfg) => { + tracing::info!(path = %path, "loaded seed config for initial registration"); + Some(cfg) + } + Err(e) => { + tracing::warn!( + path = %path, + error = %e, + "failed to load seed config; relying on existing configuration entry" + ); + None + } + }, + None => None, }; + configuration::register_config(&iii, seed.as_ref()) + .await + .map_err(anyhow::Error::msg) + .context("registering storage configuration schema")?; + + let cfg = configuration::fetch_config(&iii) + .await + .map_err(anyhow::Error::msg) + .context("loading storage configuration")?; + // Pre-compute which buckets have a notifications source. Trigger // registrations targeting any other bucket fail fast in the handler. let mut wired_buckets: HashSet = HashSet::new(); @@ -145,36 +182,16 @@ async fn main() -> Result<()> { tracing::info!(port, "rustfs sidecar ready"); } - let mut backends = HashMap::new(); - for (name, bucket_cfg) in &cfg.buckets { - let b = factory::build(name, bucket_cfg, &cfg.providers, local_ctx.as_ref()) - .await - .map_err(|e| anyhow::anyhow!(format!("{e}"))) - .with_context(|| format!("building backend `{name}`"))?; - tracing::info!(bucket = %name, provider = b.provider(), "backend ready"); - backends.insert(name.clone(), b); - } + let backends = configuration::build_backends(&cfg, local_ctx.as_ref()) + .await + .map_err(anyhow::Error::msg) + .context("building initial storage backends")?; let state = AppState { - backends: Arc::new(backends), + backends: Arc::new(RwLock::new(backends)), + local_ctx: local_ctx.clone(), }; - let iii = register_worker( - &cli.url, - InitOptions { - metadata: Some(WorkerMetadata { - runtime: "rust".to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - name: storage::worker_name().to_string(), - os: std::env::consts::OS.to_string(), - pid: Some(std::process::id()), - telemetry: None, - ..WorkerMetadata::default() - }), - ..Default::default() - }, - ); - let registry = Arc::new(TriggerRegistry::new()); let dispatcher = Arc::new(EngineDispatcher::new(iii.clone(), registry.clone())); @@ -368,7 +385,59 @@ async fn main() -> Result<()> { tracing::info!(queue_id = %queue_id, "cf-queue poller started"); } - storage::handlers::register_all(&iii, &state); + // Register the four storage::* RPC functions inline. + { + let st = state.clone(); + iii.register_function( + "storage::putObject", + RegisterFunction::new_async(move |req: put_object::PutReq| { + let st = st.clone(); + async move { put_object::handle(&st, req).await.map_err(IIIError::from) } + }) + .description( + "Write an object to a configured bucket. Body is base64; max 10MB inline.", + ), + ); + } + { + let st = state.clone(); + iii.register_function( + "storage::getObject", + RegisterFunction::new_async(move |req: get_object::GetReq| { + let st = st.clone(); + async move { get_object::handle(&st, req).await.map_err(IIIError::from) } + }) + .description("Read an object. Body is base64; for large objects use presignUrl."), + ); + } + { + let st = state.clone(); + iii.register_function( + "storage::deleteObject", + RegisterFunction::new_async(move |req: delete_object::DeleteReq| { + let st = st.clone(); + async move { + delete_object::handle(&st, req) + .await + .map_err(IIIError::from) + } + }) + .description("Delete an object. No-op when the object does not exist."), + ); + } + { + let st = state.clone(); + iii.register_function( + "storage::presignUrl", + RegisterFunction::new_async(move |req: presign_url::PresignReq| { + let st = st.clone(); + async move { presign_url::handle(&st, req).await.map_err(IIIError::from) } + }) + .description( + "Issue a short-lived URL the browser can hit directly to PUT or GET an object.", + ), + ); + } let _ = iii.register_trigger_type(RegisterTriggerType::new( "storage::object-created", @@ -387,6 +456,9 @@ async fn main() -> Result<()> { }, )); + configuration::register_config_trigger(&iii, state.clone(), cfg.topology()) + .context("registering configuration change trigger")?; + tracing::info!("storage registered 4 functions and 2 trigger types, waiting for invocations"); wait_for_shutdown_signal().await?; tracing::info!("storage shutting down");