Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions asap-query-engine/src/bin/bench_precompute_sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{
};
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::output_sink::OutputSink;
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::precompute_engine::{
HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine,
};
use query_engine_rust::stores::{SimpleMapStore, Store};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down Expand Up @@ -166,7 +168,6 @@ async fn start_engine(
) {
let config = PrecomputeEngineConfig {
num_workers: workers,
ingest_port: port,
allowed_lateness_ms: 5_000,
max_buffer_per_series: 100_000,
flush_interval_ms: 100,
Expand All @@ -175,7 +176,9 @@ async fn start_engine(
raw_mode_aggregation_id: 0,
late_data_policy: LateDataPolicy::Drop,
};
let engine = PrecomputeEngine::new(config, streaming_config, sink);
let sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))];
let engine = PrecomputeEngine::new(config, streaming_config, sink, sources);
tokio::spawn(async move {
if let Err(err) = engine.run().await {
eprintln!("precompute engine on port {port} failed: {err}");
Expand Down
11 changes: 8 additions & 3 deletions asap-query-engine/src/bin/e2e_quickstart_resource_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use query_engine_rust::drivers::ingest::prometheus_remote_write::{
};
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::output_sink::StoreOutputSink;
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::precompute_engine::{
HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine,
};
use query_engine_rust::stores::{SimpleMapStore, Store};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -225,7 +227,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

let engine_config = PrecomputeEngineConfig {
num_workers: NUM_WORKERS,
ingest_port: INGEST_PORT,
allowed_lateness_ms: 5_000,
max_buffer_per_series: 10_000,
flush_interval_ms: 1_000,
Expand All @@ -235,7 +236,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink);
let sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: INGEST_PORT,
}))];
let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources);
tokio::spawn(async move {
if let Err(e) = engine.run().await {
eprintln!("Precompute engine error: {e}");
Expand Down
10 changes: 7 additions & 3 deletions asap-query-engine/src/bin/precompute_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use query_engine_rust::drivers::query::adapters::AdapterConfig;
use query_engine_rust::engines::SimpleEngine;
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink};
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::precompute_engine::{HttpIngestConfig, HttpIngestSource, PrecomputeEngine};
use query_engine_rust::stores::SimpleMapStore;
use query_engine_rust::{HttpServer, HttpServerConfig};
use std::sync::Arc;
Expand Down Expand Up @@ -128,7 +128,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Build the precompute engine config
let engine_config = PrecomputeEngineConfig {
num_workers: args.num_workers,
ingest_port: args.ingest_port,
allowed_lateness_ms: args.allowed_lateness_ms,
max_buffer_per_series: args.max_buffer_per_series,
flush_interval_ms: args.flush_interval_ms,
Expand All @@ -146,8 +145,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Arc::new(StoreOutputSink::new(store))
};

let sources: Vec<Box<dyn query_engine_rust::precompute_engine::IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: args.ingest_port,
}))];

// Build and run the engine
let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink);
let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources);

