From 87f19743f97f178e63d11c9834e7dbd0cb8ae27a Mon Sep 17 00:00:00 2001 From: Anderson Leal Date: Mon, 15 Jun 2026 16:37:56 -0300 Subject: [PATCH] feat(iii-directory): migrate runtime config to the configuration worker iii-directory was the last common Rust worker still loading its runtime config from a local config.yaml parsed once at boot. Move it to the engine-built-in `configuration` worker (id `iii-directory`), mirroring the `database`/`storage` pattern: register a JSON Schema + seed at boot, read the authoritative env-expanded value via `configuration::get`, and bind a `configuration:updated` trigger for hot reload. Full hot-reload parity with `database`: - Tunable fields apply live: registry_url, download_timeout_ms, registry_cache_ttl_ms, filter_unregistered. - Topology changes are refused with a "restart required" log: skills_folder, local_skills_folder, auto_download. Implementation: - New src/configuration.rs (port of database/src/configuration.rs) with a SharedState bundling the live config, shared cache TTL, both caches, and the boot topology. on_config_change re-fetches the authoritative value (ignores the trigger payload), refuses topology changes, else swaps config + updates TTL + clears caches. - SkillsConfig: derive JsonSchema; add json_schema/from_json/to_json/from_file/ topology/into_shared and the SharedConfig alias (Arc>). - Handlers snapshot the live config per call via load_full(); RegistryCache and RegisteredWorkersCache read a shared Arc TTL. - main.rs: --config is now an optional seed; connect -> register_config -> fetch_config -> build state/caches -> register fns -> register_config_trigger. - config.yaml -> config.yaml.example (seed); README Configure section + manifest default_config updated. Verified: cargo build, clippy --all-targets, fmt --check, 249 lib tests, and all test targets compile + link. Co-Authored-By: Claude Opus 4.7 (1M context) --- iii-directory/Cargo.lock | 12 +- iii-directory/Cargo.toml | 1 + iii-directory/README.md | 60 +++++-- iii-directory/config.yaml | 16 -- iii-directory/config.yaml.example | 34 ++++ iii-directory/src/config.rs | 164 ++++++++++++++++- iii-directory/src/configuration.rs | 226 ++++++++++++++++++++++++ iii-directory/src/functions/download.rs | 16 +- iii-directory/src/functions/mod.rs | 16 +- iii-directory/src/functions/prompts.rs | 12 +- iii-directory/src/functions/registry.rs | 58 ++++-- iii-directory/src/functions/skills.rs | 57 +++--- iii-directory/src/lib.rs | 1 + iii-directory/src/main.rs | 144 ++++++++++----- iii-directory/src/manifest.rs | 5 +- iii-directory/tests/common/workers.rs | 5 +- 16 files changed, 688 insertions(+), 139 deletions(-) delete mode 100644 iii-directory/config.yaml create mode 100644 iii-directory/config.yaml.example create mode 100644 iii-directory/src/configuration.rs diff --git a/iii-directory/Cargo.lock b/iii-directory/Cargo.lock index 83abad8f..c851022a 100644 --- a/iii-directory/Cargo.lock +++ b/iii-directory/Cargo.lock @@ -76,6 +76,15 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -1065,9 +1074,10 @@ dependencies = [ [[package]] name = "iii-directory" -version = "0.8.0" +version = "0.8.1" dependencies = [ "anyhow", + "arc-swap", "async-trait", "chrono", "clap", diff --git a/iii-directory/Cargo.toml b/iii-directory/Cargo.toml index 3ebb43f4..1c441a36 100644 --- a/iii-directory/Cargo.toml +++ b/iii-directory/Cargo.toml @@ -16,6 +16,7 @@ path = "src/lib.rs" [dependencies] iii-sdk = "=0.16.0-next.2" +arc-swap = "1" tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "signal", "time", "fs", "process"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/iii-directory/README.md b/iii-directory/README.md index 6d98e0e6..3ebca3a2 100644 --- a/iii-directory/README.md +++ b/iii-directory/README.md @@ -78,24 +78,50 @@ npx skills add iii-hq/workers --all ## Configuration +Runtime settings live in the **`configuration` worker** under id +**`iii-directory`** (the same pattern `database` and `storage` use). At boot +the worker registers its JSON Schema, reads the live value via +`configuration::get` (the configuration worker env-expands `${VAR}`), and binds +a `configuration` trigger so it re-fetches on change. + +Persisted values default to `./data/configuration/iii-directory.yaml` (fs +adapter). Edit that file directly, call `configuration::set id=iii-directory`, +or use the console Workers tab — all three propagate without a redeploy. + +### Fields + ```yaml -# Folder that backs every read (`directory::skills::list`, -# `directory::skills::get`, `directory::prompts::*`) and every write -# from `directory::skills::download`. Relative paths are resolved -# against the process current working directory; absolute paths are -# used as-is. -skills_folder: ./skills - -# Workers registry base URL — used by `directory::skills::download` -# and the `directory::registry::*` proxies when a `worker=` source is -# specified. Override for self-hosted deployments. -registry_url: https://api.workers.iii.dev - -# Timeout for a single download (`git clone` or HTTP request) in ms. -download_timeout_ms: 60000 +# TOPOLOGY — changing any of these requires a worker restart. +skills_folder: ~/.iii/skills # read/write root for skills + prompts +local_skills_folder: ./.iii/skills # project-scoped overrides (whole-namespace local-wins) +auto_download: true # subscribe to worker-add + run the boot reconcile + +# TUNABLE — hot-reload live on `configuration:updated`. +registry_url: https://api.workers.iii.dev # workers registry base URL +download_timeout_ms: 60000 # per git-clone / HTTP request timeout (ms) +registry_cache_ttl_ms: 60000 # in-process TTL for registry::workers::* responses +filter_unregistered: true # hide skills whose namespace isn't an installed worker ``` -The folder is created on first download if it doesn't exist. +The `skills_folder` is created on first download if it doesn't exist. + +### Zero-config default + seed + +With no seed and no stored value the worker uses built-in defaults +(`skills_folder: ~/.iii/skills`, `registry_url: https://api.workers.iii.dev`). +Pass `--config ` to supply a YAML seed: when present and no value is +stored yet, its contents become `initial_value` on `configuration::register` +(see [`config.yaml.example`](config.yaml.example)). Engine-managed deployments +inline the config under the worker entry; the engine delivers it via `--config`. + +### Hot reload + +On `configuration::set` (or an external edit to the persisted file), the worker +re-fetches the authoritative value. Tunable changes apply in place and the +registry caches are cleared so a repointed `registry_url` takes effect +immediately. Topology changes (`skills_folder` / `local_skills_folder` / +`auto_download`) are refused with a "restart required" log; the previous +configuration is kept until the worker restarts. --- @@ -265,7 +291,9 @@ block on downstream latency. ### Run from source ```bash -cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml +# --config is an optional YAML seed (see config.yaml.example); omit it to +# rely on the value stored in the `configuration` worker (or built-in defaults). +cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml.example ``` ### Tests diff --git a/iii-directory/config.yaml b/iii-directory/config.yaml deleted file mode 100644 index e97f06c9..00000000 --- a/iii-directory/config.yaml +++ /dev/null @@ -1,16 +0,0 @@ -# iii-directory runtime config. - -# Folder that backs every read (`directory::skills::list`, -# `directory::skills::get`, `directory::prompts::*`) and every write -# from `directory::skills::download`. Relative paths are resolved -# against the process current working directory; absolute paths are -# used as-is. -skills_folder: ./skills - -# Workers registry base URL — used by `directory::skills::download` and -# the `directory::registry::*` proxies when a `worker=` source is -# specified. -registry_url: https://api.workers.iii.dev - -# Timeout for a single download (`git clone` or HTTP request) in ms. -download_timeout_ms: 60000 diff --git a/iii-directory/config.yaml.example b/iii-directory/config.yaml.example new file mode 100644 index 00000000..02dc1efa --- /dev/null +++ b/iii-directory/config.yaml.example @@ -0,0 +1,34 @@ +# Optional seed file for first-time registration +# (`iii-directory --config ./config.yaml.example`). +# +# As of this version, iii-directory's runtime config lives in the +# `configuration` worker under id `iii-directory` (the same pattern +# `database` and `storage` use). This file is only a SEED: when the worker +# is launched with `--config ` AND no value is yet stored for id +# `iii-directory`, its contents are passed as `initial_value` on +# `configuration::register` (the configuration worker env-expands `${VAR}`). +# After that first register the stored value is authoritative; edit it with +# `configuration::set id=iii-directory` (or the console Workers tab), or by +# editing `./data/configuration/iii-directory.yaml` directly. +# +# Tunable fields hot-reload on change (registry_url, download_timeout_ms, +# registry_cache_ttl_ms, filter_unregistered). Topology fields require a +# worker restart (skills_folder, local_skills_folder, auto_download). +# +# When omitted entirely, the worker seeds the built-in defaults +# (skills_folder: ~/.iii/skills, registry_url: https://api.workers.iii.dev). + +# Folder that backs every read (`directory::skills::list`, +# `directory::skills::get`, `directory::prompts::*`) and every write +# from `directory::skills::download`. Relative paths are resolved +# against the process current working directory; absolute paths are +# used as-is. +skills_folder: ./skills + +# Workers registry base URL — used by `directory::skills::download` and +# the `directory::registry::*` proxies when a `worker=` source is +# specified. +registry_url: https://api.workers.iii.dev + +# Timeout for a single download (`git clone` or HTTP request) in ms. +download_timeout_ms: 60000 diff --git a/iii-directory/src/config.rs b/iii-directory/src/config.rs index f574d6bd..42abedd4 100644 --- a/iii-directory/src/config.rs +++ b/iii-directory/src/config.rs @@ -6,9 +6,19 @@ //! glob arrays, no scopes — everything lives on disk under one root. use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::Result; +use arc_swap::ArcSwap; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// Shared, hot-reloadable config handle. Handlers snapshot the current +/// value per call (`handle.load_full()`, lock-free) so a +/// `configuration:updated` reload that `store`s a new value takes effect +/// on the next invocation without re-registering any function. +pub type SharedConfig = Arc>; /// Default base URL for the workers registry. Overrideable via /// `registry_url:` in the config so self-hosted deployments can repoint. @@ -51,7 +61,7 @@ fn default_auto_download() -> bool { true } -#[derive(Deserialize, Serialize, Debug, Clone)] +#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)] pub struct SkillsConfig { /// Folder that backs every read (`directory::skills::list`, /// `directory::skills::get`, `directory::prompts::*`) and every @@ -173,6 +183,74 @@ impl SkillsConfig { pub fn registry_base(&self) -> &str { self.registry_url.trim_end_matches('/') } + + /// Restart-requiring fields. A `configuration:updated` reload that + /// changes any of these is refused (logged "restart required"): + /// `skills_folder` / `local_skills_folder` are the on-disk read/write + /// roots baked into running tasks, and `auto_download` wires the + /// `worker`-trigger subscription + boot reconcile at startup — none can + /// be re-wired safely in place. + pub fn topology(&self) -> Topology { + Topology { + skills_folder: self.skills_folder.clone(), + local_skills_folder: self.local_skills_folder.clone(), + auto_download: self.auto_download, + } + } + + /// JSON Schema registered with the `configuration` worker so the + /// console can render an editor for the `iii-directory` entry. + pub fn json_schema() -> Value { + let root = schemars::schema_for!(SkillsConfig); + let mut schema = + serde_json::to_value(&root.schema).expect("SkillsConfig JSON Schema serializes"); + if let Some(obj) = schema.as_object_mut() { + if !root.definitions.is_empty() { + obj.insert( + "definitions".into(), + serde_json::to_value(&root.definitions).expect("definitions serialize"), + ); + } + obj.insert("example".into(), SkillsConfig::default().to_json()); + } + schema + } + + /// Parse a YAML seed file. Used only to build `initial_value` on first + /// `configuration::register`; the configuration worker env-expands + /// `${VAR}` on read, so no local expansion is done here. + pub fn from_yaml(yaml: &str) -> Result { + serde_yaml::from_str(yaml).map_err(|e| format!("yaml parse: {e}")) + } + + /// Read and parse a YAML seed file from disk. + pub fn from_file(path: &str) -> Result { + let raw = std::fs::read_to_string(path).map_err(|e| format!("read {path}: {e}"))?; + Self::from_yaml(&raw) + } + + /// Parse the authoritative value returned by `configuration::get`. + pub fn from_json(value: &Value) -> Result { + serde_json::from_value(value.clone()).map_err(|e| format!("json parse: {e}")) + } + + /// Serialize for `initial_value` on register. + pub fn to_json(&self) -> Value { + serde_json::to_value(self).expect("SkillsConfig serializes") + } + + /// Wrap into a shared, hot-reloadable handle (see [`SharedConfig`]). + pub fn into_shared(self) -> SharedConfig { + Arc::new(ArcSwap::from_pointee(self)) + } +} + +/// Restart-requiring subset of [`SkillsConfig`] (see [`SkillsConfig::topology`]). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Topology { + pub skills_folder: String, + pub local_skills_folder: String, + pub auto_download: bool, } pub fn load_config(path: &str) -> Result { @@ -300,4 +378,88 @@ auto_download: false }; assert_eq!(cfg.registry_base(), "https://api.example"); } + + #[test] + fn json_schema_is_object_with_known_properties() { + let schema = SkillsConfig::json_schema(); + let props = schema + .get("properties") + .and_then(|p| p.as_object()) + .unwrap(); + for field in [ + "skills_folder", + "local_skills_folder", + "registry_url", + "download_timeout_ms", + "registry_cache_ttl_ms", + "filter_unregistered", + "auto_download", + ] { + assert!(props.contains_key(field), "schema missing {field}"); + } + assert!(schema.get("example").is_some()); + } + + #[test] + fn to_json_from_json_roundtrip() { + let cfg = SkillsConfig { + skills_folder: "./my-skills".into(), + registry_url: "https://example.com/registry".into(), + download_timeout_ms: 1234, + registry_cache_ttl_ms: 5678, + filter_unregistered: false, + auto_download: false, + ..SkillsConfig::default() + }; + let back = SkillsConfig::from_json(&cfg.to_json()).unwrap(); + assert_eq!(back.skills_folder, cfg.skills_folder); + assert_eq!(back.registry_url, cfg.registry_url); + assert_eq!(back.download_timeout_ms, cfg.download_timeout_ms); + assert_eq!(back.registry_cache_ttl_ms, cfg.registry_cache_ttl_ms); + assert_eq!(back.filter_unregistered, cfg.filter_unregistered); + assert_eq!(back.auto_download, cfg.auto_download); + } + + #[test] + fn from_yaml_matches_from_json_for_seed_shape() { + let yaml = "skills_folder: ./s\nregistry_url: https://r\ndownload_timeout_ms: 10\n"; + let from_yaml = SkillsConfig::from_yaml(yaml).unwrap(); + let from_json = SkillsConfig::from_json(&from_yaml.to_json()).unwrap(); + assert_eq!(from_yaml.skills_folder, from_json.skills_folder); + assert_eq!(from_yaml.registry_url, from_json.registry_url); + assert_eq!(from_yaml.download_timeout_ms, from_json.download_timeout_ms); + } + + #[test] + fn topology_equal_when_only_tunables_differ() { + let base = SkillsConfig::default(); + let tuned = SkillsConfig { + registry_url: "https://other".into(), + download_timeout_ms: 1, + registry_cache_ttl_ms: 2, + filter_unregistered: !base.filter_unregistered, + ..base.clone() + }; + assert_eq!(base.topology(), tuned.topology()); + } + + #[test] + fn topology_differs_when_a_topology_field_changes() { + let base = SkillsConfig::default(); + let folder = SkillsConfig { + skills_folder: "/other".into(), + ..base.clone() + }; + let local = SkillsConfig { + local_skills_folder: "/other-local".into(), + ..base.clone() + }; + let auto = SkillsConfig { + auto_download: !base.auto_download, + ..base.clone() + }; + assert_ne!(base.topology(), folder.topology()); + assert_ne!(base.topology(), local.topology()); + assert_ne!(base.topology(), auto.topology()); + } } diff --git a/iii-directory/src/configuration.rs b/iii-directory/src/configuration.rs new file mode 100644 index 00000000..30e54ed2 --- /dev/null +++ b/iii-directory/src/configuration.rs @@ -0,0 +1,226 @@ +//! Integration with the `configuration` worker — register, fetch, and +//! hot-reload the `iii-directory` configuration entry. +//! +//! Mirrors the `database` worker's pattern: register a JSON Schema + seed +//! at boot, read the authoritative (env-expanded) value via +//! `configuration::get`, and bind a `configuration` trigger so a +//! `configuration:updated` event re-fetches and applies the change. +//! +//! Tunable fields (`registry_url`, `download_timeout_ms`, +//! `registry_cache_ttl_ms`, `filter_unregistered`) hot-reload in place; +//! topology fields (`skills_folder`, `local_skills_folder`, +//! `auto_download`) are refused with a "restart required" log because +//! they define on-disk roots and boot-time task wiring. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use iii_sdk::{IIIError, RegisterFunction, RegisterTriggerInput, TriggerRequest, III}; +use serde_json::{json, Value}; + +use crate::config::{SharedConfig, SkillsConfig, Topology}; +use crate::functions::registry::RegistryCache; +use crate::functions::skills::RegisteredWorkersCache; + +pub const CONFIG_ID: &str = "iii-directory"; +const CONFIG_FN_ID: &str = "directory::on-config-change"; +const CONFIG_TIMEOUT_MS: u64 = 5_000; +const CONFIG_RETRIES: u32 = 3; + +/// Runtime resources a `configuration:updated` reload must reach: the live +/// config snapshot handlers read, the shared cache-TTL cell, the two caches +/// to clear, and the immutable boot-time topology used to detect +/// restart-requiring changes. +#[derive(Clone)] +pub struct SharedState { + /// Live tunable snapshot read by handlers via `load_full()`. + pub config: SharedConfig, + /// Shared TTL (ms) backing both caches; updated on reload. + pub cache_ttl_ms: Arc, + /// Registry HTTP-response cache; cleared on reload. + pub registry_cache: RegistryCache, + /// Installed-worker-name cache; invalidated on reload. + pub registered_cache: Arc, + /// Restart-only fields captured at boot. + boot_topology: Topology, +} + +impl SharedState { + pub fn new( + config: SharedConfig, + cache_ttl_ms: Arc, + registry_cache: RegistryCache, + registered_cache: Arc, + boot_topology: Topology, + ) -> Self { + Self { + config, + cache_ttl_ms, + registry_cache, + registered_cache, + boot_topology, + } + } +} + +/// Register the `iii-directory` configuration schema with the configuration +/// worker. When `seed` is present, its value is installed as `initial_value`. +/// Otherwise, built-in defaults are seeded only when no stored value exists. +pub async fn register_config(iii: &III, seed: Option<&SkillsConfig>) -> Result<(), String> { + let mut payload = json!({ + "id": CONFIG_ID, + "name": "iii-directory", + "description": "Skills/prompts folders, workers-registry URL, download timeouts, \ + and skill-visibility filters for the iii-directory worker.", + "schema": SkillsConfig::json_schema(), + }); + if let Some(seed) = seed { + payload["initial_value"] = seed.to_json(); + } else if should_seed_default_value(iii).await? { + payload["initial_value"] = SkillsConfig::default().to_json(); + } + trigger_with_retry(iii, "configuration::register", payload).await?; + Ok(()) +} + +/// Read the live `iii-directory` configuration (env-expanded by the +/// configuration worker). +pub async fn fetch_config(iii: &III) -> Result { + let value = get_config_value(iii).await?; + if value.is_null() { + tracing::info!("no configuration value found; using built-in default configuration"); + return Ok(SkillsConfig::default()); + } + SkillsConfig::from_json(&value) +} + +async fn should_seed_default_value(iii: &III) -> Result { + match try_get_config_value(iii).await? { + None => Ok(true), + Some(value) if value.is_null() => Ok(true), + Some(_) => Ok(false), + } +} + +async fn get_config_value(iii: &III) -> Result { + try_get_config_value(iii) + .await? + .ok_or_else(|| format!("configuration `{CONFIG_ID}` not found")) +} + +/// Returns `Ok(None)` when the entry does not exist (`NOT_FOUND`). +async fn try_get_config_value(iii: &III) -> Result, String> { + match trigger_with_retry(iii, "configuration::get", json!({ "id": CONFIG_ID })).await { + Ok(resp) => Ok(resp.get("value").cloned()), + Err(e) if e.contains("NOT_FOUND") => Ok(None), + Err(e) => Err(e), + } +} + +/// Apply a freshly-fetched configuration: swap the live snapshot, update the +/// shared cache TTL, and clear both caches so a repointed `registry_url` +/// takes effect immediately and stale entries from the old registry drop. +pub async fn apply_config(state: &SharedState, cfg: SkillsConfig) { + state + .cache_ttl_ms + .store(cfg.registry_cache_ttl_ms, Ordering::Relaxed); + state.config.store(Arc::new(cfg)); + state.registry_cache.clear().await; + state.registered_cache.invalidate().await; +} + +/// Register the internal config-change handler and bind a `configuration` +/// trigger for `configuration:updated` on the `iii-directory` entry. +pub fn register_config_trigger(iii: &III, state: SharedState) -> Result<(), IIIError> { + let st = state.clone(); + let engine = iii.clone(); + iii.register_function( + CONFIG_FN_ID, + RegisterFunction::new_async(move |_payload: Value| { + let st = st.clone(); + let engine = engine.clone(); + async move { + on_config_change(&engine, &st).await; + Ok::(json!({ "ok": true })) + } + }) + .description( + "Internal: reload tunable iii-directory settings from the authoritative \ + configuration when it changes.", + ), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "configuration".to_string(), + function_id: CONFIG_FN_ID.to_string(), + config: json!({ + "configuration_id": CONFIG_ID, + "event_types": ["configuration:updated"], + }), + metadata: None, + })?; + Ok(()) +} + +/// Reload from the AUTHORITATIVE configuration. +/// +/// The caller-supplied trigger payload is intentionally ignored: +/// `directory::on-config-change` is a discoverable bus function, so trusting +/// `payload.new_value` would let any caller repoint the registry URL or +/// download roots without updating persisted state. Re-fetch the stored +/// value via `configuration::get` instead. Topology changes are refused — +/// the on-disk roots and the auto-download wiring are fixed at boot. +async fn on_config_change(iii: &III, state: &SharedState) { + let cfg = match fetch_config(iii).await { + Ok(cfg) => cfg, + Err(e) => { + tracing::error!( + error = %e, + "config-change: failed to fetch authoritative configuration; keeping previous config" + ); + return; + } + }; + if cfg.topology() != state.boot_topology { + tracing::warn!( + "configuration change alters topology (skills_folder, local_skills_folder, or \ + auto_download); a restart is required to apply it — keeping previous configuration" + ); + return; + } + apply_config(state, cfg).await; + tracing::info!("iii-directory configuration reloaded (tunable fields applied; caches cleared)"); +} + +async fn trigger_with_retry(iii: &III, function_id: &str, payload: Value) -> Result { + let mut last_err = String::new(); + for attempt in 1..=CONFIG_RETRIES { + match iii + .trigger(TriggerRequest { + function_id: function_id.to_string(), + payload: payload.clone(), + action: None, + timeout_ms: Some(CONFIG_TIMEOUT_MS), + }) + .await + { + Ok(v) => return Ok(v), + Err(e) => { + last_err = e.to_string(); + if attempt < CONFIG_RETRIES { + tracing::warn!( + function_id, + attempt, + error = %last_err, + "configuration RPC failed; retrying" + ); + tokio::time::sleep(Duration::from_millis(250 * u64::from(attempt))).await; + } + } + } + } + Err(format!( + "{function_id} failed after {CONFIG_RETRIES} attempts: {last_err}" + )) +} diff --git a/iii-directory/src/functions/download.rs b/iii-directory/src/functions/download.rs index 63486026..0508cc6f 100644 --- a/iii-directory/src/functions/download.rs +++ b/iii-directory/src/functions/download.rs @@ -16,7 +16,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use crate::config::SkillsConfig; +use crate::config::{SharedConfig, SkillsConfig}; use crate::sources::{self, registry::VersionSpec, DownloadResult}; use crate::trigger_types::{self, SubscriberSet}; @@ -108,7 +108,7 @@ pub enum ClassifiedInput { /// with `master`-default repos can override via the `branch` field. pub const DEFAULT_REPO_BRANCH: &str = "main"; -pub fn register(iii: &Arc, cfg: &Arc, subscribers: &super::Subscribers) { +pub fn register(iii: &Arc, cfg: &SharedConfig, subscribers: &super::Subscribers) { register_download(iii, cfg, subscribers); register_download_from_registry(iii, cfg, subscribers); register_download_from_repo(iii, cfg, subscribers); @@ -135,7 +135,7 @@ async fn run_and_fan_out( /// source set. Kept for back-compat; new callers should prefer the /// explicit `download_from_registry` / `download_from_repo`, whose /// schemas make the source unambiguous. -fn register_download(iii: &Arc, cfg: &Arc, subscribers: &super::Subscribers) { +fn register_download(iii: &Arc, cfg: &SharedConfig, subscribers: &super::Subscribers) { let iii_inner = iii.clone(); let cfg_inner = cfg.clone(); let skills_subs = subscribers.skills.clone(); @@ -144,7 +144,7 @@ fn register_download(iii: &Arc, cfg: &Arc, subscribers: &supe "directory::skills::download", RegisterFunction::new_async(move |req: DownloadInput| { let iii = iii_inner.clone(); - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let skills_subs = skills_subs.clone(); let prompts_subs = prompts_subs.clone(); async move { run_and_fan_out(&iii, &cfg, &skills_subs, &prompts_subs, req).await } @@ -167,7 +167,7 @@ fn register_download(iii: &Arc, cfg: &Arc, subscribers: &supe /// schema level (no "specify exactly one of two groups" guesswork). fn register_download_from_registry( iii: &Arc, - cfg: &Arc, + cfg: &SharedConfig, subscribers: &super::Subscribers, ) { let iii_inner = iii.clone(); @@ -178,7 +178,7 @@ fn register_download_from_registry( "directory::skills::download_from_registry", RegisterFunction::new_async(move |req: RegistryDownloadInput| { let iii = iii_inner.clone(); - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let skills_subs = skills_subs.clone(); let prompts_subs = prompts_subs.clone(); async move { @@ -208,7 +208,7 @@ fn register_download_from_registry( /// the schema level. fn register_download_from_repo( iii: &Arc, - cfg: &Arc, + cfg: &SharedConfig, subscribers: &super::Subscribers, ) { let iii_inner = iii.clone(); @@ -219,7 +219,7 @@ fn register_download_from_repo( "directory::skills::download_from_repo", RegisterFunction::new_async(move |req: RepoDownloadInput| { let iii = iii_inner.clone(); - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let skills_subs = skills_subs.clone(); let prompts_subs = prompts_subs.clone(); async move { diff --git a/iii-directory/src/functions/mod.rs b/iii-directory/src/functions/mod.rs index ae625449..2ec70881 100644 --- a/iii-directory/src/functions/mod.rs +++ b/iii-directory/src/functions/mod.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use iii_sdk::III; -use crate::config::SkillsConfig; +use crate::config::{SharedConfig, SkillsConfig}; use crate::fs_source::{self, SourceKind}; use crate::trigger_types::{RegisteredTriggerTypes, SubscriberSet}; @@ -47,12 +47,7 @@ impl From<&RegisteredTriggerTypes> for Subscribers { } } -/// Register every function the worker exposes against `iii`. -pub fn register_all( - iii: &Arc, - cfg: &Arc, - trigger_types: &RegisteredTriggerTypes, -) { +pub fn register_all(iii: &Arc, cfg: &SharedConfig, trigger_types: &RegisteredTriggerTypes) { skills::register(iii, cfg); prompts::register(iii, cfg); let subs = Subscribers::from(trigger_types); @@ -66,19 +61,18 @@ pub fn register_all( ); } -/// Register all functions with a pre-built registered-workers cache. -/// Used when the cache is shared with auto-download event handlers. pub fn register_all_with_cache( iii: &Arc, - cfg: &Arc, + cfg: &SharedConfig, trigger_types: &RegisteredTriggerTypes, cache: &std::sync::Arc, + registry_cache: registry::RegistryCache, ) { skills::register_with_cache(iii, cfg, cache); prompts::register(iii, cfg); let subs = Subscribers::from(trigger_types); download::register(iii, cfg, &subs); - registry::register(iii, cfg); + registry::register_with_cache(iii, cfg, registry_cache); engine_fn::register(iii); tracing::info!( "iii-directory registered 3 directory::skills::* (list + get + index), \ diff --git a/iii-directory/src/functions/prompts.rs b/iii-directory/src/functions/prompts.rs index 2b97ad05..0335fd50 100644 --- a/iii-directory/src/functions/prompts.rs +++ b/iii-directory/src/functions/prompts.rs @@ -20,7 +20,7 @@ use iii_sdk::{IIIError, RegisterFunction, III}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::config::SkillsConfig; +use crate::config::{SharedConfig, SkillsConfig}; use crate::fs_source; use crate::functions::error::{not_found_message, NextAction}; @@ -63,17 +63,17 @@ pub struct PromptGetOutput { pub modified_at: String, } -pub fn register(iii: &Arc, cfg: &Arc) { +pub fn register(iii: &Arc, cfg: &SharedConfig) { register_list_prompts(iii, cfg); register_get_prompt(iii, cfg); } -fn register_list_prompts(iii: &Arc, cfg: &Arc) { +fn register_list_prompts(iii: &Arc, cfg: &SharedConfig) { let cfg_inner = cfg.clone(); iii.register_function( "directory::prompts::list", RegisterFunction::new_async(move |_input: ListPromptsInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); async move { let (prompts, _skipped) = fs_source::scan_prompts_merged( &cfg.resolved_skills_folder(), @@ -99,12 +99,12 @@ fn register_list_prompts(iii: &Arc, cfg: &Arc) { ); } -fn register_get_prompt(iii: &Arc, cfg: &Arc) { +fn register_get_prompt(iii: &Arc, cfg: &SharedConfig) { let cfg_inner = cfg.clone(); iii.register_function( "directory::prompts::get", RegisterFunction::new_async(move |req: PromptGetInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); async move { get_prompt(&cfg, req).await.map_err(IIIError::Handler) } }) .description( diff --git a/iii-directory/src/functions/registry.rs b/iii-directory/src/functions/registry.rs index 6f936710..61049468 100644 --- a/iii-directory/src/functions/registry.rs +++ b/iii-directory/src/functions/registry.rs @@ -32,6 +32,7 @@ //! * `GET {base}/w/{slug}/skills?version=…` → //! `{ name, version, skills: [{path, content}], prompts: [{name, description?, args_schema?, content}] }`. +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -41,7 +42,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::RwLock; -use crate::config::SkillsConfig; +use crate::config::{SharedConfig, SkillsConfig}; use crate::functions::error::{invalid_input_message, not_found_message, NextAction}; use crate::sources::build_http_client; use crate::sources::registry::validate_worker_name; @@ -240,7 +241,9 @@ pub struct WorkerInfoOutput { #[derive(Clone)] pub struct RegistryCache { inner: Arc>>, - ttl: Duration, + /// Live TTL in ms, shared with the worker-list cache and updated on a + /// `configuration:updated` reload (see `configuration::apply_config`). + ttl_ms: Arc, } struct CacheEntry { @@ -249,17 +252,24 @@ struct CacheEntry { } impl RegistryCache { + /// Construct with a fixed TTL (used by unit tests). pub fn new(ttl: Duration) -> Self { + Self::new_shared(Arc::new(AtomicU64::new(ttl.as_millis() as u64))) + } + + /// Construct sharing a live TTL cell with the rest of the worker. + pub fn new_shared(ttl_ms: Arc) -> Self { Self { inner: Arc::new(RwLock::new(std::collections::HashMap::new())), - ttl, + ttl_ms, } } pub async fn get(&self, key: &str) -> Option { let map = self.inner.read().await; let entry = map.get(key)?; - if entry.inserted_at.elapsed() > self.ttl { + let ttl = Duration::from_millis(self.ttl_ms.load(Ordering::Relaxed)); + if entry.inserted_at.elapsed() > ttl { return None; } serde_json::from_value(entry.value.clone()).ok() @@ -286,19 +296,26 @@ impl RegistryCache { // ---------- registration ---------- -pub fn register(iii: &Arc, cfg: &Arc) { - let cache = RegistryCache::new(Duration::from_millis(cfg.registry_cache_ttl_ms)); +pub fn register(iii: &Arc, cfg: &SharedConfig) { + let ttl_ms = cfg.load().registry_cache_ttl_ms; + let cache = RegistryCache::new(Duration::from_millis(ttl_ms)); + register_with_cache(iii, cfg, cache); +} + +/// Register the registry proxy functions against a shared cache so a +/// `configuration:updated` reload can clear it (and repoint `registry_url`). +pub fn register_with_cache(iii: &Arc, cfg: &SharedConfig, cache: RegistryCache) { register_worker_list(iii, cfg, cache.clone()); register_worker_info(iii, cfg, cache); } -fn register_worker_list(iii: &Arc, cfg: &Arc, cache: RegistryCache) { +fn register_worker_list(iii: &Arc, cfg: &SharedConfig, cache: RegistryCache) { let cfg_inner = cfg.clone(); let cache_inner = cache; iii.register_function( "directory::registry::workers::list", RegisterFunction::new_async(move |req: WorkerListInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let cache = cache_inner.clone(); async move { worker_list(&cfg, &cache, req) @@ -319,13 +336,13 @@ fn register_worker_list(iii: &Arc, cfg: &Arc, cache: Registry ); } -fn register_worker_info(iii: &Arc, cfg: &Arc, cache: RegistryCache) { +fn register_worker_info(iii: &Arc, cfg: &SharedConfig, cache: RegistryCache) { let cfg_inner = cfg.clone(); let cache_inner = cache; iii.register_function( "directory::registry::workers::info", RegisterFunction::new_async(move |req: WorkerInfoInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let cache = cache_inner.clone(); async move { worker_info(&cfg, &cache, req) @@ -1024,4 +1041,25 @@ mod tests { let v: Option = cache.get("k").await; assert!(v.is_none()); } + + #[tokio::test] + async fn registry_cache_ttl_is_live_via_shared_cell() { + // The TTL is read from a shared cell on every `get`, so a + // `configuration:updated` reload that stores a new value changes the + // effective freshness window in place (no cache rebuild). + let ttl = Arc::new(AtomicU64::new(60_000)); + let cache = RegistryCache::new_shared(ttl.clone()); + cache.put("k".into(), &json!({ "x": 1 })).await; + let fresh: Option = cache.get("k").await; + assert!(fresh.is_some(), "entry should be fresh under the 60s TTL"); + + // Shrink the shared TTL to 0 → the same entry is now stale. + ttl.store(0, Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(2)).await; + let stale: Option = cache.get("k").await; + assert!( + stale.is_none(), + "entry should be stale after TTL shrinks to 0" + ); + } } diff --git a/iii-directory/src/functions/skills.rs b/iii-directory/src/functions/skills.rs index 074af9ab..f33d3bda 100644 --- a/iii-directory/src/functions/skills.rs +++ b/iii-directory/src/functions/skills.rs @@ -28,6 +28,7 @@ //! fired from the download function on success. use std::collections::HashSet; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -38,7 +39,7 @@ use serde::Serialize; use serde_json::json; use tokio::sync::Mutex; -use crate::config::SkillsConfig; +use crate::config::{SharedConfig, SkillsConfig}; use crate::fs_source::{self, FsSkill, SkillFrontmatter}; use crate::functions::error::{invalid_input_message, not_found_message, NextAction, SuggestEntry}; @@ -221,11 +222,19 @@ pub(crate) struct CacheEntry { pub struct RegisteredWorkersCache { /// `pub(crate)` so tests in sibling modules can inspect / populate. pub(crate) inner: Mutex>, - ttl_ms: u64, + /// Live TTL in ms, shared with the registry cache and updated on a + /// `configuration:updated` reload (see `configuration::apply_config`). + ttl_ms: Arc, } impl RegisteredWorkersCache { + /// Construct with a fixed TTL (used by unit tests). pub fn new(ttl_ms: u64) -> Self { + Self::new_shared(Arc::new(AtomicU64::new(ttl_ms))) + } + + /// Construct sharing a live TTL cell with the rest of the worker. + pub fn new_shared(ttl_ms: Arc) -> Self { Self { inner: Mutex::new(None), ttl_ms, @@ -252,7 +261,9 @@ impl RegisteredWorkersCache { { let lock = self.inner.lock().await; if let Some(entry) = lock.as_ref() { - if entry.fetched_at.elapsed().as_millis() < self.ttl_ms as u128 { + if entry.fetched_at.elapsed().as_millis() + < self.ttl_ms.load(Ordering::Relaxed) as u128 + { return Some(entry.workers.clone()); } } @@ -429,22 +440,26 @@ pub(crate) fn filter_to_registered( .collect() } -pub fn register(iii: &Arc, cfg: &Arc) { - let cache = Arc::new(RegisteredWorkersCache::new(cfg.registry_cache_ttl_ms)); +pub fn register(iii: &Arc, cfg: &SharedConfig) { + let cache = Arc::new(RegisteredWorkersCache::new( + cfg.load().registry_cache_ttl_ms, + )); register_list_skills(iii, cfg, &cache); register_get_skill(iii, cfg, &cache); register_index_skills(iii, cfg, &cache); } -/// Expose the cache so main.rs can share it with the event handler. -pub fn make_registered_cache(cfg: &SkillsConfig) -> Arc { - Arc::new(RegisteredWorkersCache::new(cfg.registry_cache_ttl_ms)) +/// Expose the cache so main.rs can share it with the event handler. Takes +/// the live TTL cell so a `configuration:updated` reload changes the +/// effective freshness window in place. +pub fn make_registered_cache(ttl_ms: Arc) -> Arc { + Arc::new(RegisteredWorkersCache::new_shared(ttl_ms)) } /// Register all skills functions with a shared cache instance. pub fn register_with_cache( iii: &Arc, - cfg: &Arc, + cfg: &SharedConfig, cache: &Arc, ) { register_list_skills(iii, cfg, cache); @@ -452,18 +467,14 @@ pub fn register_with_cache( register_index_skills(iii, cfg, cache); } -fn register_list_skills( - iii: &Arc, - cfg: &Arc, - cache: &Arc, -) { +fn register_list_skills(iii: &Arc, cfg: &SharedConfig, cache: &Arc) { let cfg_inner = cfg.clone(); let iii_inner = iii.clone(); let cache_inner = cache.clone(); iii.register_function( "directory::skills::list", RegisterFunction::new_async(move |input: ListSkillsInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let iii = iii_inner.clone(); let cache = cache_inner.clone(); async move { @@ -541,18 +552,14 @@ fn list_skills_filtered(entries: Vec, input: &ListSkillsInput) -> Vec, - cfg: &Arc, - cache: &Arc, -) { +fn register_get_skill(iii: &Arc, cfg: &SharedConfig, cache: &Arc) { let cfg_inner = cfg.clone(); let iii_inner = iii.clone(); let cache_inner = cache.clone(); iii.register_function( "directory::skills::get", RegisterFunction::new_async(move |req: SkillGetInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let iii = iii_inner.clone(); let cache = cache_inner.clone(); async move { @@ -566,18 +573,14 @@ fn register_get_skill( ); } -fn register_index_skills( - iii: &Arc, - cfg: &Arc, - cache: &Arc, -) { +fn register_index_skills(iii: &Arc, cfg: &SharedConfig, cache: &Arc) { let cfg_inner = cfg.clone(); let iii_inner = iii.clone(); let cache_inner = cache.clone(); iii.register_function( "directory::skills::index", RegisterFunction::new_async(move |_input: IndexSkillsInput| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let iii = iii_inner.clone(); let cache = cache_inner.clone(); async move { diff --git a/iii-directory/src/lib.rs b/iii-directory/src/lib.rs index 8a2fdc29..a32f4042 100644 --- a/iii-directory/src/lib.rs +++ b/iii-directory/src/lib.rs @@ -38,6 +38,7 @@ //! change notifications to their clients. pub mod config; +pub mod configuration; pub mod fs_source; pub mod functions; pub mod manifest; diff --git a/iii-directory/src/main.rs b/iii-directory/src/main.rs index e6a74e6c..ef7eb932 100644 --- a/iii-directory/src/main.rs +++ b/iii-directory/src/main.rs @@ -22,24 +22,26 @@ //! call `engine::functions::list`, `engine::triggers::list`, etc., //! directly. +use std::sync::atomic::AtomicU64; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use clap::Parser; use iii_sdk::{ register_worker, InitOptions, RegisterFunction, TriggerRequest, WorkerMetadata, III, }; use serde_json::json; -use iii_directory::config::SkillsConfig; +use iii_directory::config::{SharedConfig, SkillsConfig}; use iii_directory::functions::download::{ download_worker_skills, reconcile_decision, InFlightGuard, }; +use iii_directory::functions::registry::RegistryCache; use iii_directory::functions::skills::{ make_registered_cache, RegisteredWorkersCache, ENGINE_NAMESPACE, }; use iii_directory::sources::registry::VersionSpec; -use iii_directory::{config, functions, manifest, trigger_types}; +use iii_directory::{configuration, functions, manifest, trigger_types}; #[derive(Parser, Debug)] #[command( @@ -47,8 +49,11 @@ use iii_directory::{config, functions, manifest, trigger_types}; about = "Engine introspection (functions / triggers / workers), workers registry proxy, and filesystem-backed skill + prompt reader." )] struct Cli { - #[arg(long, default_value = "./config.yaml")] - config: String, + /// Optional YAML seed used to populate `initial_value` on the first + /// `configuration::register`. After first boot the authoritative config + /// lives in the `configuration` worker under id `iii-directory`. + #[arg(long)] + config: Option, #[arg(long, default_value = "ws://127.0.0.1:49134")] url: String, @@ -74,26 +79,7 @@ async fn main() -> Result<()> { return Ok(()); } - let cfg = match config::load_config(&cli.config) { - Ok(c) => { - tracing::info!( - skills_folder = %c.resolved_skills_folder().display(), - local_skills_folder = %c.local_skills_folder().display(), - registry_url = %c.registry_base(), - filter_unregistered = c.filter_unregistered, - auto_download = c.auto_download, - "loaded config from {}", - cli.config - ); - c - } - Err(e) => { - tracing::warn!(error = %e, path = %cli.config, "failed to load config, using defaults"); - config::SkillsConfig::default() - } - }; - let cfg = Arc::new(cfg); - + // Connect to the engine first so the configuration RPCs are reachable. let iii = register_worker( &cli.url, InitOptions { @@ -111,26 +97,97 @@ async fn main() -> Result<()> { ); let iii = Arc::new(iii); - // Shared registered-workers cache used by read functions and - // invalidated by the worker-add event handler. - let cache = make_registered_cache(&cfg); + // Optional YAML seed used only to populate `initial_value` on the first + // `configuration::register`. + let seed = match cli.config.as_deref() { + Some(path) => match SkillsConfig::from_file(path) { + Ok(cfg) => { + tracing::info!(path = %path, "loaded seed config for initial registration"); + Some(cfg) + } + Err(e) => { + tracing::warn!( + path = %path, + error = %e, + "failed to load seed config; relying on existing configuration entry" + ); + None + } + }, + None => None, + }; + + // Register the schema (+ seed) and fetch the authoritative, env-expanded + // value from the `configuration` worker. + configuration::register_config(&iii, seed.as_ref()) + .await + .map_err(anyhow::Error::msg) + .context("registering iii-directory configuration schema")?; + let cfg = configuration::fetch_config(&iii) + .await + .map_err(anyhow::Error::msg) + .context("loading iii-directory configuration")?; + + tracing::info!( + skills_folder = %cfg.resolved_skills_folder().display(), + local_skills_folder = %cfg.local_skills_folder().display(), + registry_url = %cfg.registry_base(), + filter_unregistered = cfg.filter_unregistered, + auto_download = cfg.auto_download, + "loaded configuration from the configuration worker" + ); - // Custom trigger types come first because the download function - // captures the subscriber sets it'll fan out to on success. + // Shared, hot-reloadable state. Topology is captured at boot; tunable + // fields live in `cfg_handle` (swapped on reload) and the shared + // `cache_ttl_ms` cell read by both caches. + let boot_topology = cfg.topology(); + let auto_download = cfg.auto_download; + let cache_ttl_ms = Arc::new(AtomicU64::new(cfg.registry_cache_ttl_ms)); + let registered_cache = make_registered_cache(cache_ttl_ms.clone()); + let registry_cache = RegistryCache::new_shared(cache_ttl_ms.clone()); + let cfg_handle: SharedConfig = cfg.into_shared(); + + // Custom trigger types come first because the download function captures + // the subscriber sets it'll fan out to on success. let registered = trigger_types::register_all(&iii); - functions::register_all_with_cache(&iii, &cfg, ®istered, &cache); - functions::log_fs_health(&cfg); + functions::register_all_with_cache( + &iii, + &cfg_handle, + ®istered, + ®istered_cache, + registry_cache.clone(), + ); + functions::log_fs_health(&cfg_handle.load_full()); - // Auto-download: subscribe to worker add events + boot reconcile. - if cfg.auto_download { + // Auto-download: subscribe to worker add events + boot reconcile. Wired + // from the boot value of `auto_download` (a topology field — changing it + // requires a restart). + if auto_download { let in_flight = Arc::new(InFlightGuard::new()); - setup_auto_download(&iii, &cfg, &cache, &in_flight); - spawn_boot_reconcile(iii.clone(), cfg.clone(), cache.clone(), in_flight); + setup_auto_download(&iii, &cfg_handle, ®istered_cache, &in_flight); + spawn_boot_reconcile( + iii.clone(), + cfg_handle.clone(), + registered_cache.clone(), + in_flight, + ); } - let fn_count = if cfg.auto_download { 10 } else { 9 }; + // Bind the configuration-change trigger so tunable fields hot-reload. + let state = configuration::SharedState::new( + cfg_handle.clone(), + cache_ttl_ms, + registry_cache, + registered_cache, + boot_topology, + ); + configuration::register_config_trigger(&iii, state) + .context("registering configuration change trigger")?; + + let fn_count = if auto_download { 10 } else { 9 }; tracing::info!( - "iii-directory ready: {} directory::* functions + 2 custom trigger types", + "iii-directory ready: {} directory::* functions + 2 custom trigger types + \ + configuration hot-reload", fn_count ); @@ -144,7 +201,7 @@ async fn main() -> Result<()> { /// subscribe to the `worker` trigger type for `add` operations. fn setup_auto_download( iii: &Arc, - cfg: &Arc, + cfg: &SharedConfig, cache: &Arc, in_flight: &Arc, ) { @@ -156,7 +213,7 @@ fn setup_auto_download( iii.register_function( "directory::__on_worker_added", RegisterFunction::new_async(move |input: serde_json::Value| { - let cfg = cfg_inner.clone(); + let cfg = cfg_inner.load_full(); let cache = cache_inner.clone(); let in_flight = in_flight_inner.clone(); async move { @@ -325,11 +382,16 @@ async fn fetch_worker_list_with_retry(iii: &III) -> Option, - cfg: Arc, + cfg: SharedConfig, cache: Arc, in_flight: Arc, ) { tokio::spawn(async move { + // Snapshot the live config for this one-shot boot pass. The fields it + // reads (skills_folder / local_skills_folder) are topology — fixed for + // the process lifetime — so a snapshot is sufficient. + let cfg = cfg.load_full(); + // Small delay so the engine has time to wire us up. tokio::time::sleep(std::time::Duration::from_secs(2)).await; diff --git a/iii-directory/src/manifest.rs b/iii-directory/src/manifest.rs index 6c01c550..3133e024 100644 --- a/iii-directory/src/manifest.rs +++ b/iii-directory/src/manifest.rs @@ -2,7 +2,7 @@ use serde::Serialize; -use crate::config::{DEFAULT_REGISTRY_URL, DEFAULT_SKILLS_FOLDER}; +use crate::config::{DEFAULT_LOCAL_SKILLS_FOLDER, DEFAULT_REGISTRY_URL, DEFAULT_SKILLS_FOLDER}; #[derive(Serialize)] pub struct ModuleManifest { @@ -22,9 +22,12 @@ pub fn build_manifest() -> ModuleManifest { .to_string(), default_config: serde_json::json!({ "skills_folder": DEFAULT_SKILLS_FOLDER, + "local_skills_folder": DEFAULT_LOCAL_SKILLS_FOLDER, "registry_url": DEFAULT_REGISTRY_URL, "download_timeout_ms": 60_000, "registry_cache_ttl_ms": 60_000, + "filter_unregistered": true, + "auto_download": true, }), supported_targets: vec![env!("TARGET").to_string()], } diff --git a/iii-directory/tests/common/workers.rs b/iii-directory/tests/common/workers.rs index 0400626b..934a5097 100644 --- a/iii-directory/tests/common/workers.rs +++ b/iii-directory/tests/common/workers.rs @@ -64,7 +64,10 @@ pub async fn register_all(iii: &Arc) -> Result> { ..SkillsConfig::default() }); let registered = trigger_types::register_all(iii); - functions::register_all(iii, &cfg, ®istered); + // `register_all` reads the live (hot-reloadable) handle; wrap a snapshot. + // `Shared.cfg` keeps the plain `Arc` the step defs assert on. + let cfg_handle = (*cfg).clone().into_shared(); + functions::register_all(iii, &cfg_handle, ®istered); // Give the SDK a beat to publish the function registrations before // scenarios start triggering them.