diff --git a/asap-query-engine/src/bin/bench_precompute_sketch.rs b/asap-query-engine/src/bin/bench_precompute_sketch.rs index 16f2e668..acc1bb32 100644 --- a/asap-query-engine/src/bin/bench_precompute_sketch.rs +++ b/asap-query-engine/src/bin/bench_precompute_sketch.rs @@ -10,7 +10,9 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{ }; use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::OutputSink; -use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_engine::{ + HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine, +}; use query_engine_rust::stores::{SimpleMapStore, Store}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; @@ -166,7 +168,6 @@ async fn start_engine( ) { let config = PrecomputeEngineConfig { num_workers: workers, - ingest_port: port, allowed_lateness_ms: 5_000, max_buffer_per_series: 100_000, flush_interval_ms: 100, @@ -175,7 +176,9 @@ async fn start_engine( raw_mode_aggregation_id: 0, late_data_policy: LateDataPolicy::Drop, }; - let engine = PrecomputeEngine::new(config, streaming_config, sink); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))]; + let engine = PrecomputeEngine::new(config, streaming_config, sink, sources); tokio::spawn(async move { if let Err(err) = engine.run().await { eprintln!("precompute engine on port {port} failed: {err}"); diff --git a/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs b/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs index fb1e7904..d926c9ab 100644 --- a/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs +++ b/asap-query-engine/src/bin/e2e_quickstart_resource_test.rs @@ -17,7 +17,9 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{ }; use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::StoreOutputSink; -use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_engine::{ + HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine, +}; use query_engine_rust::stores::{SimpleMapStore, Store}; use std::collections::HashMap; use std::sync::Arc; @@ -225,7 +227,6 @@ async fn main() -> Result<(), Box> { let engine_config = PrecomputeEngineConfig { num_workers: NUM_WORKERS, - ingest_port: INGEST_PORT, allowed_lateness_ms: 5_000, max_buffer_per_series: 10_000, flush_interval_ms: 1_000, @@ -235,7 +236,11 @@ async fn main() -> Result<(), Box> { late_data_policy: LateDataPolicy::Drop, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); - let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { + port: INGEST_PORT, + }))]; + let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources); tokio::spawn(async move { if let Err(e) = engine.run().await { eprintln!("Precompute engine error: {e}"); diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index 3cf2459d..4e2befb2 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -7,7 +7,7 @@ use query_engine_rust::drivers::query::adapters::AdapterConfig; use query_engine_rust::engines::SimpleEngine; use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink}; -use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_engine::{HttpIngestConfig, HttpIngestSource, PrecomputeEngine}; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::{HttpServer, HttpServerConfig}; use std::sync::Arc; @@ -128,7 +128,6 @@ async fn main() -> Result<(), Box> { // Build the precompute engine config let engine_config = PrecomputeEngineConfig { num_workers: args.num_workers, - ingest_port: args.ingest_port, allowed_lateness_ms: args.allowed_lateness_ms, max_buffer_per_series: args.max_buffer_per_series, flush_interval_ms: args.flush_interval_ms, @@ -146,8 +145,13 @@ async fn main() -> Result<(), Box> { Arc::new(StoreOutputSink::new(store)) }; + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { + port: args.ingest_port, + }))]; + // Build and run the engine - let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink); + let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources); info!("Starting precompute engine..."); engine.run().await?; diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index de5006db..2fc781fc 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -21,7 +21,9 @@ use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEng use query_engine_rust::precompute_engine::output_sink::{ NoopOutputSink, RawPassthroughSink, StoreOutputSink, }; -use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_engine::{ + HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine, +}; use query_engine_rust::stores::SimpleMapStore; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::{HttpServer, HttpServerConfig}; @@ -142,7 +144,6 @@ async fn main() -> Result<(), Box> { // Start precompute engine let engine_config = PrecomputeEngineConfig { num_workers: 2, - ingest_port: INGEST_PORT, allowed_lateness_ms: 5000, max_buffer_per_series: 10000, flush_interval_ms: 200, @@ -152,7 +153,16 @@ async fn main() -> Result<(), Box> { late_data_policy: LateDataPolicy::Drop, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); - let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { + port: INGEST_PORT, + }))]; + let engine = PrecomputeEngine::new( + engine_config, + streaming_config.clone(), + output_sink, + sources, + ); tokio::spawn(async move { if let Err(e) = engine.run().await { eprintln!("Precompute engine error: {e}"); @@ -282,7 +292,6 @@ async fn main() -> Result<(), Box> { let raw_agg_id: u64 = 1; let raw_engine_config = PrecomputeEngineConfig { num_workers: 4, - ingest_port: RAW_INGEST_PORT, allowed_lateness_ms: 5000, max_buffer_per_series: 10000, flush_interval_ms: 200, @@ -292,7 +301,16 @@ async fn main() -> Result<(), Box> { late_data_policy: LateDataPolicy::Drop, }; let raw_sink = Arc::new(RawPassthroughSink::new(store.clone())); - let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink); + let raw_sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { + port: RAW_INGEST_PORT, + }))]; + let raw_engine = PrecomputeEngine::new( + raw_engine_config, + streaming_config.clone(), + raw_sink, + raw_sources, + ); tokio::spawn(async move { if let Err(e) = raw_engine.run().await { eprintln!("Raw precompute engine error: {e}"); @@ -630,7 +648,6 @@ async fn run_single_bench( let noop_sink = Arc::new(NoopOutputSink::new()); let engine_config = PrecomputeEngineConfig { num_workers, - ingest_port: port, allowed_lateness_ms: 5000, max_buffer_per_series: 100_000, flush_interval_ms: 100, @@ -639,7 +656,9 @@ async fn run_single_bench( raw_mode_aggregation_id: 0, late_data_policy: LateDataPolicy::Drop, }; - let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone()); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))]; + let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone(), sources); tokio::spawn(async move { if let Err(e) = engine.run().await { eprintln!("Bench engine error: {e}"); diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index 3a8c8a51..b0d40dd2 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -47,7 +47,10 @@ pub use drivers::{ pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; pub use precompute_engine::output_sink::StoreOutputSink; -pub use precompute_engine::{PrecomputeEngine, PrecomputeEngineHandle}; +pub use precompute_engine::{ + HttpIngestConfig, HttpIngestSource, IngestContext, IngestSource, PrecomputeEngine, + PrecomputeEngineHandle, +}; pub use query_tracker::{QueryTracker, QueryTrackerConfig}; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 8f84c653..87463b1a 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -17,9 +17,10 @@ use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; use query_engine_rust::InferenceConfig; use query_engine_rust::{ - HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver, - OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, PrecomputeEngineHandle, Result, - SimpleEngine, SimpleMapStore, StoreOutputSink, + HttpIngestConfig, HttpIngestSource, HttpServer, HttpServerConfig, IngestSource, KafkaConsumer, + KafkaConsumerConfig, OtlpReceiver, OtlpReceiverConfig, PrecomputeEngine, + PrecomputeEngineConfig, PrecomputeEngineHandle, Result, SimpleEngine, SimpleMapStore, + StoreOutputSink, }; #[derive(Parser, Debug)] @@ -342,7 +343,6 @@ async fn main() -> Result<()> { let precompute_handle = if enable_precompute { let precompute_config = PrecomputeEngineConfig { num_workers: args.precompute_num_workers, - ingest_port: args.prometheus_remote_write_port, allowed_lateness_ms: args.precompute_allowed_lateness_ms, max_buffer_per_series: args.precompute_max_buffer_per_series, flush_interval_ms: args.precompute_flush_interval_ms, @@ -352,7 +352,16 @@ async fn main() -> Result<()> { late_data_policy: LateDataPolicy::Drop, }; let output_sink = Arc::new(StoreOutputSink::new(store.clone())); - let pe = PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { + port: args.prometheus_remote_write_port, + }))]; + let pe = PrecomputeEngine::new( + precompute_config, + streaming_config.clone(), + output_sink, + sources, + ); let worker_diagnostics = pe.diagnostics(); // Extract the handle before run() consumes the engine. pe_engine_handle = Some(pe.handle()); diff --git a/asap-query-engine/src/precompute_engine/config.rs b/asap-query-engine/src/precompute_engine/config.rs index c509ae59..656f8edb 100644 --- a/asap-query-engine/src/precompute_engine/config.rs +++ b/asap-query-engine/src/precompute_engine/config.rs @@ -14,8 +14,6 @@ pub enum LateDataPolicy { pub struct PrecomputeEngineConfig { /// Number of worker threads for parallel processing. pub num_workers: usize, - /// Port for the Prometheus remote write ingest endpoint. - pub ingest_port: u16, /// Maximum allowed lateness for out-of-order samples (milliseconds). /// Samples arriving later than this behind the watermark are dropped. pub allowed_lateness_ms: i64, @@ -38,7 +36,6 @@ impl Default for PrecomputeEngineConfig { fn default() -> Self { Self { num_workers: 4, - ingest_port: 9090, allowed_lateness_ms: 5_000, max_buffer_per_series: 10_000, flush_interval_ms: 1_000, @@ -58,7 +55,6 @@ mod tests { fn test_default_config() { let config = PrecomputeEngineConfig::default(); assert_eq!(config.num_workers, 4); - assert_eq!(config.ingest_port, 9090); assert_eq!(config.allowed_lateness_ms, 5_000); assert_eq!(config.max_buffer_per_series, 10_000); assert_eq!(config.flush_interval_ms, 1_000); diff --git a/asap-query-engine/src/precompute_engine/engine.rs b/asap-query-engine/src/precompute_engine/engine.rs index fad7885f..42fae769 100644 --- a/asap-query-engine/src/precompute_engine/engine.rs +++ b/asap-query-engine/src/precompute_engine/engine.rs @@ -1,18 +1,14 @@ use crate::data_model::StreamingConfig; use crate::precompute_engine::config::PrecomputeEngineConfig; -use crate::precompute_engine::ingest_handler::{ - handle_prometheus_ingest, handle_victoriametrics_ingest, IngestState, -}; +use crate::precompute_engine::ingest_source::{IngestContext, IngestSource}; use crate::precompute_engine::output_sink::OutputSink; use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig}; use arc_swap::ArcSwap; use asap_types::aggregation_config::AggregationConfig; -use axum::{routing::post, Router}; use std::collections::HashMap; use std::sync::atomic::{AtomicI64, AtomicUsize}; use std::sync::Arc; -use tokio::net::TcpListener; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -64,7 +60,7 @@ impl PrecomputeEngineHandle { /// The top-level precompute engine orchestrator. /// -/// Creates worker threads, the series router, and the Axum ingest server. +/// Creates worker threads and drives all registered ingest sources. /// Call `handle()` before `run()` to obtain a `PrecomputeEngineHandle` for /// applying runtime config updates while the engine is running. pub struct PrecomputeEngine { @@ -72,6 +68,7 @@ pub struct PrecomputeEngine { streaming_config: Arc, output_sink: Arc, diagnostics: Arc, + sources: Vec>, /// Channels created at construction so handle() can be extracted before run(). senders: Vec>, receivers: Option>>, @@ -84,6 +81,7 @@ impl PrecomputeEngine { config: PrecomputeEngineConfig, streaming_config: Arc, output_sink: Arc, + sources: Vec>, ) -> Self { let worker_group_counts = (0..config.num_workers) .map(|_| Arc::new(AtomicUsize::new(0))) @@ -96,8 +94,6 @@ impl PrecomputeEngine { worker_watermarks, }); - // Build channels and initial agg_configs at construction time so that - // handle() can be called before run(). let channel_size = config.channel_buffer_size; let mut senders = Vec::with_capacity(config.num_workers); let mut receivers = Vec::with_capacity(config.num_workers); @@ -119,6 +115,7 @@ impl PrecomputeEngine { streaming_config, output_sink, diagnostics, + sources, senders, receivers: Some(receivers), ingest_agg_configs, @@ -139,8 +136,8 @@ impl PrecomputeEngine { } } - /// Start the precompute engine. This spawns worker tasks and the HTTP - /// ingest server, then blocks until shutdown. + /// Start the precompute engine. This spawns worker tasks and all registered + /// ingest sources, then blocks until shutdown. pub async fn run(mut self) -> Result<(), Box> { let num_workers = self.config.num_workers; @@ -185,48 +182,49 @@ impl PrecomputeEngine { worker_handles.push(handle); } - info!( - "PrecomputeEngine started with {} workers on port {}", - num_workers, self.config.ingest_port - ); + info!("PrecomputeEngine started with {} workers", num_workers); - // Build the ingest state, sharing the same Arc as the handle so - // that PrecomputeEngineHandle::update_streaming_config swaps are visible here. - let ingest_state = Arc::new(IngestState { - router, - samples_ingested: std::sync::atomic::AtomicU64::new(0), + // Build the ingest context shared by all sources. + // The ArcSwap pointer is the same one held by PrecomputeEngineHandle, so + // handle.update_streaming_config() is immediately visible to every source. + let ctx = IngestContext { + router: router.clone(), agg_configs: self.ingest_agg_configs.clone(), pass_raw_samples: self.config.pass_raw_samples, - }); + }; - // Start flush timer - let flush_state = ingest_state.clone(); + // Flush timer: periodically signal workers to close idle windows. + let flush_router = router.clone(); let flush_interval_ms = self.config.flush_interval_ms; tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms)); loop { interval.tick().await; - if let Err(e) = flush_state.router.broadcast_flush().await { + if let Err(e) = flush_router.broadcast_flush().await { warn!("Flush broadcast error: {}", e); break; } } }); - // Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics) - let app = Router::new() - .route("/api/v1/write", post(handle_prometheus_ingest)) - .route("/api/v1/import", post(handle_victoriametrics_ingest)) - .with_state(ingest_state); - - let addr = format!("0.0.0.0:{}", self.config.ingest_port); - info!("Ingest server listening on {}", addr); + // Spawn each ingest source. + let mut source_handles = Vec::with_capacity(self.sources.len()); + for source in self.sources { + let ctx = ctx.clone(); + let handle = tokio::spawn(async move { + if let Err(e) = source.run(ctx).await { + warn!("Ingest source error: {}", e); + } + }); + source_handles.push(handle); + } - let listener = TcpListener::bind(&addr).await?; - axum::serve(listener, app).await?; + // Block until all sources finish (normally only on shutdown). + for handle in source_handles { + let _ = handle.await; + } - // Wait for workers to finish (this only happens on shutdown) for handle in worker_handles { let _ = handle.await; } diff --git a/asap-query-engine/src/precompute_engine/ingest_handler.rs b/asap-query-engine/src/precompute_engine/ingest_handler.rs index dd594790..3995763c 100644 --- a/asap-query-engine/src/precompute_engine/ingest_handler.rs +++ b/asap-query-engine/src/precompute_engine/ingest_handler.rs @@ -1,141 +1,60 @@ use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write; use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write; -use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; -use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_series_key}; -use arc_swap::ArcSwap; -use asap_types::aggregation_config::AggregationConfig; -use axum::{body::Bytes, extract::State, http::StatusCode}; -use std::collections::HashMap; +use crate::precompute_engine::ingest_source::{route_decoded_samples, IngestContext, IngestSource}; +use axum::{body::Bytes, extract::State, http::StatusCode, routing::post, Router}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; -use tracing::warn; +use tokio::net::TcpListener; +use tracing::{info, warn}; -/// Shared state for the ingest HTTP handler. -pub(crate) struct IngestState { - pub(crate) router: SeriesRouter, - pub(crate) samples_ingested: std::sync::atomic::AtomicU64, - /// Aggregation configs for group-key extraction. - /// Wrapped in Arc so the same ArcSwap is shared with PrecomputeEngineHandle. - /// The handle calls ArcSwap::store() to push a new Vec; this state sees it - /// immediately via the shared Arc pointer (lock-free on the read path). - pub(crate) agg_configs: Arc>>>, - /// When true, skip group-key extraction and pass raw samples through. - pub(crate) pass_raw_samples: bool, +pub struct HttpIngestConfig { + pub port: u16, } -/// Extract the group key (grouping label values joined by semicolons) -/// for a given series key and aggregation config. -fn extract_group_key(series_key: &str, config: &AggregationConfig) -> String { - let labels = parse_labels_from_series_key(series_key); - let mut values = Vec::new(); - for label_name in &config.grouping_labels.labels { - if let Some(val) = labels.get(label_name.as_str()) { - values.push(*val); - } else { - values.push(""); - } - } - values.join(";") +pub struct HttpIngestSource { + config: HttpIngestConfig, } -/// Shared logic: group decoded samples by (agg_id, group_key) and route to workers. -async fn route_decoded_samples( - state: &IngestState, - samples: Vec, - ingest_received_at: Instant, -) -> StatusCode { - if samples.is_empty() { - return StatusCode::NO_CONTENT; +impl HttpIngestSource { + pub fn new(config: HttpIngestConfig) -> Self { + Self { config } } +} - let count = samples.len() as u64; - state - .samples_ingested - .fetch_add(count, std::sync::atomic::Ordering::Relaxed); +#[async_trait::async_trait] +impl IngestSource for HttpIngestSource { + async fn run( + self: Box, + ctx: IngestContext, + ) -> Result<(), Box> { + let state = Arc::new(HttpIngestState { + ctx, + samples_ingested: AtomicU64::new(0), + }); - if state.pass_raw_samples { - // Raw mode: group by series key and send as RawSamples - let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); - for s in &samples { - by_series - .entry(&s.labels) - .or_default() - .push((s.timestamp_ms, s.value)); - } - let messages: Vec = by_series - .into_iter() - .map(|(k, v)| WorkerMessage::RawSamples { - series_key: k.to_string(), - samples: v, - ingest_received_at, - }) - .collect(); + let app = Router::new() + .route("/api/v1/write", post(handle_prometheus_ingest)) + .route("/api/v1/import", post(handle_victoriametrics_ingest)) + .with_state(state); - if let Err(e) = state - .router - .route_group_batch(messages, ingest_received_at) - .await - { - warn!("Batch routing error: {}", e); - return StatusCode::INTERNAL_SERVER_ERROR; - } - return StatusCode::NO_CONTENT; - } - - // Group-by mode: for each sample, find matching agg configs and group by - // (agg_id, group_key). This is the equivalent of Arroyo's GROUP BY. - // - // Key: (agg_id, group_key) → Vec<(series_key, timestamp_ms, value)> - type GroupKey = (u64, String); - type SampleTuple = (String, i64, f64); - let mut by_group: HashMap> = HashMap::new(); + let addr = format!("0.0.0.0:{}", self.config.port); + info!("HTTP ingest server listening on {}", addr); - // Load agg_configs once per request (lock-free ArcSwap read). - let agg_configs = state.agg_configs.load(); - for s in &samples { - let metric_name = extract_metric_name(&s.labels); - for config in agg_configs.iter() { - if config.metric != metric_name - && config.spatial_filter_normalized != metric_name - && config.spatial_filter != metric_name - { - continue; - } - let group_key = extract_group_key(&s.labels, config); - by_group - .entry((config.aggregation_id, group_key)) - .or_default() - .push((s.labels.clone(), s.timestamp_ms, s.value)); - } - } - - let messages: Vec = by_group - .into_iter() - .map( - |((agg_id, group_key), samples)| WorkerMessage::GroupSamples { - agg_id, - group_key, - samples, - ingest_received_at, - }, - ) - .collect(); - - if let Err(e) = state - .router - .route_group_batch(messages, ingest_received_at) - .await - { - warn!("Batch routing error: {}", e); - return StatusCode::INTERNAL_SERVER_ERROR; + let listener = TcpListener::bind(&addr).await?; + axum::serve(listener, app).await?; + Ok(()) } +} - StatusCode::NO_CONTENT +/// Shared state for the Axum ingest handlers. +struct HttpIngestState { + ctx: IngestContext, + samples_ingested: AtomicU64, } -/// Axum handler for Prometheus remote write (Snappy + Protobuf). -pub(crate) async fn handle_prometheus_ingest( - State(state): State>, +async fn handle_prometheus_ingest( + State(state): State>, body: Bytes, ) -> StatusCode { let ingest_received_at = Instant::now(); @@ -146,12 +65,20 @@ pub(crate) async fn handle_prometheus_ingest( return StatusCode::BAD_REQUEST; } }; - route_decoded_samples(&state, samples, ingest_received_at).await + state + .samples_ingested + .fetch_add(samples.len() as u64, Ordering::Relaxed); + match route_decoded_samples(&state.ctx, samples, ingest_received_at).await { + Ok(()) => StatusCode::NO_CONTENT, + Err(e) => { + warn!("Routing error: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + } + } } -/// Axum handler for VictoriaMetrics remote write (Zstd + Protobuf). -pub(crate) async fn handle_victoriametrics_ingest( - State(state): State>, +async fn handle_victoriametrics_ingest( + State(state): State>, body: Bytes, ) -> StatusCode { let ingest_received_at = Instant::now(); @@ -162,5 +89,14 @@ pub(crate) async fn handle_victoriametrics_ingest( return StatusCode::BAD_REQUEST; } }; - route_decoded_samples(&state, samples, ingest_received_at).await + state + .samples_ingested + .fetch_add(samples.len() as u64, Ordering::Relaxed); + match route_decoded_samples(&state.ctx, samples, ingest_received_at).await { + Ok(()) => StatusCode::NO_CONTENT, + Err(e) => { + warn!("Routing error: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + } + } } diff --git a/asap-query-engine/src/precompute_engine/ingest_source.rs b/asap-query-engine/src/precompute_engine/ingest_source.rs new file mode 100644 index 00000000..8e155489 --- /dev/null +++ b/asap-query-engine/src/precompute_engine/ingest_source.rs @@ -0,0 +1,125 @@ +use crate::drivers::ingest::prometheus_remote_write::DecodedSample; +use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage}; +use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_series_key}; +use arc_swap::ArcSwap; +use asap_types::aggregation_config::AggregationConfig; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +/// Everything a source needs to push decoded samples into the worker pool. +#[derive(Clone)] +pub struct IngestContext { + pub(crate) router: SeriesRouter, + /// Aggregation configs for group-key extraction. + /// Wrapped in Arc so the same ArcSwap is shared with PrecomputeEngineHandle. + /// The handle calls ArcSwap::store() to push a new Vec; this context sees it + /// immediately via the shared Arc pointer (lock-free on the read path). + pub(crate) agg_configs: Arc>>>, + /// When true, skip group-key extraction and pass raw samples through. + pub(crate) pass_raw_samples: bool, +} + +/// An ingest source for the precompute engine. +/// +/// Implementors decode incoming data (HTTP, Kafka, file, etc.) and push it +/// into the engine via [`route_decoded_samples`]. +#[async_trait::async_trait] +pub trait IngestSource: Send + Sync { + async fn run( + self: Box, + ctx: IngestContext, + ) -> Result<(), Box>; +} + +pub(crate) fn extract_group_key(series_key: &str, config: &AggregationConfig) -> String { + let labels = parse_labels_from_series_key(series_key); + let mut values = Vec::new(); + for label_name in &config.grouping_labels.labels { + if let Some(val) = labels.get(label_name.as_str()) { + values.push(*val); + } else { + values.push(""); + } + } + values.join(";") +} + +/// Group decoded samples by (agg_id, group_key) and route them to workers. +/// +/// Returns an error if the router fails to deliver any message. +pub(crate) async fn route_decoded_samples( + ctx: &IngestContext, + samples: Vec, + ingest_received_at: Instant, +) -> Result<(), Box> { + if samples.is_empty() { + return Ok(()); + } + + if ctx.pass_raw_samples { + let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new(); + for s in &samples { + by_series + .entry(&s.labels) + .or_default() + .push((s.timestamp_ms, s.value)); + } + let messages: Vec = by_series + .into_iter() + .map(|(k, v)| WorkerMessage::RawSamples { + series_key: k.to_string(), + samples: v, + ingest_received_at, + }) + .collect(); + ctx.router + .route_group_batch(messages, ingest_received_at) + .await?; + return Ok(()); + } + + // Group-by mode: for each sample, find matching agg configs and group by + // (agg_id, group_key). This is the equivalent of Arroyo's GROUP BY. + // + // Key: (agg_id, group_key) → Vec<(series_key, timestamp_ms, value)> + type GroupKey = (u64, String); + type SampleTuple = (String, i64, f64); + let mut by_group: HashMap> = HashMap::new(); + + // Load agg_configs once per request (lock-free ArcSwap read). + let agg_configs = ctx.agg_configs.load(); + for s in &samples { + let metric_name = extract_metric_name(&s.labels); + for config in agg_configs.iter() { + if config.metric != metric_name + && config.spatial_filter_normalized != metric_name + && config.spatial_filter != metric_name + { + continue; + } + let group_key = extract_group_key(&s.labels, config); + by_group + .entry((config.aggregation_id, group_key)) + .or_default() + .push((s.labels.clone(), s.timestamp_ms, s.value)); + } + } + + let messages: Vec = by_group + .into_iter() + .map( + |((agg_id, group_key), samples)| WorkerMessage::GroupSamples { + agg_id, + group_key, + samples, + ingest_received_at, + }, + ) + .collect(); + + ctx.router + .route_group_batch(messages, ingest_received_at) + .await?; + Ok(()) +} diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs index dba95dd5..5202c418 100644 --- a/asap-query-engine/src/precompute_engine/mod.rs +++ b/asap-query-engine/src/precompute_engine/mod.rs @@ -2,6 +2,7 @@ pub mod accumulator_factory; pub mod config; mod engine; mod ingest_handler; +pub mod ingest_source; pub mod output_sink; pub mod series_buffer; pub mod series_router; @@ -9,3 +10,5 @@ pub mod window_manager; pub mod worker; pub use engine::{PrecomputeEngine, PrecomputeEngineHandle, PrecomputeWorkerDiagnostics}; +pub use ingest_handler::{HttpIngestConfig, HttpIngestSource}; +pub use ingest_source::{IngestContext, IngestSource}; diff --git a/asap-query-engine/src/precompute_engine/series_router.rs b/asap-query-engine/src/precompute_engine/series_router.rs index 012a1cbb..33b92e28 100644 --- a/asap-query-engine/src/precompute_engine/series_router.rs +++ b/asap-query-engine/src/precompute_engine/series_router.rs @@ -41,6 +41,7 @@ pub enum WorkerMessage { } /// Routes incoming samples to one of N workers based on a consistent hash. +#[derive(Clone)] pub struct SeriesRouter { senders: Vec>, num_workers: usize, diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index a1ecee19..804884c6 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -23,7 +23,9 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{ }; use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; use query_engine_rust::precompute_engine::output_sink::CapturingOutputSink; -use query_engine_rust::precompute_engine::PrecomputeEngine; +use query_engine_rust::precompute_engine::{ + HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine, +}; use query_engine_rust::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; use query_engine_rust::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; @@ -141,10 +143,9 @@ async fn send_remote_write(client: &reqwest::Client, port: u16, timeseries: Vec< ); } -fn engine_config(port: u16) -> PrecomputeEngineConfig { +fn engine_config() -> PrecomputeEngineConfig { PrecomputeEngineConfig { num_workers: 2, - ingest_port: port, allowed_lateness_ms: 0, max_buffer_per_series: 10_000, flush_interval_ms: 100, @@ -191,7 +192,9 @@ async fn e2e_kll_output_matches_arroyo() { let streaming_config = Arc::new(StreamingConfig::new(agg_map.clone())); let sink = Arc::new(CapturingOutputSink::new()); - let engine = PrecomputeEngine::new(engine_config(port), streaming_config, sink.clone()); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))]; + let engine = PrecomputeEngine::new(engine_config(), streaming_config, sink.clone(), sources); tokio::spawn(async move { let _ = engine.run().await; }); @@ -305,7 +308,9 @@ async fn e2e_multiple_sum_output_matches_arroyo() { let streaming_config = Arc::new(StreamingConfig::new(agg_map.clone())); let sink = Arc::new(CapturingOutputSink::new()); - let engine = PrecomputeEngine::new(engine_config(port), streaming_config, sink.clone()); + let sources: Vec> = + vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))]; + let engine = PrecomputeEngine::new(engine_config(), streaming_config, sink.clone(), sources); tokio::spawn(async move { let _ = engine.run().await; });