diff --git a/.design_docs/issue-242-design-decisions.md b/.design_docs/issue-242-design-decisions.md new file mode 100644 index 00000000..2ec97c59 --- /dev/null +++ b/.design_docs/issue-242-design-decisions.md @@ -0,0 +1,188 @@ +# Issue #242: Design Decisions Log + +This document records the design questions and decisions that shaped the implementation plan for issue #242 (programmatic control of precompute engine pipelines). It is intended to explain *why* the plan looks the way it does, not just *what* it does. + +--- + +## Problem Statement + +`asap-query-engine` acts as a proxy between Grafana and Prometheus. Currently, the `precompute_engine` inside it must be given a `streaming_config` (which metrics to precompute and how) at **startup time** via a static config file. + +The goal: start `asap-query-engine` with no precompute config, intercept queries from Grafana, call `asap-planner` on them to generate a `streaming_config` and `inference_config`, and then configure the running precompute engine with the result — no manual intervention required. + +--- + +## What Already Exists + +After exploring the codebase: + +- **Query interception** already exists — the HTTP proxy in `drivers/query/servers/http.rs` already intercepts and can record every query. +- **Planner integration** already exists — `LocalPlannerClient` and `QueryTracker` already collect queries and call the planner on a periodic loop. +- **The planner output is discarded** — `tracker.rs` logs the result and throws it away. This is the core gap. +- **`precompute_engine` has no runtime reconfiguration** — `agg_configs` are built from `streaming_config` at startup inside `run()` and never touched again. There is no API to add, remove, or update pipelines. + +--- + +## Decision 1: When to trigger planning + +**Question:** "Once on startup" — but at startup there are no intercepted queries yet. What does the planner plan? 
+ +**Options considered:** +- (a) Call planner at startup with no queries, letting it do pure metric discovery from Prometheus. +- (b) Wait for the first observation window to elapse, collect real queries during that window, then plan once and configure. + +**Decision: (b)** — wait for the first real observation window, plan with actual queries. More meaningful input to the planner. During the observation window, all queries fall through to Prometheus via the existing fallback mechanism, so users see no gap in Grafana. + +**Future:** Subsequent observation windows will eventually trigger replanning (repeated reconfiguration). The design accommodates this without structural changes. + +--- + +## Decision 2: One-shot vs. repeated loop + +**Question:** After the first plan is applied, should the `QueryTracker` loop stop or keep running? + +**Decision:** Keep the loop running, but only *apply* the config on the first successful plan (via an `AtomicBool applied` flag). This keeps the tracker alive for observability and makes it trivial to extend to repeated reconfiguration later — just remove the flag check. + +--- + +## Decision 3: Runtime reconfiguration strategy (Option A vs. B) + +**Question:** How should the engine be reconfigured at runtime? + +**Option A — Lazy initialization (two-phase startup):** +- Start HTTP server with no engine; all queries fall back to Prometheus. +- After first window + plan, construct all components fresh from planner output. +- Install the new engine into the HTTP server via `Arc>>`. +- For *repeated* reconfiguration: tear down old engine and swap in a new one each time — store's accumulated sketch data is lost on every reconfiguration. + +**Option B — Start with empty configs, hot-swap internals:** +- Start all components immediately with empty `streaming_config` / `inference_config`. 
+- After planning, update shared state in-place: push new `agg_configs` to workers via message, swap `IngestState.agg_configs` via `ArcSwap`, update `SimpleEngine` and `SimpleMapStore` via `RwLock`. +- Workers are **not restarted** — they lazily pick up new aggregation configs as samples arrive. +- For *repeated* reconfiguration: same mechanism — update configs, workers adapt. Store's precomputed history is preserved across reconfigurations. + +**Decision: Option B** — chosen because it is more amenable to future repeated reconfiguration. Option A loses all precomputed sketch data on every config update, which is wasteful once the engine has been running for multiple windows. Option B preserves historical precomputed data and allows incremental updates. + +--- + +## Decision 4: Config replacement vs. merging + +**Question:** When new planner output arrives, does it replace the existing config entirely, or merge with it? + +**Decision: Replace** — the new planner output becomes the complete truth. However, the **input** to the planner includes the current `streaming_config` and `inference_config` (as `ControllerConfig.existing_*` fields), so the planner has the context it needs to make coherent decisions across windows. + +This means: if a metric was being precomputed and the planner decides it still should be, it will appear in the new config. If it doesn't appear, it is dropped — data for that aggregation expires naturally via the existing cleanup policy (see Decision 8). + +--- + +## Decision 5: Passing existing configs to the planner + +**Question:** `ControllerConfig` (in `asap-planner-rs`) currently has no fields for existing configs. The planner only knows about new query observations. For repeated reconfiguration, the planner needs context about what is already running to make coherent decisions. 
**Decision:** Add `existing_streaming_config: Option<StreamingConfig>` and `existing_inference_config: Option<InferenceConfig>` to `ControllerConfig` **now**, even though the planner does not yet use them. This wires the information through so the planner can use it in the future without a second round of type-signature changes. The fields are clearly marked with a `NOTE` comment. + +--- + +## Decision 6: Who applies the planner result + +**Question:** `QueryTracker` calls the planner but has no reference to the engine components. Who applies the result? + +**Options considered:** +- (a) **Channel/callback** — tracker sends `PlannerResult` via a `tokio::sync::watch` channel; a separate task in `main.rs` owns the receiver and applies it to all engine components. +- (b) **Tracker owns engine handles** — pass `Arc` references of engine components into `QueryTracker`; it applies the result directly. + +**Decision: (a)** — keeps `QueryTracker` decoupled. It only produces results; it does not know about engine internals. The applier task in `main.rs` is the single place that knows about all components and applies updates to them. + +--- + +## Decision 7: Atomic vs. non-atomic config update + +**Question:** `SimpleEngine` (inference) and `PrecomputeEngine` (streaming) are updated by separate calls. There is a brief window where one has the new config and the other does not. + +**Decision:** Accept the brief inconsistency. During the transition window, a query might be matched by the old inference config against data computed by the new streaming config (or vice versa), which could produce a miss and fall back to Prometheus. This is acceptable — it is transient and self-correcting within one query cycle. + +**Implementation note:** The inconsistency window is marked with a `NOTE` comment at the application site in `main.rs` and in `SimpleEngine`.
+ +--- + +## Decision 8: Stale precomputed data and in-flight state after config replace + +**Question:** When `streaming_config` is replaced, how is in-flight precompute data handled, and what happens to stale state in both the store and the workers? + +### In-flight data in precompute engine workers + +`UpdateAggConfigs` replaces each worker's `agg_configs` map but **does not touch `group_states`**. Existing `GroupState` entries (keyed by `(agg_id, group_key)`) for old aggregation IDs are left in the worker's HashMap. + +Consequences: +- **Data is not silently dropped.** The flush loop iterates over *all* `group_states`, so old groups still close their open windows and emit accumulated sketch data to the `SimpleMapStore` — under the old aggregation IDs. +- **No new samples reach old group states.** The ingest router only routes samples to agg IDs present in the new `agg_configs`, so old groups stop receiving input but still flush whatever they have accumulated. +- **Memory is never reclaimed.** Old `GroupState` entries are never evicted from the worker's HashMap. For the one-shot first-plan scenario this is a fixed-size overhead. For repeated reconfigurations this is a latent memory leak — stale group states accumulate in worker memory until process restart. + +The data committed to the store under old agg IDs becomes **orphaned**: the query engine now uses new inference/streaming configs pointing to new agg IDs, so those store entries are never queried and eventually expire via the cleanup policy. + +**Implementation:** On `UpdateAggConfigs`, workers now prune `group_states` entries whose `agg_id` is absent from the new config, after a final forced flush. Specifically: +1. Identify removed agg IDs (old `agg_configs` keys not present in `new_configs`). +2. For each removed group state, run the window-close logic with `effective_wm = i64::MAX` — since no new samples will arrive, all open windows are treated as due regardless of watermark. +3. 
Emit accumulated data from those windows to the store, then remove the entry from `group_states` and update `group_count`. +4. Swap in `new_configs`. + +The final flush happens **before** `agg_configs` is swapped so that `GroupState.config` (an `Arc`) is still valid during window-bound calculations. The emitted data is still written under old agg IDs (and will not be queried), but this avoids silently discarding partially-accumulated windows and correctly frees the worker memory. + +### Stale data already committed to `SimpleMapStore` + +**Decision:** Let it expire naturally via the existing cleanup policy. No active purge. + +**Rationale:** The cleanup policy already handles TTL-based eviction. Implementing an active purge would require iterating over potentially large store state and coordinating with in-progress queries. The natural expiry path is safe and requires no new code. + +**Implementation note:** A `NOTE` comment in `update_streaming_config` on the store marks the alternative (active purge) for future reference. + +--- + +## Decision 9: Worker update mechanism + +**Question:** Workers each hold their own `HashMap>`. To give them new configs without restarting, the options are: +- Send `UpdateAggConfigs` messages via the existing `WorkerMessage` channel. +- Use shared mutable state (e.g. `Arc>>`) that workers read on every sample. + +**Decision:** Message-passing (`WorkerMessage::UpdateAggConfigs`). This fits the existing actor-like architecture (workers already process typed messages) and avoids adding a lock acquisition on every sample's hot path. + +Workers lazily create `WindowManager` instances the first time they see a new `agg_id`, so new aggregations are picked up automatically as samples arrive after the update — no special initialization needed. + +--- + +## Decision 11: Synchronization primitive for `IngestState.agg_configs` + +**Question:** `IngestState` is behind `Arc` (immutable). 
The ingest handler reads `agg_configs` on the hot path — once per HTTP request, iterating over all configs for every sample. The applier task needs to swap in a new vec. What interior-mutability primitive to use? + +**Options considered:** +- `RwLock<Vec<Arc<AggregationConfig>>>` — readers hold the lock during the full iteration over configs. Lock held on the hot path. +- `RwLock<Arc<Vec<Arc<AggregationConfig>>>>` — readers briefly lock to clone the Arc, then iterate without holding the lock. +- `ArcSwap<Vec<Arc<AggregationConfig>>>` — truly lock-free reads via atomic pointer swap; no locking on the read path at all. + +**Decision: `ArcSwap<Vec<Arc<AggregationConfig>>>`** — the ingest handler is on the hot path and should not pay any lock cost on reads. `ArcSwap::load()` is lock-free; the applier calls `ArcSwap::store(Arc::new(new_vec))` for the atomic swap. + +**Note:** This is distinct from `SimpleMapStore.streaming_config`, where `RwLock<Arc<StreamingConfig>>` is used. The store's `streaming_config` is accessed only during batch inserts (less frequent, not per-sample), and the sequential ordering in the applier task means there is no real concurrent write race for the store. `RwLock<Arc<StreamingConfig>>` (brief lock to clone the pointer, then use without lock) is sufficient there. + +**Note:** `SimpleEngine.inference_config` uses `RwLock<InferenceConfig>` — queries are the read path, which is less frequent than ingest. Holding the read lock for the duration of a query lookup is acceptable. + +--- + +## Decision 10: Kafka / OTLP consumers during observation window + +**Question:** Kafka consumers and OTLP receivers also feed data into the precompute engine. With Option B (empty configs at startup), they will be active during the observation window but producing no useful work (no aggregation configs to match against). + +**Decision:** No special handling needed. With empty `agg_configs`, the ingest handler simply drops all incoming samples (no matching aggregation found). Once the first plan is applied, subsequent samples are processed correctly. This is the natural behavior and requires no additional code.
+ +--- + +## Summary of Key Structural Changes + +| Component | Before | After | +|---|---|---| +| `streaming_config` in store | `Arc` | `RwLock>` (brief lock to clone pointer, iterate without lock) | +| `inference_config` in `SimpleEngine` | `InferenceConfig` (owned) | `RwLock` (SimpleEngine is already behind Arc) | +| `agg_configs` in `IngestState` | `Vec>` | `ArcSwap>>` (lock-free reads on hot path) | +| Worker config updates | impossible | `WorkerMessage::UpdateAggConfigs` | +| Planner output | logged and discarded | sent via `watch` channel, applied by `main.rs` task | +| `PrecomputeEngine::run()` | creates channels internally, consumes self | channels created in `new()`, `handle()` extracted before `run()` | +| `ControllerConfig` | queries only | queries + `existing_streaming_config` + `existing_inference_config` | diff --git a/.design_docs/issue-242-plan.md b/.design_docs/issue-242-plan.md new file mode 100644 index 00000000..c46f7674 --- /dev/null +++ b/.design_docs/issue-242-plan.md @@ -0,0 +1,265 @@ +# Issue #242: Programmatic Control of Precompute Engine Pipelines + +## Goal + +Enable `asap-query-engine` to configure its precompute engine pipelines at runtime without manual intervention. Specifically: intercept queries from Grafana, call `asap-planner` on them after one observation window, and apply the resulting `streaming_config` + `inference_config` to the running engine — no static config files required. + +**Scope for this PR:** trigger once, after the first observation window. Repeated reconfiguration is future work, but the design is intentionally amenable to it. 
+ +--- + +## Data Flow + +``` +Grafana → HTTP proxy → record query in QueryTracker + ↓ (all queries fall back to Prometheus during observation window) + +[after observation_window_secs] + +QueryTracker::evaluate() + → reads current streaming_config + inference_config (empty on first run) + → builds ControllerConfig (queries + existing configs) + → PlannerClient::plan() + → sends PlannerResult via watch::Sender> + → sets applied = true (won't send again until repeated-reconfig is implemented) + +main.rs applier task (watches the receiver) + → PrecomputeEngineHandle::update_streaming_config() + → SimpleEngine::update_inference_config() + → SimpleMapStore::update_streaming_config() +``` + +--- + +## Changes Required + +### 1. `asap-planner-rs/src/config/input.rs` — `ControllerConfig` + +Add two optional fields for existing configs. The planner ignores them for now; they are wired through so that future repeated-reconfig can pass the current state as context to the planner. + +```rust +// NOTE: reserved for future repeated-reconfig — planner does not yet use these +pub existing_streaming_config: Option, +pub existing_inference_config: Option, +``` + +--- + +### 2. `query_tracker/tracker.rs` — `QueryTracker` + +- Add `Arc>` and `Arc>` fields — read-only, used to populate `ControllerConfig.existing_*` before calling the planner. +- Add `applied: AtomicBool` — only sends the result the first time. The background loop keeps running (for observability / future extension) but subsequent results are dropped until repeated-reconfig is wired up. +- `start_background_loop` gains a `tokio::sync::watch::Sender>` parameter. + +```rust +pub struct QueryTracker { + entries: Mutex>, + config: QueryTrackerConfig, + streaming_config: Arc>, // read-only reference + inference_config: Arc>, // read-only reference + applied: AtomicBool, +} +``` + +On first successful plan: send `Some(result)` over the watch channel and set `applied = true`. + +--- + +### 3. 
`precompute_engine/series_router.rs` — `WorkerMessage` + +Add a new variant so workers can receive config updates without restarting: + +```rust +pub enum WorkerMessage { + // ... existing variants ... + UpdateAggConfigs(HashMap>), +} +``` + +--- + +### 4. `precompute_engine/engine.rs` — `PrecomputeEngine` + new `PrecomputeEngineHandle` + +Currently `run()` creates channels and workers internally and consumes `self`, making post-start updates impossible. Restructure: + +- **Move channel creation to `new()`** — senders and receivers created at construction, stored on the struct. +- Add `PrecomputeEngineHandle`: + +```rust +pub struct PrecomputeEngineHandle { + worker_senders: Vec>, + ingest_agg_configs: Arc>>>, +} + +impl PrecomputeEngineHandle { + /// Update the ingest handler's agg_configs and broadcast new configs to all workers. + pub async fn update_streaming_config(&self, config: &StreamingConfig) { + let agg_configs_map: HashMap> = config + .get_all_aggregation_configs() + .iter() + .map(|(&id, cfg)| (id, Arc::new(cfg.clone()))) + .collect(); + let agg_configs_vec: Vec> = + agg_configs_map.values().cloned().collect(); + + // Lock-free atomic swap — ingest handler readers see new configs immediately + self.ingest_agg_configs.store(Arc::new(agg_configs_vec)); + + for sender in &self.worker_senders { + let _ = sender + .send(WorkerMessage::UpdateAggConfigs(agg_configs_map.clone())) + .await; + } + } +} +``` + +- `PrecomputeEngine::handle() -> Arc` — callable before `run()`. +- `run()` uses the handle's senders and `ingest_agg_configs` rather than creating its own. + +--- + +### 5. `precompute_engine/ingest_handler.rs` — `IngestState` + +```rust +// Before +agg_configs: Vec>, + +// After +agg_configs: ArcSwap>>, +``` + +`IngestState` is behind `Arc` (immutable), so `agg_configs` needs interior mutability. `ArcSwap` is used because the ingest handler reads `agg_configs` on the hot path (once per request, iterates over all configs per sample). 
`ArcSwap::load()` is lock-free for readers; the applier does a single atomic pointer swap via `ArcSwap::store()`. + +```rust +// Reading (hot path, per request): +let configs = state.agg_configs.load(); +for config in configs.iter() { ... } + +// Writing (once, from PrecomputeEngineHandle::update_streaming_config): +self.ingest_agg_configs.store(Arc::new(new_vec)); +``` + +--- + +### 6. `precompute_engine/worker.rs` — handle `UpdateAggConfigs` + +Workers process `WorkerMessage::UpdateAggConfigs` in their run loop: + +```rust +WorkerMessage::UpdateAggConfigs(new_configs) => { + self.agg_configs = new_configs; +} +``` + +Workers lazily create `WindowManager` instances the first time they see a new `agg_id`, so adding aggregations at runtime works without additional changes. + +--- + +### 7. `engines/simple_engine/mod.rs` — `SimpleEngine` + +`SimpleEngine` is already behind `Arc` at the call site, so no extra `Arc` wrapper is needed on the field — just a `RwLock` for interior mutability. + +```rust +// Before +inference_config: InferenceConfig, + +// After +inference_config: RwLock, +``` + +Add update method: + +```rust +pub fn update_inference_config(&self, new_config: InferenceConfig) { + *self.inference_config.write().unwrap() = new_config; +} +``` + +> **NOTE:** `streaming_config` and `inference_config` are applied to their respective components independently, not atomically. There is a brief window where the precompute engine has a new `streaming_config` but `SimpleEngine` is still using the old `inference_config`. This is acceptable for the current use case. + +--- + +### 8. `stores/simple_map_store/*.rs` — `SimpleMapStore` (all variants) + +The store variants are behind `Arc` at the call site. The `RwLock` goes around the `Arc` (not around the config itself) so that readers briefly lock to clone the Arc pointer, then use it without holding the lock during lookups. 
+ +```rust +// Before +streaming_config: Arc, + +// After +streaming_config: RwLock>, +``` + +Add update method on the store trait and each implementation: + +```rust +pub fn update_streaming_config(&self, new_config: StreamingConfig) { + *self.streaming_config.write().unwrap() = Arc::new(new_config); +} +``` + +Readers clone the Arc cheaply under a brief shared lock, then look up aggregation configs from it without holding the lock: + +```rust +let config = self.streaming_config.read().unwrap().clone(); +config.get_aggregation_config(aggregation_id) +``` + +> **NOTE:** Precomputed data for aggregations removed by the new config is not purged immediately. It expires naturally via the existing cleanup policy. If immediate purge is ever needed, `update_streaming_config` is the right place to add it. + +--- + +### 9. `main.rs` — wiring + +```rust +// Start with empty configs +let streaming_config = Arc::new(RwLock::new(StreamingConfig::empty())); +let inference_config = Arc::new(RwLock::new(InferenceConfig::empty())); + +// All components share Arc clones +let store = SimpleMapStore::new(streaming_config.clone(), cleanup_policy); +let engine = SimpleEngine::new(inference_config.clone(), streaming_config.clone(), ...); +let precompute = PrecomputeEngine::new(config, streaming_config.clone(), sink); +let pe_handle = precompute.handle(); // extract before run() consumes self +tokio::spawn(precompute.run()); + +// Tracker reads current configs, sends first result via watch channel +let (plan_tx, mut plan_rx) = tokio::sync::watch::channel(None::); +let tracker = Arc::new(QueryTracker::new( + tracker_config, + streaming_config.clone(), + inference_config.clone(), +)); +tracker.start_background_loop(planner_client, plan_tx); + +// Applier task: watches for first plan result and applies it +// NOTE: streaming_config and inference_config are not applied atomically — see SimpleEngine note +tokio::spawn(async move { + loop { + if plan_rx.changed().await.is_err() { break; } + if 
let Some(result) = plan_rx.borrow().clone() { + pe_handle.update_streaming_config(&result.streaming_config).await; + engine.update_inference_config(result.inference_config); + store.update_streaming_config(result.streaming_config); + } + } +}); +``` + +--- + +## Files Touched + +| File | Change | +|---|---| +| `asap-planner-rs/src/config/input.rs` | Add `existing_streaming_config`, `existing_inference_config` to `ControllerConfig` | +| `query_tracker/tracker.rs` | Add config refs, `AtomicBool`, accept watch sender in `start_background_loop` | +| `precompute_engine/series_router.rs` | Add `UpdateAggConfigs` variant to `WorkerMessage` | +| `precompute_engine/engine.rs` | Move channel creation to `new()`, add `PrecomputeEngineHandle`, expose `handle()` | +| `precompute_engine/ingest_handler.rs` | `agg_configs: ArcSwap>` (lock-free reads on hot path) | +| `precompute_engine/worker.rs` | Handle `UpdateAggConfigs` message | +| `engines/simple_engine/mod.rs` | `RwLock`, add `update_inference_config` | +| `stores/simple_map_store/*.rs` | `RwLock>`, add `update_streaming_config` | +| `main.rs` | Wire watch channel, spawn applier task | diff --git a/Cargo.lock b/Cargo.lock index dc030cd4..67a640b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,6 +147,15 @@ dependencies = [ "object", ] +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -3536,6 +3545,7 @@ name = "query_engine_rust" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "arrow", "asap_planner", "asap_sketchlib", diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 43464627..55422bc3 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -1,4 +1,6 @@ use asap_types::enums::CleanupPolicy; +use 
asap_types::inference_config::InferenceConfig; +use asap_types::streaming_config::StreamingConfig; use asap_types::PromQLSchema; use promql_utilities::data_model::KeyByLabelNames; use serde::Deserialize; @@ -13,6 +15,18 @@ pub struct ControllerConfig { /// returns no series for a metric. Prometheus-inferred labels take priority. #[serde(default)] pub metrics: Option>, + /// Current streaming config, passed as context for repeated reconfiguration. + /// NOTE: reserved for future use — the planner does not yet act on these fields. + /// They are wired through now so that repeated-reconfig support can be added + /// without a second round of type-signature changes. + #[serde(default)] + pub existing_streaming_config: Option, + /// Current inference config, passed as context for repeated reconfiguration. + /// NOTE: see existing_streaming_config — same future-use caveat applies. + /// Not serializable via serde (InferenceConfig does not impl Deserialize); + /// set programmatically only. + #[serde(skip)] + pub existing_inference_config: Option, } impl ControllerConfig { diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs index dad93532..44dda288 100644 --- a/asap-planner-rs/src/query_log/converter.rs +++ b/asap-planner-rs/src/query_log/converter.rs @@ -42,5 +42,7 @@ pub fn to_controller_config( policy: Some(CleanupPolicy::ReadBased), }), metrics: None, + existing_streaming_config: None, + existing_inference_config: None, } } diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index b432627a..fb2bab67 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -57,6 +57,7 @@ lazy_static = "1.4" zstd = "0.13" reqwest = { version = "0.11", features = ["json"] } tracing-appender = "0.2" +arc-swap = "1" elastic_dsl_utilities.workspace = true asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } diff --git a/asap-query-engine/src/engines/simple_engine/elastic.rs 
b/asap-query-engine/src/engines/simple_engine/elastic.rs index c9d77556..3cdf1682 100644 --- a/asap-query-engine/src/engines/simple_engine/elastic.rs +++ b/asap-query-engine/src/engines/simple_engine/elastic.rs @@ -52,7 +52,7 @@ impl SimpleEngine { // TODO: Figure out how to handle query configuration for ElasticSearch queries. let query_config = self.find_query_config(&query)?; let agg_info = self - .get_aggregation_id_info(query_config) + .get_aggregation_id_info(&query_config) .map_err(|e| { warn!("{}", e); e @@ -76,14 +76,13 @@ impl SimpleEngine { }) .ok()?; - let grouping_labels = self - .streaming_config + let sc = self.streaming_config.read().unwrap().clone(); + let grouping_labels = sc .get_aggregation_config(agg_info.aggregation_id_for_value) .map(|config| config.grouping_labels.clone()) .unwrap_or_else(|| query_metadata.query_output_labels.clone()); - let aggregated_labels = self - .streaming_config + let aggregated_labels = sc .get_aggregation_config(agg_info.aggregation_id_for_key) .map(|config| config.aggregated_labels.clone()) .unwrap_or_else(KeyByLabelNames::empty); diff --git a/asap-query-engine/src/engines/simple_engine/mod.rs b/asap-query-engine/src/engines/simple_engine/mod.rs index 806971a2..7d477d8b 100644 --- a/asap-query-engine/src/engines/simple_engine/mod.rs +++ b/asap-query-engine/src/engines/simple_engine/mod.rs @@ -12,7 +12,7 @@ use crate::engines::query_result::{InstantVectorElement, QueryResult}; // }; use crate::stores::{Store, TimestampedBucketsMap}; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Instant; use tracing::{debug, warn}; @@ -125,8 +125,12 @@ pub struct RangeQueryExecutionContext { pub struct SimpleEngine { store: Arc, // promsketch_store: Option>, - inference_config: InferenceConfig, - streaming_config: Arc, + /// Updated at runtime via update_inference_config(). RwLock provides interior + /// mutability since SimpleEngine is shared behind Arc. 
+    inference_config: RwLock<InferenceConfig>,
+    /// Updated at runtime via update_streaming_config(). Readers briefly lock to
+    /// clone the Arc pointer, then use without holding the lock.
+    streaming_config: RwLock<Arc<StreamingConfig>>,
     prometheus_scrape_interval: u64,
     controller_patterns: HashMap>,
     query_language: QueryLanguage,
@@ -276,25 +280,45 @@ impl SimpleEngine {
         Self {
             store,
             // promsketch_store,
-            inference_config,
-            streaming_config,
+            inference_config: RwLock::new(inference_config),
+            streaming_config: RwLock::new(streaming_config),
             prometheus_scrape_interval,
             controller_patterns,
             query_language,
         }
     }
 
+    /// Replace the inference config at runtime. Called by the applier task after
+    /// the planner fires.
+    ///
+    /// NOTE: streaming_config and inference_config are applied to their respective
+    /// components independently (not atomically). A brief window may exist where
+    /// the precompute engine has a new streaming_config but this engine still uses
+    /// the old inference_config, causing query misses that fall back to Prometheus.
+    pub fn update_inference_config(&self, new_config: InferenceConfig) {
+        *self.inference_config.write().unwrap() = new_config;
+    }
+
+    /// Replace the streaming config at runtime. Called by the applier task after
+    /// the planner fires.
+    pub fn update_streaming_config(&self, new_config: Arc<StreamingConfig>) {
+        *self.streaming_config.write().unwrap() = new_config;
+    }
+
     /// Convert query timestamp (seconds) to data timestamp (milliseconds)
     pub fn convert_query_time_to_data_time(query_time: f64) -> u64 {
         (query_time * 1000.0) as u64
     }
 
     /// Finds the query configuration for a given query string
-    fn find_query_config(&self, query: &str) -> Option<&QueryConfig> {
+    fn find_query_config(&self, query: &str) -> Option<QueryConfig> {
         self.inference_config
+            .read()
+            .unwrap()
             .query_configs
             .iter()
             .find(|config| config.query == query)
+            .cloned()
     }
 
     /// Validates and potentially aligns end timestamp based on query pattern
@@ -344,6 +368,8 @@ impl SimpleEngine {
         // Latest window only
         let window_size = self
             .streaming_config
+            .read()
+            .unwrap()
             .get_aggregation_config(agg_info.aggregation_id_for_key)
             .map(|config| config.window_size * 1000)
             .ok_or_else(|| {
@@ -375,9 +401,9 @@ impl SimpleEngine {
         timestamps: &QueryTimestamps,
         agg_info: &AggregationIdInfo,
     ) -> Result {
+        let sc = self.streaming_config.read().unwrap().clone();
         // Get aggregation config for value to determine window type
-        let aggregation_config_for_value = self
-            .streaming_config
+        let aggregation_config_for_value = sc
             .get_aggregation_config(agg_info.aggregation_id_for_value)
             .ok_or_else(|| {
                 format!(
@@ -896,10 +922,10 @@ impl SimpleEngine {
         let mut aggregation_type_for_key: Option<AggregationType> = None;
         let mut aggregation_type_for_value: Option<AggregationType> = None;
 
+        let sc = self.streaming_config.read().unwrap().clone();
         if query_config_aggregations.len() == 2 {
             for aggregation in query_config_aggregations {
-                let aggregation_type = self
-                    .streaming_config
+                let aggregation_type = sc
                     .get_aggregation_config(aggregation.aggregation_id)
                     .map(|config| config.aggregation_type)
                     .ok_or_else(|| {
@@ -935,8 +961,7 @@ impl SimpleEngine {
         } else {
             // Single aggregation: key and value share the same aggregation
             let id = query_config_aggregations[0].aggregation_id;
-            let agg_type = self
-                .streaming_config
+            let agg_type = sc
                 .get_aggregation_config(id)
                 .map(|config| config.aggregation_type)
                 .ok_or_else(|| format!("No streaming config for aggregation_id {id}"))?;
diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs
index c49e7f0f..7c062201 100644
--- a/asap-query-engine/src/engines/simple_engine/promql.rs
+++ b/asap-query-engine/src/engines/simple_engine/promql.rs
@@ -155,14 +155,20 @@ impl SimpleEngine {
     pub fn find_query_config_promql_structural(
         &self,
         arm_ast: &promql_parser::parser::Expr,
-    ) -> Option<&QueryConfig> {
+    ) -> Option<QueryConfig> {
         let arm_canonical = format!("{}", arm_ast);
-        self.inference_config.query_configs.iter().find(|config| {
-            let config_canonical = promql_parser::parser::parse(&config.query)
-                .map(|ast| format!("{}", ast))
-                .unwrap_or_default();
-            config_canonical == arm_canonical
-        })
+        self.inference_config
+            .read()
+            .unwrap()
+            .query_configs
+            .iter()
+            .find(|config| {
+                let config_canonical = promql_parser::parser::parse(&config.query)
+                    .map(|ast| format!("{}", ast))
+                    .unwrap_or_default();
+                config_canonical == arm_canonical
+            })
+            .cloned()
     }
 
     /// Variant of `build_query_execution_context_promql` that accepts a pre-parsed
@@ -222,7 +228,8 @@ impl SimpleEngine {
     ) -> Option {
         let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result);
 
-        let promql_schema = match &self.inference_config.schema {
+        let ic = self.inference_config.read().unwrap();
+        let promql_schema = match &ic.schema {
             SchemaConfig::PromQL(schema) => schema,
             _ => return None,
         };
@@ -299,14 +306,13 @@ impl SimpleEngine {
         let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal
             || query_pattern_type == QueryPatternType::OneTemporalOneSpatial;
 
-        let grouping_labels = self
-            .streaming_config
+        let sc = self.streaming_config.read().unwrap().clone();
+        let grouping_labels = sc
             .get_aggregation_config(agg_info.aggregation_id_for_value)
             .map(|config| config.grouping_labels.clone())
             .unwrap_or_else(|| query_output_labels.clone());
 
-        let aggregated_labels = self
-            .streaming_config
+        let aggregated_labels = sc
             .get_aggregation_config(agg_info.aggregation_id_for_key)
             .map(|config| config.aggregated_labels.clone())
             .unwrap_or_else(KeyByLabelNames::empty);
@@ -355,7 +361,7 @@ impl SimpleEngine {
             other => {
                 // Leaf pattern: structural config lookup + context + plan
                 let config = self.find_query_config_promql_structural(other)?;
-                let ctx = self.build_query_execution_context_from_ast(other, config, time)?;
+                let ctx = self.build_query_execution_context_from_ast(other, &config, time)?;
                 let label_names = ctx.metadata.query_output_labels.labels.clone();
                 let plan = ctx.to_logical_plan().ok()?;
                 Some((plan, label_names))
@@ -463,7 +469,7 @@ impl SimpleEngine {
             other => {
                 let config = self.find_query_config_promql_structural(other)?;
                 let base_context =
-                    self.build_query_execution_context_from_ast(other, config, end)?;
+                    self.build_query_execution_context_from_ast(other, &config, end)?;
                 let label_names = base_context.metadata.query_output_labels.labels.clone();
 
                 let start_ms = Self::convert_query_time_to_data_time(start);
@@ -472,6 +478,8 @@ impl SimpleEngine {
 
                 let tumbling_window_ms = self
                     .streaming_config
+                    .read()
+                    .unwrap()
                     .get_aggregation_config(base_context.agg_info.aggregation_id_for_value)
                     .map(|c| c.window_size * 1000)?;
@@ -619,7 +627,7 @@ impl SimpleEngine {
                 .map(|d| d.num_seconds() as u64 * 1000),
         };
 
-        let all_labels = match &self.inference_config.schema {
+        let all_labels = match &self.inference_config.read().unwrap().schema {
             SchemaConfig::PromQL(schema) => schema
                 .get_labels(&metric)
                 .cloned()
@@ -1053,7 +1061,7 @@ impl SimpleEngine {
         // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching.
         let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) {
-            self.get_aggregation_id_info(config)
+            self.get_aggregation_id_info(&config)
                 .map_err(|e| {
                     warn!("{}", e);
                     e
@@ -1067,6 +1075,9 @@
             let requirements =
                 self.build_query_requirements_promql(&match_result, query_pattern_type);
             self.streaming_config
+                .read()
+                .unwrap()
+                .clone()
                 .find_compatible_aggregation(&requirements)?
         };
@@ -1106,6 +1117,8 @@ impl SimpleEngine {
         // Get window size
         let tumbling_window_ms = self
             .streaming_config
+            .read()
+            .unwrap()
             .get_aggregation_config(base_context.agg_info.aggregation_id_for_value)
             .map(|config| config.window_size * 1000)?;
diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs
index 388954d7..e84f2a5f 100644
--- a/asap-query-engine/src/engines/simple_engine/sql.rs
+++ b/asap-query-engine/src/engines/simple_engine/sql.rs
@@ -25,25 +25,29 @@ impl SimpleEngine {
     /// each template in query_configs and compares it structurally against the incoming
     /// query_data — ignoring absolute timestamps and comparing only metric, aggregation,
     /// labels, time column name, and duration.
-    fn find_query_config_sql(&self, query_data: &SQLQueryData) -> Option<&QueryConfig> {
-        let schema = match &self.inference_config.schema {
-            SchemaConfig::SQL(sql_schema) => sql_schema,
+    fn find_query_config_sql(&self, query_data: &SQLQueryData) -> Option<QueryConfig> {
+        let ic = self.inference_config.read().unwrap();
+        let schema = match &ic.schema {
+            SchemaConfig::SQL(sql_schema) => sql_schema.clone(),
             _ => return None,
         };
-        self.inference_config.query_configs.iter().find(|config| {
-            let template_statements =
-                match parser::parse_sql(&GenericDialect {}, config.query.as_str()) {
-                    Ok(stmts) => stmts,
-                    Err(_) => return false,
-                };
-            let template_data =
-                match SQLPatternParser::new(schema, 0.0).parse_query(&template_statements) {
-                    Some(data) => data,
-                    None => return false,
-                };
-            query_data.matches_sql_pattern(&template_data)
-        })
+        ic.query_configs
+            .iter()
+            .find(|config| {
+                let template_statements =
+                    match parser::parse_sql(&GenericDialect {}, config.query.as_str()) {
+                        Ok(stmts) => stmts,
+                        Err(_) => return false,
+                    };
+                let template_data =
+                    match SQLPatternParser::new(&schema, 0.0).parse_query(&template_statements) {
+                        Some(data) => data,
+                        None => return false,
+                    };
+                query_data.matches_sql_pattern(&template_data)
+            })
+            .cloned()
     }
 
     /// Calculates start timestamp for SQL queries
@@ -208,7 +212,7 @@ impl SimpleEngine {
         time: f64,
     ) -> Option {
         // Get SQL schema from inference config
-        let schema = match &self.inference_config.schema {
+        let schema = match &self.inference_config.read().unwrap().schema {
             SchemaConfig::SQL(sql_schema) => sql_schema.clone(),
             SchemaConfig::PromQL(_) => {
                 warn!("SQL query requested but config has PromQL schema");
@@ -378,7 +382,7 @@ impl SimpleEngine {
         let agg_info: AggregationIdInfo =
             if let Some(config) = self.find_query_config_sql(&query_data) {
-                self.get_aggregation_id_info(config)
+                self.get_aggregation_id_info(&config)
                     .map_err(|e| {
                         warn!("{}", e);
                         e
@@ -388,6 +392,9 @@
                 warn!("No query_config entry for SQL query. Attempting capability-based matching.");
                 let requirements =
                     self.build_query_requirements_sql(&match_result, query_pattern_type);
                 self.streaming_config
+                    .read()
+                    .unwrap()
+                    .clone()
                     .find_compatible_aggregation(&requirements)?
             };
@@ -444,14 +451,13 @@ impl SimpleEngine {
             })
             .ok()?;
 
-        let grouping_labels = self
-            .streaming_config
+        let sc = self.streaming_config.read().unwrap().clone();
+        let grouping_labels = sc
            .get_aggregation_config(agg_info.aggregation_id_for_value)
            .map(|config| config.grouping_labels.clone())
            .unwrap_or_else(|| metadata.query_output_labels.clone());
 
-        let aggregated_labels = self
-            .streaming_config
+        let aggregated_labels = sc
             .get_aggregation_config(agg_info.aggregation_id_for_key)
             .map(|config| config.aggregated_labels.clone())
             .unwrap_or_else(KeyByLabelNames::empty);
@@ -526,7 +532,7 @@ impl SimpleEngine {
         let agg_info: AggregationIdInfo =
             if let Some(config) = self.find_query_config_sql(query_data) {
-                self.get_aggregation_id_info(config)
+                self.get_aggregation_id_info(&config)
                     .map_err(|e| {
                         warn!("{}", e);
                         e
@@ -539,6 +545,9 @@
                 let requirements =
                     self.build_query_requirements_sql(match_result, QueryPatternType::OnlyTemporal);
                 self.streaming_config
+                    .read()
+                    .unwrap()
+                    .clone()
                     .find_compatible_aggregation(&requirements)?
             };
 
         let metric = &match_result.outer_data()?.metric;
diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs
index 37996c55..3a8c8a51 100644
--- a/asap-query-engine/src/lib.rs
+++ b/asap-query-engine/src/lib.rs
@@ -47,7 +47,7 @@ pub use drivers::{
 pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
 pub use precompute_engine::output_sink::StoreOutputSink;
 
-pub use precompute_engine::PrecomputeEngine;
+pub use precompute_engine::{PrecomputeEngine, PrecomputeEngineHandle};
 
 pub use query_tracker::{QueryTracker, QueryTrackerConfig};
diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs
index fa589aa0..78385c63 100644
--- a/asap-query-engine/src/main.rs
+++ b/asap-query-engine/src/main.rs
@@ -1,9 +1,9 @@
 use clap::Parser;
 use query_engine_rust::data_model::QueryLanguage;
 use std::fs;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use tokio::signal;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 
 use sketch_core::config::{self, ImplMode};
 
@@ -14,8 +14,8 @@ use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
 use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
 use query_engine_rust::{
     HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver,
-    OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, Result, SimpleEngine,
-    SimpleMapStore, StoreOutputSink,
+    OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, PrecomputeEngineHandle, Result,
+    SimpleEngine, SimpleMapStore, StoreOutputSink,
 };
 
 #[derive(Parser, Debug)]
@@ -203,6 +203,12 @@ async fn main() -> Result<()> {
     );
     info!("Streaming config: {:?}", streaming_config);
 
+    // Shared config refs — passed to QueryTracker so it can populate ControllerConfig
+    // with the current configs as context for the planner. The applier task updates
+    // them after applying a new plan so that subsequent windows see the latest state.
+    let streaming_config_ref = Arc::new(RwLock::new(streaming_config.clone()));
+    let inference_config_ref = Arc::new(RwLock::new(Arc::new(inference_config.clone())));
+
     // Setup store (equivalent to Python's SimpleMapStore())
     // Get cleanup policy from inference config
     let cleanup_policy = inference_config.cleanup_policy;
@@ -317,6 +323,10 @@ async fn main() -> Result<()> {
     // Automatically enable when using precompute streaming engine
     let enable_precompute =
         args.enable_prometheus_remote_write || args.streaming_engine == StreamingEngine::Precompute;
+
+    // Handle extracted before run() so the applier task can call update_streaming_config.
+    let mut pe_engine_handle: Option<PrecomputeEngineHandle> = None;
+
     let precompute_handle = if enable_precompute {
         let precompute_config = PrecomputeEngineConfig {
             num_workers: args.precompute_num_workers,
@@ -330,9 +340,10 @@ async fn main() -> Result<()> {
             late_data_policy: LateDataPolicy::Drop,
         };
         let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
-        let engine =
-            PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink);
-        let worker_diagnostics = engine.diagnostics();
+        let pe = PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink);
+        let worker_diagnostics = pe.diagnostics();
+        // Extract the handle before run() consumes the engine.
+        pe_engine_handle = Some(pe.handle());
         info!(
             "Starting precompute engine on port {}",
             args.prometheus_remote_write_port
@@ -345,7 +356,7 @@ async fn main() -> Result<()> {
         });
 
         Some(tokio::spawn(async move {
-            if let Err(e) = engine.run().await {
+            if let Err(e) = pe.run().await {
                 error!("Precompute engine error: {}", e);
             }
         }))
@@ -381,7 +392,7 @@ async fn main() -> Result<()> {
     };
 
     let query_tracker = if args.enable_query_tracker {
-        use query_engine_rust::planner_client::LocalPlannerClient;
+        use query_engine_rust::planner_client::{LocalPlannerClient, PlannerResult};
         use query_engine_rust::QueryTrackerConfig;
 
         let tracker_config = QueryTrackerConfig {
@@ -400,8 +411,59 @@ async fn main() -> Result<()> {
             args.query_language,
             args.prometheus_server.clone(),
         ));
-        let tracker = Arc::new(query_engine_rust::QueryTracker::new(tracker_config));
-        let _tracker_handle = tracker.start_background_loop(planner_client);
+
+        let (plan_tx, plan_rx) = tokio::sync::watch::channel(None::<PlannerResult>);
+
+        let tracker = Arc::new(query_engine_rust::QueryTracker::new(
+            tracker_config,
+            streaming_config_ref.clone(),
+            inference_config_ref.clone(),
+        ));
+        let _tracker_handle = tracker.start_background_loop(planner_client, plan_tx);
+
+        // Applier task: watches for the first plan result and applies it to all
+        // running components.
+        // NOTE: streaming_config and inference_config are not applied atomically
+        // across components. A brief window may exist where one component has the
+        // new config and another still has the old one, causing query misses that
+        // fall back to Prometheus. This is acceptable for a one-shot first-plan apply.
+        let engine_for_applier = engine.clone();
+        let store_for_applier = store.clone();
+        let streaming_config_ref_for_applier = streaming_config_ref.clone();
+        let inference_config_ref_for_applier = inference_config_ref.clone();
+        tokio::spawn(async move {
+            let mut rx = plan_rx;
+            loop {
+                if rx.changed().await.is_err() {
+                    break;
+                }
+                let result = rx.borrow().clone();
+                if let Some(result) = result {
+                    // 1. Apply to precompute engine (lock-free ArcSwap + worker broadcast).
+                    if let Some(ref handle) = pe_engine_handle {
+                        if let Err(e) = handle
+                            .update_streaming_config(&result.streaming_config)
+                            .await
+                        {
+                            warn!("Applier: failed to update precompute engine: {}", e);
+                        }
+                    }
+                    // 2. Apply to query engine.
+                    engine_for_applier
+                        .update_streaming_config(Arc::new(result.streaming_config.clone()));
+                    engine_for_applier.update_inference_config(result.inference_config.clone());
+                    // 3. Apply to store.
+                    store_for_applier.update_streaming_config(result.streaming_config.clone());
+                    // 4. Update shared config refs so future tracker windows see the new state.
+                    *streaming_config_ref_for_applier.write().unwrap() =
+                        Arc::new(result.streaming_config);
+                    *inference_config_ref_for_applier.write().unwrap() =
+                        Arc::new(result.inference_config);
+                    info!("Applier: applied new plan from query tracker");
+                }
+            }
+        });
+
         info!(
             "Query tracker enabled (observation window: {}s)",
             args.tracker_observation_window_secs
diff --git a/asap-query-engine/src/planner_client.rs b/asap-query-engine/src/planner_client.rs
index f1c5b302..42f4a85a 100644
--- a/asap-query-engine/src/planner_client.rs
+++ b/asap-query-engine/src/planner_client.rs
@@ -7,6 +7,7 @@ use asap_types::inference_config::InferenceConfig;
 use asap_types::streaming_config::StreamingConfig;
 use tracing::warn;
 
+#[derive(Clone)]
 pub struct PlannerResult {
     pub streaming_config: StreamingConfig,
     pub inference_config: InferenceConfig,
@@ -116,6 +117,8 @@ mod tests {
             }]),
             sketch_parameters: None,
             aggregate_cleanup: None,
+            existing_streaming_config: None,
+            existing_inference_config: None,
         }
     }
diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs
index 8fd45b8d..fad7885f 100644
--- a/asap-query-engine/src/precompute_engine/engine.rs
+++ b/asap-query-engine/src/precompute_engine/engine.rs
@@ -6,6 +6,7 @@ use crate::precompute_engine::ingest_handler::{
 use crate::precompute_engine::output_sink::OutputSink;
 use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
 use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig};
+use arc_swap::ArcSwap;
 use asap_types::aggregation_config::AggregationConfig;
 use axum::{routing::post, Router};
 use std::collections::HashMap;
@@ -21,14 +22,61 @@ pub struct PrecomputeWorkerDiagnostics {
     pub worker_watermarks: Vec<Arc<AtomicI64>>,
 }
 
+/// A cloneable handle for applying runtime config updates to a running engine.
+///
+/// Obtained via `PrecomputeEngine::handle()` before calling `run()`.
+/// Calling `update_streaming_config` swaps the ingest handler's agg_configs
+/// atomically (lock-free via ArcSwap) and broadcasts the new map to all workers.
+pub struct PrecomputeEngineHandle {
+    router: SeriesRouter,
+    ingest_agg_configs: Arc<ArcSwap<Vec<Arc<AggregationConfig>>>>,
+}
+
+impl PrecomputeEngineHandle {
+    /// Apply a new streaming config to the running engine.
+    ///
+    /// Updates the ingest handler's agg_configs via a lock-free ArcSwap store,
+    /// then broadcasts the new config map to all workers via their message channels.
+    pub async fn update_streaming_config(
+        &self,
+        config: &StreamingConfig,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let agg_configs_map: HashMap<u64, Arc<AggregationConfig>> = config
+            .get_all_aggregation_configs()
+            .iter()
+            .map(|(&id, cfg)| (id, Arc::new(cfg.clone())))
+            .collect();
+        let agg_configs_vec: Vec<Arc<AggregationConfig>> =
+            agg_configs_map.values().cloned().collect();
+
+        self.ingest_agg_configs.store(Arc::new(agg_configs_vec));
+        self.router
+            .broadcast_update_agg_configs(agg_configs_map)
+            .await?;
+
+        info!(
+            "PrecomputeEngineHandle: applied new streaming config ({} aggregations)",
+            config.get_all_aggregation_configs().len()
+        );
+        Ok(())
+    }
+}
+
 /// The top-level precompute engine orchestrator.
 ///
 /// Creates worker threads, the series router, and the Axum ingest server.
+/// Call `handle()` before `run()` to obtain a `PrecomputeEngineHandle` for
+/// applying runtime config updates while the engine is running.
 pub struct PrecomputeEngine {
     config: PrecomputeEngineConfig,
     streaming_config: Arc<StreamingConfig>,
     output_sink: Arc<dyn OutputSink>,
     diagnostics: Arc<PrecomputeWorkerDiagnostics>,
+    /// Channels created at construction so handle() can be extracted before run().
+    senders: Vec<mpsc::Sender<WorkerMessage>>,
+    receivers: Option<Vec<mpsc::Receiver<WorkerMessage>>>,
+    /// Shared ingest agg_configs, swappable at runtime.
+    ingest_agg_configs: Arc<ArcSwap<Vec<Arc<AggregationConfig>>>>,
 }
 
 impl PrecomputeEngine {
@@ -47,11 +95,33 @@ impl PrecomputeEngine {
             worker_group_counts,
             worker_watermarks,
         });
+
+        // Build channels and initial agg_configs at construction time so that
+        // handle() can be called before run().
+        let channel_size = config.channel_buffer_size;
+        let mut senders = Vec::with_capacity(config.num_workers);
+        let mut receivers = Vec::with_capacity(config.num_workers);
+        for _ in 0..config.num_workers {
+            let (tx, rx) = mpsc::channel::<WorkerMessage>(channel_size);
+            senders.push(tx);
+            receivers.push(rx);
+        }
+
+        let agg_configs_vec: Vec<Arc<AggregationConfig>> = streaming_config
+            .get_all_aggregation_configs()
+            .values()
+            .map(|cfg| Arc::new(cfg.clone()))
+            .collect();
+        let ingest_agg_configs = Arc::new(ArcSwap::from_pointee(agg_configs_vec));
+
         Self {
             config,
             streaming_config,
             output_sink,
             diagnostics,
+            senders,
+            receivers: Some(receivers),
+            ingest_agg_configs,
         }
     }
 
@@ -60,26 +130,29 @@ impl PrecomputeEngine {
         self.diagnostics.clone()
     }
 
+    /// Return a handle for applying runtime config updates to this engine.
+    /// Must be called before `run()`.
+    pub fn handle(&self) -> PrecomputeEngineHandle {
+        PrecomputeEngineHandle {
+            router: SeriesRouter::new(self.senders.clone()),
+            ingest_agg_configs: self.ingest_agg_configs.clone(),
+        }
+    }
+
     /// Start the precompute engine. This spawns worker tasks and the HTTP
     /// ingest server, then blocks until shutdown.
-    pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         let num_workers = self.config.num_workers;
-        let channel_size = self.config.channel_buffer_size;
 
-        // Build MPSC channels for each worker
-        let mut senders = Vec::with_capacity(num_workers);
-        let mut receivers = Vec::with_capacity(num_workers);
-        for _ in 0..num_workers {
-            let (tx, rx) = mpsc::channel::<WorkerMessage>(channel_size);
-            senders.push(tx);
-            receivers.push(rx);
-        }
+        let receivers = self
+            .receivers
+            .take()
+            .expect("PrecomputeEngine::run() called twice");
 
-        // Build the router
-        let router = SeriesRouter::new(senders);
+        // Build the router from the pre-created senders.
+        let router = SeriesRouter::new(self.senders.clone());
 
-        // Build aggregation config map from streaming config, wrapping each config
-        // in Arc so all workers share one copy per aggregation (no N×M deep clones).
+        // Build aggregation config map from streaming config for workers.
         let agg_configs: HashMap<u64, Arc<AggregationConfig>> = self
             .streaming_config
             .get_all_aggregation_configs()
             .iter()
             .map(|(&id, cfg)| (id, Arc::new(cfg.clone())))
             .collect();
 
-        // Build a Vec<Arc<AggregationConfig>> for the ingest handler
-        let agg_configs_vec: Vec<Arc<AggregationConfig>> = agg_configs.values().cloned().collect();
-
         // Spawn workers
         let mut worker_handles = Vec::with_capacity(num_workers);
         for (id, rx) in receivers.into_iter().enumerate() {
@@ -120,11 +190,12 @@ impl PrecomputeEngine {
             num_workers, self.config.ingest_port
         );
 
-        // Build the ingest state
+        // Build the ingest state, sharing the same Arc as the handle so
+        // that PrecomputeEngineHandle::update_streaming_config swaps are visible here.
         let ingest_state = Arc::new(IngestState {
             router,
             samples_ingested: std::sync::atomic::AtomicU64::new(0),
-            agg_configs: agg_configs_vec,
+            agg_configs: self.ingest_agg_configs.clone(),
             pass_raw_samples: self.config.pass_raw_samples,
         });
diff --git a/asap-query-engine/src/precompute_engine/ingest_handler.rs b/asap-query-engine/src/precompute_engine/ingest_handler.rs
index 03b9ac51..dd594790 100644
--- a/asap-query-engine/src/precompute_engine/ingest_handler.rs
+++ b/asap-query-engine/src/precompute_engine/ingest_handler.rs
@@ -2,6 +2,7 @@ use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write;
 use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write;
 use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
 use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_series_key};
+use arc_swap::ArcSwap;
 use asap_types::aggregation_config::AggregationConfig;
 use axum::{body::Bytes, extract::State, http::StatusCode};
 use std::collections::HashMap;
@@ -14,7 +15,10 @@ pub(crate) struct IngestState {
     pub(crate) router: SeriesRouter,
     pub(crate) samples_ingested: std::sync::atomic::AtomicU64,
     /// Aggregation configs for group-key extraction.
-    pub(crate) agg_configs: Vec<Arc<AggregationConfig>>,
+    /// Wrapped in Arc so the same ArcSwap is shared with PrecomputeEngineHandle.
+    /// The handle calls ArcSwap::store() to push a new Vec; this state sees it
+    /// immediately via the shared Arc pointer (lock-free on the read path).
+    pub(crate) agg_configs: Arc<ArcSwap<Vec<Arc<AggregationConfig>>>>,
     /// When true, skip group-key extraction and pass raw samples through.
     pub(crate) pass_raw_samples: bool,
 }
@@ -86,9 +90,11 @@ async fn route_decoded_samples(
     type SampleTuple = (String, i64, f64);
     let mut by_group: HashMap> = HashMap::new();
 
+    // Load agg_configs once per request (lock-free ArcSwap read).
+    let agg_configs = state.agg_configs.load();
     for s in &samples {
         let metric_name = extract_metric_name(&s.labels);
-        for config in &state.agg_configs {
+        for config in agg_configs.iter() {
             if config.metric != metric_name
                 && config.spatial_filter_normalized != metric_name
                 && config.spatial_filter != metric_name
diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs
index 0edda2fb..dba95dd5 100644
--- a/asap-query-engine/src/precompute_engine/mod.rs
+++ b/asap-query-engine/src/precompute_engine/mod.rs
@@ -8,4 +8,4 @@ pub mod series_router;
 pub mod window_manager;
 pub mod worker;
 
-pub use engine::{PrecomputeEngine, PrecomputeWorkerDiagnostics};
+pub use engine::{PrecomputeEngine, PrecomputeEngineHandle, PrecomputeWorkerDiagnostics};
diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs
index 45ca1b2a..012a1cbb 100644
--- a/asap-query-engine/src/precompute_engine/series_router.rs
+++ b/asap-query-engine/src/precompute_engine/series_router.rs
@@ -1,5 +1,7 @@
+use asap_types::aggregation_config::AggregationConfig;
 use futures::future::try_join_all;
 use std::collections::HashMap;
+use std::sync::Arc;
 use std::time::Instant;
 use tokio::sync::mpsc;
 use xxhash_rust::xxh64::xxh64;
@@ -32,6 +34,10 @@ pub enum WorkerMessage {
     Flush,
     /// Graceful shutdown.
     Shutdown,
+    /// Push a new set of aggregation configs to this worker.
+    /// Workers replace their local map on receipt; new agg_ids are picked up
+    /// lazily the next time a matching sample arrives.
+    UpdateAggConfigs(HashMap<u64, Arc<AggregationConfig>>),
 }
 
 /// Routes incoming samples to one of N workers based on a consistent hash.
@@ -103,6 +109,20 @@ impl SeriesRouter {
         Ok(())
     }
 
+    /// Broadcast updated aggregation configs to all workers.
+    pub async fn broadcast_update_agg_configs(
+        &self,
+        agg_configs: HashMap<u64, Arc<AggregationConfig>>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        for (i, sender) in self.senders.iter().enumerate() {
+            sender
+                .send(WorkerMessage::UpdateAggConfigs(agg_configs.clone()))
+                .await
+                .map_err(|e| format!("Failed to send UpdateAggConfigs to worker {}: {}", i, e))?;
+        }
+        Ok(())
+    }
+
     /// Broadcast shutdown to all workers.
     pub async fn broadcast_shutdown(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs
index 3e86980f..1c37723d 100644
--- a/asap-query-engine/src/precompute_engine/worker.rs
+++ b/asap-query-engine/src/precompute_engine/worker.rs
@@ -169,6 +169,95 @@ impl Worker {
                     }
                     break;
                 }
+                WorkerMessage::UpdateAggConfigs(new_configs) => {
+                    // Flush and evict group states for agg IDs that are being removed.
+                    // Must happen before swapping agg_configs so GroupState.config is
+                    // still valid during the final window close.
+                    let removed_ids: Vec<u64> = self
+                        .agg_configs
+                        .keys()
+                        .filter(|id| !new_configs.contains_key(id))
+                        .copied()
+                        .collect();
+
+                    if !removed_ids.is_empty() {
+                        let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn Accumulator>)> =
+                            Vec::new();
+
+                        for agg_id in &removed_ids {
+                            // Drain all group states for this agg_id.
+                            let removed_keys: Vec<_> = self
+                                .group_states
+                                .keys()
+                                .filter(|(id, _)| id == agg_id)
+                                .cloned()
+                                .collect();
+
+                            for key in removed_keys {
+                                let Some(state) = self.group_states.remove(&key) else {
+                                    continue;
+                                };
+                                if state.previous_watermark_ms == i64::MIN {
+                                    continue; // No samples received — nothing to emit.
+                                }
+                                // Force-close all open windows by advancing the watermark
+                                // to i64::MAX. No new samples will arrive for this group.
+                                let (group_key_str, mut active_panes) =
+                                    (key.1.clone(), state.active_panes);
+                                let closed = state
+                                    .window_manager
+                                    .closed_windows(state.previous_watermark_ms, i64::MAX);
+
+                                for window_start in &closed {
+                                    let (_, window_end) =
+                                        state.window_manager.window_bounds(*window_start);
+                                    let pane_starts =
+                                        state.window_manager.panes_for_window(*window_start);
+                                    if let Some(accumulator) =
+                                        merge_panes_for_window(&mut active_panes, &pane_starts)
+                                    {
+                                        let group_key_lv =
+                                            build_group_key_label_values(&group_key_str);
+                                        let output = PrecomputedOutput::new(
+                                            *window_start as u64,
+                                            window_end as u64,
+                                            Some(group_key_lv),
+                                            *agg_id,
+                                        );
+                                        emit_batch.push((output, accumulator));
+                                    }
+                                }
+                            }
+                        }
+
+                        if !emit_batch.is_empty() {
+                            if let Err(e) = self.output_sink.emit_batch(emit_batch) {
+                                warn!(
+                                    "Worker {}: error flushing removed agg_ids {:?}: {}",
+                                    self.id, removed_ids, e
+                                );
+                            }
+                        }
+
+                        self.group_count
+                            .store(self.group_states.len(), Ordering::Relaxed);
+                        info!(
+                            "Worker {}: evicted {} removed agg_id(s) {:?}",
+                            self.id,
+                            removed_ids.len(),
+                            removed_ids,
+                        );
+                    }
+
+                    let added = new_configs.len().saturating_sub(self.agg_configs.len());
+                    self.agg_configs = new_configs;
+                    info!(
+                        "Worker {}: agg_configs updated ({} total, ~{} added)",
+                        self.id,
+                        self.agg_configs.len(),
+                        added,
+                    );
+                }
             }
         }
@@ -1967,4 +2056,106 @@ aggregations:
             "worker watermark should be published after flush"
         );
     }
+
+    // -----------------------------------------------------------------------
+    // Test: UpdateAggConfigs enables processing of a new aggregation at runtime
+    // -----------------------------------------------------------------------
+
+    #[tokio::test]
+    async fn test_update_agg_configs_enables_new_aggregation_at_runtime() {
+        let config = make_agg_config(
+            1,
+            "cpu",
+            AggregationType::SingleSubpopulation,
+            "Sum",
+            10, // 10s tumbling window
+            0,
+            vec![],
+        );
+
+        let sink = Arc::new(CapturingOutputSink::new());
+        let (tx, rx) = tokio::sync::mpsc::channel(32);
+        let wm = Arc::new(AtomicI64::new(i64::MIN));
+        // Start worker with NO agg configs — it doesn't know about agg_id=1 yet.
+        let worker = Worker::new(
+            0,
+            rx,
+            sink.clone(),
+            HashMap::new(),
+            WorkerRuntimeConfig {
+                max_buffer_per_series: 10_000,
+                allowed_lateness_ms: 0,
+                pass_raw_samples: false,
+                raw_mode_aggregation_id: 0,
+                late_data_policy: LateDataPolicy::Drop,
+            },
+            Arc::new(AtomicUsize::new(0)),
+            wm.clone(),
+            vec![wm],
+        );
+        let handle = tokio::spawn(async move { worker.run().await });
+
+        // Sample arrives before config update — silently ignored (agg_id unknown).
+        tx.send(WorkerMessage::GroupSamples {
+            agg_id: 1,
+            group_key: String::new(),
+            samples: vec![("cpu".to_string(), 1_000, 1.0)],
+            ingest_received_at: std::time::Instant::now(),
+        })
+        .await
+        .unwrap();
+
+        // Push the new agg config at runtime.
+        let mut new_configs = HashMap::new();
+        new_configs.insert(1, Arc::new(config));
+        tx.send(WorkerMessage::UpdateAggConfigs(new_configs))
+            .await
+            .unwrap();
+
+        // Post-update samples — should be processed now.
+        tx.send(WorkerMessage::GroupSamples {
+            agg_id: 1,
+            group_key: String::new(),
+            samples: vec![("cpu".to_string(), 5_000, 2.0)],
+            ingest_received_at: std::time::Instant::now(),
+        })
+        .await
+        .unwrap();
+        // t=10_000 closes window [0, 10_000).
+        tx.send(WorkerMessage::GroupSamples {
+            agg_id: 1,
+            group_key: String::new(),
+            samples: vec![("cpu".to_string(), 10_000, 0.0)],
+            ingest_received_at: std::time::Instant::now(),
+        })
+        .await
+        .unwrap();
+
+        tx.send(WorkerMessage::Shutdown).await.unwrap();
+        handle.await.unwrap();
+
+        let captured = sink.drain();
+        assert_eq!(
+            captured.len(),
+            1,
+            "one window should close after UpdateAggConfigs"
+        );
+
+        let (output, acc) = &captured[0];
+        assert_eq!(output.aggregation_id, 1);
+        assert_eq!(output.start_timestamp, 0);
+        assert_eq!(output.end_timestamp, 10_000);
+
+        let sum_acc = acc
+            .as_any()
+            .downcast_ref::<SumAccumulator>()
+            .expect("should be SumAccumulator");
+        // Pre-update sample (t=1000, val=1.0) was dropped — agg_id was unknown.
+        // Post-update sample (t=5000, val=2.0) is the only one aggregated.
+        assert!(
+            (sum_acc.sum - 2.0).abs() < 1e-10,
+            "only post-update sample should be aggregated, got {}",
+            sum_acc.sum
+        );
+    }
 }
diff --git a/asap-query-engine/src/query_tracker/tracker.rs b/asap-query-engine/src/query_tracker/tracker.rs
index f2323e7e..08223092 100644
--- a/asap-query-engine/src/query_tracker/tracker.rs
+++ b/asap-query-engine/src/query_tracker/tracker.rs
@@ -1,12 +1,16 @@
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
 use std::time::Duration;
 
 use asap_planner::query_log::{infer_queries, to_controller_config, LogEntry};
+use asap_types::inference_config::InferenceConfig;
+use asap_types::streaming_config::StreamingConfig;
 use chrono::{DateTime, Utc};
+use tokio::sync::watch;
 use tokio::task::JoinHandle;
 use tracing::{info, warn};
 
-use crate::planner_client::PlannerClient;
+use crate::planner_client::{PlannerClient, PlannerResult};
 
 #[derive(Debug, Clone)]
 pub struct QueryTrackerConfig {
@@ -19,13 +23,29 @@ pub struct QueryTrackerConfig {
 pub struct QueryTracker {
     entries: Mutex<Vec<LogEntry>>,
     config: QueryTrackerConfig,
+    /// Read-only snapshot of the current streaming config, shared with the applier
+    /// task in main.rs. Used to populate ControllerConfig.existing_streaming_config.
+    streaming_config: Arc<RwLock<Arc<StreamingConfig>>>,
+    /// Read-only snapshot of the current inference config, shared with the applier task.
+    inference_config: Arc<RwLock<Arc<InferenceConfig>>>,
+    /// Set to true after the first successful plan is sent. The background loop keeps
+    /// running (for observability) but will not send further results until repeated-
+    /// reconfig is wired up.
+    applied: AtomicBool,
 }
 
 impl QueryTracker {
-    pub fn new(config: QueryTrackerConfig) -> Self {
+    pub fn new(
+        config: QueryTrackerConfig,
+        streaming_config: Arc<RwLock<Arc<StreamingConfig>>>,
+        inference_config: Arc<RwLock<Arc<InferenceConfig>>>,
+    ) -> Self {
         Self {
             entries: Mutex::new(Vec::new()),
             config,
+            streaming_config,
+            inference_config,
+            applied: AtomicBool::new(false),
         }
     }
@@ -57,9 +77,14 @@ impl QueryTracker {
     }
 
     /// Spawn a background task that periodically evaluates collected queries and calls the planner.
+    ///
+    /// On the first successful plan, sends the result via `plan_tx` and sets `applied = true`.
+    /// The loop keeps running after that (for observability), but further results are dropped
+    /// until repeated-reconfig is wired up.
     pub fn start_background_loop(
         self: &Arc<Self>,
         planner_client: Arc<dyn PlannerClient>,
+        plan_tx: watch::Sender<Option<PlannerResult>>,
     ) -> JoinHandle<()> {
         let tracker = Arc::clone(self);
         tokio::spawn(async move {
@@ -70,12 +95,16 @@ impl QueryTracker {
 
             loop {
                 interval.tick().await;
-                Self::evaluate(&tracker, &planner_client).await;
+                Self::evaluate(&tracker, &planner_client, &plan_tx).await;
             }
         })
     }
 
-    async fn evaluate(tracker: &Arc<Self>, planner_client: &Arc<dyn PlannerClient>) {
+    async fn evaluate(
+        tracker: &Arc<Self>,
+        planner_client: &Arc<dyn PlannerClient>,
+        plan_tx: &watch::Sender<Option<PlannerResult>>,
+    ) {
         // Snapshot and drain collected entries.
         let entries = {
             let mut guard = tracker.entries.lock().unwrap();
@@ -106,7 +135,13 @@ impl QueryTracker {
             return;
         }
 
-        let controller_config = to_controller_config(instants, ranges);
+        // Build ControllerConfig, including current configs as context for the planner.
+ // NOTE: existing_* fields are wired through but the planner does not yet act on them. + let mut controller_config = to_controller_config(instants, ranges); + controller_config.existing_streaming_config = + Some((*tracker.streaming_config.read().unwrap().clone()).clone()); + controller_config.existing_inference_config = + Some((*tracker.inference_config.read().unwrap().clone()).clone()); info!( "query_tracker: calling planner with {} query groups", @@ -121,6 +156,14 @@ impl QueryTracker { result.inference_config.query_configs.len(), result.punted_queries.len(), ); + // Send result only on first successful plan. + if !tracker.applied.load(Ordering::Acquire) { + tracker.applied.store(true, Ordering::Release); + let _ = plan_tx.send(Some(result)); + info!("query_tracker: plan applied (first time); subsequent windows will run but not re-apply"); + } else { + info!("query_tracker: plan already applied; skipping re-apply (repeated-reconfig not yet implemented)"); + } } Err(e) => { warn!("query_tracker: planner failed: {}", e); @@ -143,6 +186,16 @@ mod tests { use asap_types::streaming_config::StreamingConfig; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::sync::watch; + + fn make_tracker(config: QueryTrackerConfig) -> QueryTracker { + let sc = Arc::new(RwLock::new(Arc::new(StreamingConfig::new(HashMap::new())))); + let ic = Arc::new(RwLock::new(Arc::new(InferenceConfig::new( + asap_types::enums::QueryLanguage::promql, + asap_types::enums::CleanupPolicy::NoCleanup, + )))); + QueryTracker::new(config, sc, ic) + } struct MockPlannerClient { call_count: AtomicUsize, @@ -173,7 +226,7 @@ mod tests { #[test] fn record_instant_appends_entry() { - let tracker = QueryTracker::new(QueryTrackerConfig { + let tracker = make_tracker(QueryTrackerConfig { observation_window_secs: 600, prometheus_scrape_interval: 15, }); @@ -187,7 +240,7 @@ mod tests { #[test] fn record_range_appends_entry() { - let tracker = 
QueryTracker::new(QueryTrackerConfig {
+ let tracker = make_tracker(QueryTrackerConfig {
observation_window_secs: 600,
prometheus_scrape_interval: 15,
});
@@ -205,7 +258,7 @@
#[tokio::test]
async fn evaluate_calls_planner_with_entries() {
- let tracker = Arc::new(QueryTracker::new(QueryTrackerConfig {
+ let tracker = Arc::new(make_tracker(QueryTrackerConfig {
observation_window_secs: 600,
prometheus_scrape_interval: 15,
}));
@@ -219,7 +272,13 @@
}
let mock_client = Arc::new(MockPlannerClient::new());
- QueryTracker::evaluate(&tracker, &(mock_client.clone() as Arc<dyn PlannerClient>)).await;
+ let (plan_tx, _plan_rx) = watch::channel(None::<PlannerResult>);
+ QueryTracker::evaluate(
+ &tracker,
+ &(mock_client.clone() as Arc<dyn PlannerClient>),
+ &plan_tx,
+ )
+ .await;
assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 1);
// Entries should be drained after evaluate.
@@ -228,14 +287,150 @@
#[tokio::test]
async fn evaluate_skips_when_no_entries() {
- let tracker = Arc::new(QueryTracker::new(QueryTrackerConfig {
+ let tracker = Arc::new(make_tracker(QueryTrackerConfig {
observation_window_secs: 600,
prometheus_scrape_interval: 15,
}));
let mock_client = Arc::new(MockPlannerClient::new());
- QueryTracker::evaluate(&tracker, &(mock_client.clone() as Arc<dyn PlannerClient>)).await;
+ let (plan_tx, _plan_rx) = watch::channel(None::<PlannerResult>);
+ QueryTracker::evaluate(
+ &tracker,
+ &(mock_client.clone() as Arc<dyn PlannerClient>),
+ &plan_tx,
+ )
+ .await;
assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 0);
}
+
+ #[tokio::test]
+ async fn evaluate_sends_plan_only_once() {
+ let tracker = Arc::new(make_tracker(QueryTrackerConfig {
+ observation_window_secs: 600,
+ prometheus_scrape_interval: 15,
+ }));
+
+ // Record enough for infer_queries to produce results. 
+ for i in 0..5 {
+ tracker.record_instant(
+ "rate(http_requests_total[5m])",
+ 1700000000.0 + (i as f64 * 60.0),
+ );
+ }
+
+ let mock_client = Arc::new(MockPlannerClient::new());
+ let (plan_tx, mut plan_rx) = watch::channel(None::<PlannerResult>);
+
+ // First evaluate: result sent, applied flag set.
+ QueryTracker::evaluate(
+ &tracker,
+ &(mock_client.clone() as Arc<dyn PlannerClient>),
+ &plan_tx,
+ )
+ .await;
+
+ assert!(tracker.applied.load(Ordering::SeqCst));
+ // Mark the sent value as seen so subsequent changed() only fires for new sends.
+ assert!(plan_rx.borrow_and_update().is_some());
+
+ // Record entries for a second window.
+ for i in 0..5 {
+ tracker.record_instant(
+ "rate(http_requests_total[5m])",
+ 1700003600.0 + (i as f64 * 60.0),
+ );
+ }
+
+ // Second evaluate: planner is called again but must NOT re-send on the channel.
+ QueryTracker::evaluate(
+ &tracker,
+ &(mock_client.clone() as Arc<dyn PlannerClient>),
+ &plan_tx,
+ )
+ .await;
+
+ assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 2);
+
+ // changed() should time out — no new value sent. 
+ let timed_out =
+ tokio::time::timeout(std::time::Duration::from_millis(10), plan_rx.changed())
+ .await
+ .is_err();
+ assert!(timed_out, "second evaluate must not send a new plan");
+ }
+
+ #[tokio::test]
+ async fn evaluate_populates_existing_configs_in_controller_config() {
+ use asap_planner::ControllerConfig;
+ use std::sync::Mutex;
+
+ struct CapturingPlannerClient {
+ captured: Mutex<Option<ControllerConfig>>,
+ }
+
+ impl CapturingPlannerClient {
+ fn new() -> Self {
+ Self {
+ captured: Mutex::new(None),
+ }
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl PlannerClient for CapturingPlannerClient {
+ async fn plan(&self, config: ControllerConfig) -> anyhow::Result<PlannerResult> {
+ *self.captured.lock().unwrap() = Some(config);
+ Ok(PlannerResult {
+ streaming_config: StreamingConfig::new(HashMap::new()),
+ inference_config: InferenceConfig::new(
+ asap_types::enums::QueryLanguage::promql,
+ asap_types::enums::CleanupPolicy::NoCleanup,
+ ),
+ punted_queries: vec![],
+ })
+ }
+ }
+
+ let sc = Arc::new(RwLock::new(Arc::new(StreamingConfig::new(HashMap::new()))));
+ let ic = Arc::new(RwLock::new(Arc::new(InferenceConfig::new(
+ asap_types::enums::QueryLanguage::promql,
+ asap_types::enums::CleanupPolicy::NoCleanup,
+ ))));
+ let tracker = Arc::new(QueryTracker::new(
+ QueryTrackerConfig {
+ observation_window_secs: 600,
+ prometheus_scrape_interval: 15,
+ },
+ sc,
+ ic,
+ ));
+
+ for i in 0..5 {
+ tracker.record_instant(
+ "rate(http_requests_total[5m])",
+ 1700000000.0 + (i as f64 * 60.0),
+ );
+ }
+
+ let client = Arc::new(CapturingPlannerClient::new());
+ let (plan_tx, _plan_rx) = watch::channel(None::<PlannerResult>);
+ QueryTracker::evaluate(
+ &tracker,
+ &(client.clone() as Arc<dyn PlannerClient>),
+ &plan_tx,
+ )
+ .await;
+
+ let captured = client.captured.lock().unwrap();
+ let config = captured.as_ref().expect("planner should have been called");
+ assert!(
+ config.existing_streaming_config.is_some(),
+ "existing_streaming_config must be populated from the shared ref"
+ );
+ assert!(
config.existing_inference_config.is_some(),
+ "existing_inference_config must be populated from the shared ref"
+ );
+ }
}
diff --git a/asap-query-engine/src/stores/simple_map_store/global.rs b/asap-query-engine/src/stores/simple_map_store/global.rs
index e539ae89..08b59fc8 100644
--- a/asap-query-engine/src/stores/simple_map_store/global.rs
+++ b/asap-query-engine/src/stores/simple_map_store/global.rs
@@ -6,8 +6,7 @@ use crate::stores::simple_map_store::common::{
};
use crate::stores::{Store, StoreResult, TimestampedBucketsMap};
use std::collections::{BTreeMap, HashMap, HashSet};
-use std::sync::Arc;
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
use tracing::{debug, error, info};
@@ -135,8 +134,10 @@ pub struct SimpleMapStoreGlobal {
// Single global mutex protecting all data structures
lock: Mutex,
- // Store the streaming configuration
- streaming_config: Arc<StreamingConfig>,
+ // Store the streaming configuration.
+ // RwLock<Arc<StreamingConfig>>: readers briefly lock to clone the Arc pointer,
+ // then use the Arc without holding the lock.
+ streaming_config: RwLock<Arc<StreamingConfig>>,
// Policy for cleaning up old aggregates
cleanup_policy: CleanupPolicy,
@@ -152,11 +153,16 @@ impl SimpleMapStoreGlobal {
earliest_timestamp_per_aggregation_id: HashMap::new(),
read_counts: HashMap::new(),
}),
- streaming_config,
+ streaming_config: RwLock::new(streaming_config),
cleanup_policy,
}
}
+
+ /// Replace the streaming config at runtime.
+ pub fn update_streaming_config(&self, new_config: StreamingConfig) {
+ *self.streaming_config.write().unwrap() = Arc::new(new_config);
+ }
+
/// Collect diagnostic info about store contents.
pub fn diagnostic_info(&self) -> super::StoreDiagnostics {
use super::{AggregationDiagnostic, StoreDiagnostics};
@@ -246,10 +252,9 @@ impl Store for SimpleMapStoreGlobal {
// Also pre-compute batch_min_ts per group to collapse N earliest-ts updates into 1. 
let mut grouped: GroupedBatch = HashMap::new(); + let sc = self.streaming_config.read().unwrap().clone(); for (output, precompute) in outputs { - let aggregation_config = self - .streaming_config - .get_aggregation_config(output.aggregation_id); + let aggregation_config = sc.get_aggregation_config(output.aggregation_id); if aggregation_config.is_none() { error!( diff --git a/asap-query-engine/src/stores/simple_map_store/mod.rs b/asap-query-engine/src/stores/simple_map_store/mod.rs index 29c78d60..17e6bace 100644 --- a/asap-query-engine/src/stores/simple_map_store/mod.rs +++ b/asap-query-engine/src/stores/simple_map_store/mod.rs @@ -64,6 +64,14 @@ impl SimpleMapStore { } } } + + /// Replace the streaming config at runtime. Delegates to the active variant. + pub fn update_streaming_config(&self, new_config: StreamingConfig) { + match self { + SimpleMapStore::Global(store) => store.update_streaming_config(new_config), + SimpleMapStore::PerKey(store) => store.update_streaming_config(new_config), + } + } } #[async_trait::async_trait] @@ -139,3 +147,84 @@ impl Store for SimpleMapStore { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::data_model::{AggregationType, PrecomputedOutput}; + use crate::precompute_operators::sum_accumulator::SumAccumulator; + use crate::stores::Store; + use asap_types::aggregation_config::AggregationConfig; + use asap_types::enums::WindowType; + use promql_utilities::data_model::key_by_label_names::KeyByLabelNames; + use std::collections::HashMap; + + fn make_agg_config(id: u64, metric: &str) -> AggregationConfig { + AggregationConfig::new( + id, + AggregationType::SingleSubpopulation, + "Sum".to_string(), + HashMap::new(), + KeyByLabelNames::new(vec![]), + KeyByLabelNames::new(vec![]), + KeyByLabelNames::new(vec![]), + String::new(), + 10, + 0, + WindowType::Tumbling, + metric.to_string(), + metric.to_string(), + None, + None, + None, + None, + ) + } + + /// Verify that `update_streaming_config` makes newly added aggregation IDs 
visible
+ /// to the store. Inserts for an unknown agg_id are silently dropped; after a config
+ /// update that adds the agg_id, inserts are accepted and queryable.
+ ///
+ /// Tested for both the Global and PerKey variants.
+ #[test]
+ fn test_update_streaming_config_accepts_new_agg_insert() {
+ for strategy in [LockStrategy::Global, LockStrategy::PerKey] {
+ let store = SimpleMapStore::new_with_strategy(
+ Arc::new(StreamingConfig::new(HashMap::new())),
+ CleanupPolicy::NoCleanup,
+ strategy,
+ );
+
+ // Insert for unknown agg_id=1 — should be silently dropped.
+ store
+ .insert_precomputed_output(
+ PrecomputedOutput::new(0, 10_000, None, 1),
+ Box::new(SumAccumulator::with_sum(99.0)),
+ )
+ .unwrap();
+ let result = store.query_precomputed_output("cpu", 1, 0, 20_000).unwrap();
+ assert!(
+ result.is_empty(),
+ "insert for unknown agg_id should be silently dropped ({strategy:?})"
+ );
+
+ // Update streaming config to include agg_id=1.
+ let mut agg_configs = HashMap::new();
+ agg_configs.insert(1, make_agg_config(1, "cpu"));
+ store.update_streaming_config(StreamingConfig::new(agg_configs));
+
+ // Insert again — now accepted.
+ store
+ .insert_precomputed_output(
+ PrecomputedOutput::new(0, 10_000, None, 1),
+ Box::new(SumAccumulator::with_sum(42.0)),
+ )
+ .unwrap();
+ let result = store.query_precomputed_output("cpu", 1, 0, 20_000).unwrap();
+ assert!(
+ !result.is_empty(),
+ "insert after config update should be accepted ({strategy:?})"
+ );
+ }
+ }
+}
diff --git a/asap-query-engine/src/stores/simple_map_store/per_key.rs b/asap-query-engine/src/stores/simple_map_store/per_key.rs
index 728a5aab..5ea5abd3 100644
--- a/asap-query-engine/src/stores/simple_map_store/per_key.rs
+++ b/asap-query-engine/src/stores/simple_map_store/per_key.rs
@@ -166,8 +166,12 @@ pub struct SimpleMapStorePerKey {
metrics: DashMap,
// HashSet equivalent
items_inserted: DashMap,
- // Store the streaming configuration
- streaming_config: Arc<StreamingConfig>,
+ // Store the streaming configuration. 
+ // RwLock<Arc<StreamingConfig>>: readers briefly lock to clone the Arc pointer,
+ // then use the Arc without holding the lock. Writers (applier task) swap the Arc.
+ // NOTE: stale precomputed data for removed aggregations expires naturally via
+ // the existing cleanup policy; no active purge is performed on update.
+ streaming_config: RwLock<Arc<StreamingConfig>>,
// Policy for cleaning up old aggregates
cleanup_policy: CleanupPolicy,
@@ -180,11 +184,18 @@ impl SimpleMapStorePerKey {
earliest_timestamps: DashMap::new(),
metrics: DashMap::new(),
items_inserted: DashMap::new(),
- streaming_config,
+ streaming_config: RwLock::new(streaming_config),
cleanup_policy,
}
}
+
+ /// Replace the streaming config at runtime. Called by the applier task after
+ /// the planner fires. Stale precomputed data for dropped aggregations expires
+ /// naturally via the existing cleanup policy.
+ pub fn update_streaming_config(&self, new_config: StreamingConfig) {
+ *self.streaming_config.write().unwrap() = Arc::new(new_config);
+ }
+
/// Collect diagnostic info about store contents.
pub fn diagnostic_info(&self) -> super::StoreDiagnostics {
use super::{AggregationDiagnostic, StoreDiagnostics};
@@ -348,8 +359,8 @@ impl SimpleMapStorePerKey {
}
// Get aggregation config once for cleanup settings
- let aggregation_config = self
- .streaming_config
+ let sc = self.streaming_config.read().unwrap().clone();
+ let aggregation_config = sc
.get_aggregation_config(aggregation_id)
.ok_or_else(|| format!("Aggregation config not found for {}", aggregation_id))?;
@@ -425,10 +436,9 @@ impl Store for SimpleMapStorePerKey {
(String, Vec<(PrecomputedOutput, Box)>),
> = HashMap::new();
+ let sc = self.streaming_config.read().unwrap().clone();
for (output, precompute) in outputs {
- let aggregation_config = self
- .streaming_config
- .get_aggregation_config(output.aggregation_id);
+ let aggregation_config = sc.get_aggregation_config(output.aggregation_id);
if aggregation_config.is_none() {
error!(
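
The store diffs above all rely on the same `RwLock<Arc<T>>` hot-swap pattern: readers take a brief read lock only to clone the `Arc`, then work on that snapshot lock-free, while the applier task swaps in a fresh `Arc` under the write lock. A minimal stand-alone sketch of that pattern, using std only — the `Config` and `Store` types here are hypothetical stand-ins for illustration, not the crate's real `StreamingConfig` or `SimpleMapStore`:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the streaming config (illustration only).
#[derive(Debug, Clone)]
struct Config {
    agg_ids: Vec<u64>,
}

// Mirrors the store fields in the diff: RwLock<Arc<Config>>.
struct Store {
    config: RwLock<Arc<Config>>,
}

impl Store {
    fn new(initial: Config) -> Self {
        Self {
            config: RwLock::new(Arc::new(initial)),
        }
    }

    // Writer path (the applier task): swap in a fresh Arc. Readers still
    // holding the old Arc keep a consistent view until they next snapshot.
    fn update(&self, new_config: Config) {
        *self.config.write().unwrap() = Arc::new(new_config);
    }

    // Reader path: clone the Arc pointer under the lock, release immediately,
    // then use the snapshot without holding the lock.
    fn snapshot(&self) -> Arc<Config> {
        self.config.read().unwrap().clone()
    }
}

fn main() {
    let store = Store::new(Config { agg_ids: vec![] });
    let before = store.snapshot(); // old snapshot survives the swap
    store.update(Config { agg_ids: vec![1] });
    assert!(before.agg_ids.is_empty());
    assert_eq!(store.snapshot().agg_ids, vec![1]);
    println!("hot-swap ok");
}
```

The key property this buys the engine: an insert batch that already cloned the `Arc` (the `let sc = ...read().unwrap().clone();` lines in the diff) sees one consistent config for the whole batch, even if the applier swaps mid-batch.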