info!("Starting precompute engine...");
engine.run().await?;
Expand Down
33 changes: 26 additions & 7 deletions asap-query-engine/src/bin/test_e2e_precompute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEng
use query_engine_rust::precompute_engine::output_sink::{
NoopOutputSink, RawPassthroughSink, StoreOutputSink,
};
use query_engine_rust::precompute_engine::PrecomputeEngine;
use query_engine_rust::precompute_engine::{
HttpIngestConfig, HttpIngestSource, IngestSource, PrecomputeEngine,
};
use query_engine_rust::stores::SimpleMapStore;
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
use query_engine_rust::{HttpServer, HttpServerConfig};
Expand Down Expand Up @@ -142,7 +144,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Start precompute engine
let engine_config = PrecomputeEngineConfig {
num_workers: 2,
ingest_port: INGEST_PORT,
allowed_lateness_ms: 5000,
max_buffer_per_series: 10000,
flush_interval_ms: 200,
Expand All @@ -152,7 +153,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let engine = PrecomputeEngine::new(engine_config, streaming_config.clone(), output_sink);
let sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: INGEST_PORT,
}))];
let engine = PrecomputeEngine::new(
engine_config,
streaming_config.clone(),
output_sink,
sources,
);
tokio::spawn(async move {
if let Err(e) = engine.run().await {
eprintln!("Precompute engine error: {e}");
Expand Down Expand Up @@ -282,7 +292,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let raw_agg_id: u64 = 1;
let raw_engine_config = PrecomputeEngineConfig {
num_workers: 4,
ingest_port: RAW_INGEST_PORT,
allowed_lateness_ms: 5000,
max_buffer_per_series: 10000,
flush_interval_ms: 200,
Expand All @@ -292,7 +301,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
late_data_policy: LateDataPolicy::Drop,
};
let raw_sink = Arc::new(RawPassthroughSink::new(store.clone()));
let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink);
let raw_sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: RAW_INGEST_PORT,
}))];
let raw_engine = PrecomputeEngine::new(
raw_engine_config,
streaming_config.clone(),
raw_sink,
raw_sources,
);
tokio::spawn(async move {
if let Err(e) = raw_engine.run().await {
eprintln!("Raw precompute engine error: {e}");
Expand Down Expand Up @@ -630,7 +648,6 @@ async fn run_single_bench(
let noop_sink = Arc::new(NoopOutputSink::new());
let engine_config = PrecomputeEngineConfig {
num_workers,
ingest_port: port,
allowed_lateness_ms: 5000,
max_buffer_per_series: 100_000,
flush_interval_ms: 100,
Expand All @@ -639,7 +656,9 @@ async fn run_single_bench(
raw_mode_aggregation_id: 0,
late_data_policy: LateDataPolicy::Drop,
};
let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone());
let sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig { port }))];
let engine = PrecomputeEngine::new(engine_config, streaming_config, noop_sink.clone(), sources);
tokio::spawn(async move {
if let Err(e) = engine.run().await {
eprintln!("Bench engine error: {e}");
Expand Down
5 changes: 4 additions & 1 deletion asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ pub use drivers::{

pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
pub use precompute_engine::output_sink::StoreOutputSink;
pub use precompute_engine::{PrecomputeEngine, PrecomputeEngineHandle};
pub use precompute_engine::{
HttpIngestConfig, HttpIngestSource, IngestContext, IngestSource, PrecomputeEngine,
PrecomputeEngineHandle,
};

pub use query_tracker::{QueryTracker, QueryTrackerConfig};

Expand Down
19 changes: 14 additions & 5 deletions asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
use query_engine_rust::InferenceConfig;
use query_engine_rust::{
HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver,
OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, PrecomputeEngineHandle, Result,
SimpleEngine, SimpleMapStore, StoreOutputSink,
HttpIngestConfig, HttpIngestSource, HttpServer, HttpServerConfig, IngestSource, KafkaConsumer,
KafkaConsumerConfig, OtlpReceiver, OtlpReceiverConfig, PrecomputeEngine,
PrecomputeEngineConfig, PrecomputeEngineHandle, Result, SimpleEngine, SimpleMapStore,
StoreOutputSink,
};

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -342,7 +343,6 @@ async fn main() -> Result<()> {
let precompute_handle = if enable_precompute {
let precompute_config = PrecomputeEngineConfig {
num_workers: args.precompute_num_workers,
ingest_port: args.prometheus_remote_write_port,
allowed_lateness_ms: args.precompute_allowed_lateness_ms,
max_buffer_per_series: args.precompute_max_buffer_per_series,
flush_interval_ms: args.precompute_flush_interval_ms,
Expand All @@ -352,7 +352,16 @@ async fn main() -> Result<()> {
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
let pe = PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink);
let sources: Vec<Box<dyn IngestSource>> =
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: args.prometheus_remote_write_port,
}))];
let pe = PrecomputeEngine::new(
precompute_config,
streaming_config.clone(),
output_sink,
sources,
);
let worker_diagnostics = pe.diagnostics();
// Extract the handle before run() consumes the engine.
pe_engine_handle = Some(pe.handle());
Expand Down
4 changes: 0 additions & 4 deletions asap-query-engine/src/precompute_engine/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub enum LateDataPolicy {
pub struct PrecomputeEngineConfig {
/// Number of worker threads for parallel processing.
pub num_workers: usize,
/// Port for the Prometheus remote write ingest endpoint.
pub ingest_port: u16,
/// Maximum allowed lateness for out-of-order samples (milliseconds).
/// Samples arriving later than this behind the watermark are dropped.
pub allowed_lateness_ms: i64,
Expand All @@ -38,7 +36,6 @@ impl Default for PrecomputeEngineConfig {
fn default() -> Self {
Self {
num_workers: 4,
ingest_port: 9090,
allowed_lateness_ms: 5_000,
max_buffer_per_series: 10_000,
flush_interval_ms: 1_000,
Expand All @@ -58,7 +55,6 @@ mod tests {
fn test_default_config() {
let config = PrecomputeEngineConfig::default();
assert_eq!(config.num_workers, 4);
assert_eq!(config.ingest_port, 9090);
assert_eq!(config.allowed_lateness_ms, 5_000);
assert_eq!(config.max_buffer_per_series, 10_000);
assert_eq!(config.flush_interval_ms, 1_000);
Expand Down
66 changes: 32 additions & 34 deletions asap-query-engine/src/precompute_engine/engine.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use crate::data_model::StreamingConfig;
use crate::precompute_engine::config::PrecomputeEngineConfig;
use crate::precompute_engine::ingest_handler::{
handle_prometheus_ingest, handle_victoriametrics_ingest, IngestState,
};
use crate::precompute_engine::ingest_source::{IngestContext, IngestSource};
use crate::precompute_engine::output_sink::OutputSink;
use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig};
use arc_swap::ArcSwap;
use asap_types::aggregation_config::AggregationConfig;
use axum::{routing::post, Router};
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicUsize};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tracing::{info, warn};

Expand Down Expand Up @@ -64,14 +60,15 @@ impl PrecomputeEngineHandle {

/// The top-level precompute engine orchestrator.
///
/// Creates worker threads, the series router, and the Axum ingest server.
/// Creates worker threads and drives all registered ingest sources.
/// Call `handle()` before `run()` to obtain a `PrecomputeEngineHandle` for
/// applying runtime config updates while the engine is running.
pub struct PrecomputeEngine {
config: PrecomputeEngineConfig,
streaming_config: Arc<StreamingConfig>,
output_sink: Arc<dyn OutputSink>,
diagnostics: Arc<PrecomputeWorkerDiagnostics>,
sources: Vec<Box<dyn IngestSource>>,
/// Channels created at construction so handle() can be extracted before run().
senders: Vec<mpsc::Sender<WorkerMessage>>,
receivers: Option<Vec<mpsc::Receiver<WorkerMessage>>>,
Expand All @@ -84,6 +81,7 @@ impl PrecomputeEngine {
config: PrecomputeEngineConfig,
streaming_config: Arc<StreamingConfig>,
output_sink: Arc<dyn OutputSink>,
sources: Vec<Box<dyn IngestSource>>,
) -> Self {
let worker_group_counts = (0..config.num_workers)
.map(|_| Arc::new(AtomicUsize::new(0)))
Expand All @@ -96,8 +94,6 @@ impl PrecomputeEngine {
worker_watermarks,
});

// Build channels and initial agg_configs at construction time so that
// handle() can be called before run().
let channel_size = config.channel_buffer_size;
let mut senders = Vec::with_capacity(config.num_workers);
let mut receivers = Vec::with_capacity(config.num_workers);
Expand All @@ -119,6 +115,7 @@ impl PrecomputeEngine {
streaming_config,
output_sink,
diagnostics,
sources,
senders,
receivers: Some(receivers),
ingest_agg_configs,
Expand All @@ -139,8 +136,8 @@ impl PrecomputeEngine {
}
}

/// Start the precompute engine. This spawns worker tasks and the HTTP
/// ingest server, then blocks until shutdown.
/// Start the precompute engine. This spawns worker tasks and all registered
/// ingest sources, then blocks until shutdown.
pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let num_workers = self.config.num_workers;

Expand Down Expand Up @@ -185,48 +182,49 @@ impl PrecomputeEngine {
worker_handles.push(handle);
}

info!(
"PrecomputeEngine started with {} workers on port {}",
num_workers, self.config.ingest_port
);
info!("PrecomputeEngine started with {} workers", num_workers);

// Build the ingest state, sharing the same Arc<ArcSwap> as the handle so
// that PrecomputeEngineHandle::update_streaming_config swaps are visible here.
let ingest_state = Arc::new(IngestState {
router,
samples_ingested: std::sync::atomic::AtomicU64::new(0),
// Build the ingest context shared by all sources.
// The ArcSwap pointer is the same one held by PrecomputeEngineHandle, so
// handle.update_streaming_config() is immediately visible to every source.
let ctx = IngestContext {
router: router.clone(),
agg_configs: self.ingest_agg_configs.clone(),
pass_raw_samples: self.config.pass_raw_samples,
});
};

// Start flush timer
let flush_state = ingest_state.clone();
// Flush timer: periodically signal workers to close idle windows.
let flush_router = router.clone();
let flush_interval_ms = self.config.flush_interval_ms;
tokio::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms));
loop {
interval.tick().await;
if let Err(e) = flush_state.router.broadcast_flush().await {
if let Err(e) = flush_router.broadcast_flush().await {
warn!("Flush broadcast error: {}", e);
break;
}
}
});

// Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics)
let app = Router::new()
.route("/api/v1/write", post(handle_prometheus_ingest))
.route("/api/v1/import", post(handle_victoriametrics_ingest))
.with_state(ingest_state);

let addr = format!("0.0.0.0:{}", self.config.ingest_port);
info!("Ingest server listening on {}", addr);
// Spawn each ingest source.
let mut source_handles = Vec::with_capacity(self.sources.len());
for source in self.sources {
let ctx = ctx.clone();
let handle = tokio::spawn(async move {
if let Err(e) = source.run(ctx).await {
warn!("Ingest source error: {}", e);
}
});
source_handles.push(handle);
}

let listener = TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
// Block until all sources finish (normally only on shutdown).
for handle in source_handles {
let _ = handle.await;
}

// Wait for workers to finish (this only happens on shutdown)
for handle in worker_handles {
let _ = handle.await;
}
Expand Down
Loading
Loading