diff --git a/Cargo.lock b/Cargo.lock
index 3bd7935..a849360 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3569,6 +3569,7 @@ dependencies = [
  "chrono",
  "clap 4.6.0",
  "criterion",
+ "csv",
  "ctor",
  "dashmap 5.5.3",
  "datafusion",
diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml
index fb2bab6..9b73901 100644
--- a/asap-query-engine/Cargo.toml
+++ b/asap-query-engine/Cargo.toml
@@ -58,6 +58,7 @@ zstd = "0.13"
 reqwest = { version = "0.11", features = ["json"] }
 tracing-appender = "0.2"
 arc-swap = "1"
+csv = "1"
 
 elastic_dsl_utilities.workspace = true
 asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs
index 4e2befb..1a65898 100644
--- a/asap-query-engine/src/bin/precompute_engine.rs
+++ b/asap-query-engine/src/bin/precompute_engine.rs
@@ -6,6 +6,7 @@ use query_engine_rust::data_model::{
 use query_engine_rust::drivers::query::adapters::AdapterConfig;
 use query_engine_rust::engines::SimpleEngine;
 use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
+use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
 use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink};
 use query_engine_rust::precompute_engine::{HttpIngestConfig, HttpIngestSource, PrecomputeEngine};
 use query_engine_rust::stores::SimpleMapStore;
@@ -65,6 +66,40 @@ struct Args {
     /// Policy for handling late samples that arrive after their window has closed
     #[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)]
     late_data_policy: LateDataPolicy,
+
+    // --- CSV file ingest (alternative to HTTP) ---
+    /// Path to a local CSV file to ingest instead of listening for HTTP writes
+    #[arg(long)]
+    input_file: Option<String>,
+
+    /// Metric name to assign to every row (required with --input_file)
+    #[arg(long)]
+    csv_metric_name: Option<String>,
+
+    /// CSV column to use as the float value (required with --input_file)
+    #[arg(long)]
+    csv_value_col: Option<String>,
+
+    /// Comma-separated CSV columns to include as labels (e.g. "id1,id2,id3")
+    #[arg(long, default_value = "")]
+    csv_label_cols: String,
+
+    /// CSV column containing timestamps in milliseconds; omit to synthesize timestamps
+    #[arg(long)]
+    csv_timestamp_col: Option<String>,
+
+    /// Start timestamp (ms) for synthesized timestamps when --csv_timestamp_col is absent
+    #[arg(long, default_value_t = 0)]
+    csv_start_ts_ms: i64,
+
+    /// Milliseconds between consecutive rows for synthesized timestamps
+    /// (required when --csv_timestamp_col is absent)
+    #[arg(long)]
+    csv_ts_step_ms: Option<i64>,
+
+    /// Number of CSV rows per batch sent to workers
+    #[arg(long, default_value_t = 1000)]
+    csv_batch_size: usize,
 }
 
 #[tokio::main]
@@ -146,9 +181,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
 
     let sources: Vec<Box<dyn IngestSource>> =
-        vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
-            port: args.ingest_port,
-        }))];
+        if let Some(path) = args.input_file {
+            let metric_name = args
+                .csv_metric_name
+                .ok_or("--csv_metric_name is required with --input_file")?;
+            let value_col = args
+                .csv_value_col
+                .ok_or("--csv_value_col is required with --input_file")?;
+            let ts_step_ms = if args.csv_timestamp_col.is_none() {
+                args.csv_ts_step_ms.ok_or(
+                    "--csv_ts_step_ms is required when --csv_timestamp_col is not specified",
+                )?
+            } else {
+                args.csv_ts_step_ms.unwrap_or(0)
+            };
+            let label_cols = if args.csv_label_cols.is_empty() {
+                vec![]
+            } else {
+                args.csv_label_cols
+                    .split(',')
+                    .map(|s| s.trim().to_string())
+                    .collect()
+            };
+            info!("File ingest mode: {}", path);
+            vec![Box::new(CsvFileIngestSource::new(CsvFileIngestConfig {
+                path,
+                metric_name,
+                value_col,
+                label_cols,
+                timestamp_col: args.csv_timestamp_col,
+                start_ts_ms: args.csv_start_ts_ms,
+                ts_step_ms,
+                batch_size: args.csv_batch_size,
+            }))]
+        } else {
+            vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
+                port: args.ingest_port,
+            }))]
+        };
 
     // Build and run the engine
     let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources);
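For reference, when `--input_file` is set the flag wiring above boils down to constructing a `CsvFileIngestConfig`. A minimal sketch with illustrative values (the file name, metric name, and column names below are placeholders, not part of the diff):

```rust
use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};

// Roughly what the CLI wiring builds for:
//   precompute_engine --input_file metrics.csv --csv_metric_name requests \
//     --csv_value_col value --csv_label_cols "id1,id2" --csv_ts_step_ms 1000
// With no --csv_timestamp_col, rows get synthesized timestamps
// 0, 1000, 2000, ... ms starting from csv_start_ts_ms.
fn build_csv_source() -> CsvFileIngestSource {
    CsvFileIngestSource::new(CsvFileIngestConfig {
        path: "metrics.csv".to_string(),
        metric_name: "requests".to_string(),
        value_col: "value".to_string(),
        label_cols: vec!["id1".to_string(), "id2".to_string()],
        timestamp_col: None,
        start_ts_ms: 0,
        ts_step_ms: 1000,
        batch_size: 1000,
    })
}
```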
diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs
index 87463b1..6c601f0 100644
--- a/asap-query-engine/src/main.rs
+++ b/asap-query-engine/src/main.rs
@@ -13,6 +13,7 @@ use query_engine_rust::data_model::enums::{
 };
 use query_engine_rust::drivers::AdapterConfig;
 use query_engine_rust::precompute_engine::config::LateDataPolicy;
+use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
 use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
 use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
 use query_engine_rust::InferenceConfig;
@@ -165,6 +166,40 @@ struct Args {
     /// Query tracker: observation window in seconds before triggering planning
     #[arg(long, default_value = "100")]
     tracker_observation_window_secs: u64,
+
+    // --- CSV file ingest (alternative to HTTP remote write) ---
+    /// Path to a local CSV file to ingest instead of listening for HTTP writes
+    #[arg(long)]
+    input_file: Option<String>,
+
+    /// Metric name to assign to every row (required with --input-file)
+    #[arg(long)]
+    csv_metric_name: Option<String>,
+
+    /// CSV column to use as the float value (required with --input-file)
+    #[arg(long)]
+    csv_value_col: Option<String>,
+
+    /// Comma-separated CSV columns to include as labels (e.g. "id1,id2,id3")
+    #[arg(long, default_value = "")]
+    csv_label_cols: String,
+
+    /// CSV column containing timestamps in milliseconds; omit to synthesize timestamps
+    #[arg(long)]
+    csv_timestamp_col: Option<String>,
+
+    /// Start timestamp (ms) for synthesized timestamps when --csv-timestamp-col is absent
+    #[arg(long, default_value_t = 0)]
+    csv_start_ts_ms: i64,
+
+    /// Milliseconds between consecutive rows for synthesized timestamps
+    /// (required when --csv-timestamp-col is absent)
+    #[arg(long)]
+    csv_ts_step_ms: Option<i64>,
+
+    /// Number of CSV rows per batch sent to workers
+    #[arg(long, default_value_t = 1000)]
+    csv_batch_size: usize,
 }
 
 #[tokio::main]
@@ -333,9 +368,10 @@ async fn main() -> Result<()> {
     };
 
     // Setup precompute engine (replaces standalone Prometheus remote write server)
-    // Automatically enable when using precompute streaming engine
-    let enable_precompute =
-        args.enable_prometheus_remote_write || args.streaming_engine == StreamingEngine::Precompute;
+    // Automatically enable when using precompute streaming engine or ingesting from a file
+    let enable_precompute = args.enable_prometheus_remote_write
+        || args.streaming_engine == StreamingEngine::Precompute
+        || args.input_file.is_some();
 
     // Handle extracted before run() so the applier task can call update_streaming_config.
     let mut pe_engine_handle: Option<PrecomputeEngineHandle> = None;
@@ -352,10 +388,46 @@ async fn main() -> Result<()> {
             late_data_policy: LateDataPolicy::Drop,
         };
         let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
-        let sources: Vec<Box<dyn IngestSource>> =
+        let sources: Vec<Box<dyn IngestSource>> = if let Some(ref path) = args.input_file {
+            let metric_name = args
+                .csv_metric_name
+                .clone()
+                .ok_or("--csv-metric-name is required with --input-file")?;
+            let value_col = args
+                .csv_value_col
+                .clone()
+                .ok_or("--csv-value-col is required with --input-file")?;
+            let ts_step_ms = if args.csv_timestamp_col.is_none() {
+                args.csv_ts_step_ms.ok_or(
+                    "--csv-ts-step-ms is required when --csv-timestamp-col is not specified",
+                )?
+            } else {
+                args.csv_ts_step_ms.unwrap_or(0)
+            };
+            let label_cols = if args.csv_label_cols.is_empty() {
+                vec![]
+            } else {
+                args.csv_label_cols
+                    .split(',')
+                    .map(|s| s.trim().to_string())
+                    .collect()
+            };
+            info!("File ingest mode: {}", path);
+            vec![Box::new(CsvFileIngestSource::new(CsvFileIngestConfig {
+                path: path.clone(),
+                metric_name,
+                value_col,
+                label_cols,
+                timestamp_col: args.csv_timestamp_col.clone(),
+                start_ts_ms: args.csv_start_ts_ms,
+                ts_step_ms,
+                batch_size: args.csv_batch_size,
+            }))]
+        } else {
             vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
                 port: args.prometheus_remote_write_port,
-            }))];
+            }))]
+        };
         let pe = PrecomputeEngine::new(
             precompute_config,
             streaming_config.clone(),
@@ -412,8 +484,8 @@ async fn main() -> Result<()> {
         adapter_config,
     };
 
-    // Verify Prometheus is reachable before starting
-    {
+    // Verify Prometheus is reachable before starting (only needed when forwarding queries)
+    if args.forward_unsupported_queries {
         let client = reqwest::Client::new();
         let health_url = format!(
             "{}/api/v1/status/runtimeinfo",
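The flag validation and label-column parsing above are duplicated between `main.rs` and the `precompute_engine` binary. As a sketch of what a shared helper could look like (`parse_label_cols` is hypothetical, not part of the diff), matching the split/trim behavior of both call sites:

```rust
/// Hypothetical helper: parse the comma-separated --csv-label-cols flag.
/// An empty flag value means "no label columns", mirroring both binaries.
fn parse_label_cols(raw: &str) -> Vec<String> {
    if raw.is_empty() {
        return vec![];
    }
    raw.split(',').map(|s| s.trim().to_string()).collect()
}

#[cfg(test)]
mod tests {
    use super::parse_label_cols;

    #[test]
    fn parses_label_cols() {
        assert_eq!(parse_label_cols(""), Vec::<String>::new());
        assert_eq!(parse_label_cols("id1, id2,id3"), vec!["id1", "id2", "id3"]);
    }
}
```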
diff --git a/asap-query-engine/src/precompute_engine/csv_ingest.rs b/asap-query-engine/src/precompute_engine/csv_ingest.rs
new file mode 100644
index 0000000..459572f
--- /dev/null
+++ b/asap-query-engine/src/precompute_engine/csv_ingest.rs
@@ -0,0 +1,172 @@
+use crate::drivers::ingest::prometheus_remote_write::DecodedSample;
+use crate::precompute_engine::ingest_source::{route_decoded_samples, IngestContext, IngestSource};
+use std::time::Instant;
+use tracing::info;
+
+pub struct CsvFileIngestConfig {
+    pub path: String,
+    pub metric_name: String,
+    pub value_col: String,
+    /// Label columns. Will be sorted alphabetically in the labels string.
+    pub label_cols: Vec<String>,
+    /// If Some, parse this column as the timestamp in milliseconds.
+    /// If None, synthesize timestamps using start_ts_ms + row_index * ts_step_ms.
+    pub timestamp_col: Option<String>,
+    pub start_ts_ms: i64,
+    /// Required when timestamp_col is None.
+    pub ts_step_ms: i64,
+    pub batch_size: usize,
+}
+
+pub struct CsvFileIngestSource {
+    config: CsvFileIngestConfig,
+}
+
+impl CsvFileIngestSource {
+    pub fn new(config: CsvFileIngestConfig) -> Self {
+        Self { config }
+    }
+}
+
+#[async_trait::async_trait]
+impl IngestSource for CsvFileIngestSource {
+    async fn run(
+        self: Box<Self>,
+        ctx: IngestContext,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let config = self.config;
+        let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<DecodedSample>>(8);
+
+        let reader_handle = tokio::task::spawn_blocking(
+            move || -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
+                let mut rdr = csv::Reader::from_path(&config.path)?;
+                let headers = rdr.headers()?.clone();
+
+                let value_idx = headers
+                    .iter()
+                    .position(|h| h == config.value_col)
+                    .ok_or_else(|| format!("value column '{}' not found in CSV", config.value_col))
+                    .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                        std::io::Error::other(e).into()
+                    })?;
+
+                let ts_idx = config
+                    .timestamp_col
+                    .as_ref()
+                    .map(|col| {
+                        headers
+                            .iter()
+                            .position(|h| h == col.as_str())
+                            .ok_or_else(|| format!("timestamp column '{}' not found in CSV", col))
+                            .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                                std::io::Error::other(e).into()
+                            })
+                    })
+                    .transpose()?;
+
+                let mut sorted_label_cols = config.label_cols.clone();
+                sorted_label_cols.sort();
+
+                let mut label_idxs: Vec<(String, usize)> = Vec::new();
+                for col in &sorted_label_cols {
+                    let idx = headers
+                        .iter()
+                        .position(|h| h == col.as_str())
+                        .ok_or_else(|| format!("label column '{}' not found in CSV", col))
+                        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                            std::io::Error::other(e).into()
+                        })?;
+                    label_idxs.push((col.clone(), idx));
+                }
+
+                let mut batch: Vec<DecodedSample> = Vec::with_capacity(config.batch_size);
+                let mut row_count: u64 = 0;
+
+                for result in rdr.records() {
+                    let record = result?;
+
+                    let labels = if label_idxs.is_empty() {
+                        config.metric_name.clone()
+                    } else {
+                        let mut s = String::with_capacity(64);
+                        s.push_str(&config.metric_name);
+                        s.push('{');
+                        for (i, (col, idx)) in label_idxs.iter().enumerate() {
+                            if i > 0 {
+                                s.push(',');
+                            }
+                            s.push_str(col);
+                            s.push_str("=\"");
+                            s.push_str(record.get(*idx).unwrap_or(""));
+                            s.push('"');
+                        }
+                        s.push('}');
+                        s
+                    };
+
+                    let value: f64 = record
+                        .get(value_idx)
+                        .ok_or("missing value field")
+                        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                            std::io::Error::other(e).into()
+                        })?
+                        .parse()
+                        .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                            std::io::Error::other(format!("failed to parse value: {}", e)).into()
+                        })?;
+
+                    let timestamp_ms = match ts_idx {
+                        Some(idx) => record
+                            .get(idx)
+                            .ok_or("missing timestamp field")
+                            .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                                std::io::Error::other(e).into()
+                            })?
+                            .parse::<i64>()
+                            .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
+                                std::io::Error::other(format!("failed to parse timestamp: {}", e))
+                                    .into()
+                            })?,
+                        None => config.start_ts_ms + (row_count as i64) * config.ts_step_ms,
+                    };
+
+                    batch.push(DecodedSample {
+                        labels,
+                        timestamp_ms,
+                        value,
+                    });
+                    row_count += 1;
+
+                    if batch.len() >= config.batch_size {
+                        let send_batch =
+                            std::mem::replace(&mut batch, Vec::with_capacity(config.batch_size));
+                        if tx.blocking_send(send_batch).is_err() {
+                            break;
+                        }
+                    }
+                }
+
+                if !batch.is_empty() {
+                    let _ = tx.blocking_send(batch);
+                }
+
+                Ok(row_count)
+            },
+        );
+
+        let mut total_samples: u64 = 0;
+        while let Some(batch) = rx.recv().await {
+            total_samples += batch.len() as u64;
+            route_decoded_samples(&ctx, batch, Instant::now()).await?;
+        }
+
+        let rows = reader_handle.await??;
+        info!(
+            "CSV ingest complete: {} rows ingested, {} samples routed",
+            rows, total_samples
+        );
+
+        ctx.router.broadcast_shutdown().await?;
+        Ok(())
+    }
+}
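The labels string built in the record loop doubles as the series key. A standalone sketch of the format (this free function is illustrative; the real logic is inline in the loop above), showing that label columns are pre-sorted so the key is deterministic:

```rust
// Illustrative re-statement of the inline formatting logic:
// metric{col1="v1",col2="v2"}, with no braces when there are no labels.
fn format_series_key(metric: &str, labels: &[(&str, &str)]) -> String {
    if labels.is_empty() {
        return metric.to_string();
    }
    let mut s = String::with_capacity(64);
    s.push_str(metric);
    s.push('{');
    for (i, (col, val)) in labels.iter().enumerate() {
        if i > 0 {
            s.push(',');
        }
        s.push_str(col);
        s.push_str("=\"");
        s.push_str(val);
        s.push('"');
    }
    s.push('}');
    s
}

#[test]
fn series_key_format() {
    assert_eq!(format_series_key("cpu", &[]), "cpu");
    assert_eq!(
        format_series_key("cpu", &[("id1", "a"), ("id2", "b")]),
        r#"cpu{id1="a",id2="b"}"#
    );
}
```

Note that label values are not escaped, so a value containing `"` or `,` would produce an ambiguous key.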
diff --git a/asap-query-engine/src/precompute_engine/mod.rs b/asap-query-engine/src/precompute_engine/mod.rs
index 5202c41..d5c1066 100644
--- a/asap-query-engine/src/precompute_engine/mod.rs
+++ b/asap-query-engine/src/precompute_engine/mod.rs
@@ -1,5 +1,6 @@
 pub mod accumulator_factory;
 pub mod config;
+pub mod csv_ingest;
 mod engine;
 mod ingest_handler;
 pub mod ingest_source;
@@ -9,6 +10,7 @@ pub mod series_router;
 pub mod window_manager;
 pub mod worker;
 
+pub use csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
 pub use engine::{PrecomputeEngine, PrecomputeEngineHandle, PrecomputeWorkerDiagnostics};
 pub use ingest_handler::{HttpIngestConfig, HttpIngestSource};
 pub use ingest_source::{IngestContext, IngestSource};
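For context on the concurrency shape of `CsvFileIngestSource::run`: a blocking reader thread feeds an async consumer through a bounded channel. A self-contained sketch of the same pattern (toy data, not the engine's types):

```rust
use tokio::sync::mpsc;

// Bounded-channel handoff as used by the CSV source: capacity 8 caps the
// number of in-flight batches, and blocking_send stalls the reader thread
// (not the async runtime) whenever the consumer falls behind.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(8);

    let reader = tokio::task::spawn_blocking(move || {
        for start in (0..100u64).step_by(10) {
            let batch: Vec<u64> = (start..start + 10).collect();
            if tx.blocking_send(batch).is_err() {
                break; // consumer went away; stop reading
            }
        }
        // tx drops here, closing the channel and ending the consumer loop.
    });

    while let Some(batch) = rx.recv().await {
        println!("routed batch of {} samples", batch.len());
    }
    reader.await.unwrap();
}
```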