1 change: 1 addition & 0 deletions Cargo.lock

Cargo.lock is a generated file; its diff is not rendered.

1 change: 1 addition & 0 deletions asap-query-engine/Cargo.toml
@@ -58,6 +58,7 @@ zstd = "0.13"
reqwest = { version = "0.11", features = ["json"] }
tracing-appender = "0.2"
arc-swap = "1"
csv = "1"
elastic_dsl_utilities.workspace = true
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
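The only new dependency is the csv crate. For orientation, a minimal read loop with that crate looks like the sketch below — generic csv-1.x usage, not this PR's ingest code, and read_rows is a hypothetical name.

use std::error::Error;

// Generic csv-crate read loop (editor's sketch, not part of this diff):
// open a headered CSV and iterate its records as strings.
fn read_rows(path: &str) -> Result<(), Box<dyn Error>> {
    let mut rdr = csv::Reader::from_path(path)?;
    let headers = rdr.headers()?.clone();
    println!("columns: {:?}", headers);
    for result in rdr.records() {
        let record = result?; // csv::StringRecord
        let _fields: Vec<&str> = record.iter().collect();
    }
    Ok(())
}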

76 changes: 73 additions & 3 deletions asap-query-engine/src/bin/precompute_engine.rs
@@ -6,6 +6,7 @@ use query_engine_rust::data_model::{
use query_engine_rust::drivers::query::adapters::AdapterConfig;
use query_engine_rust::engines::SimpleEngine;
use query_engine_rust::precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, StoreOutputSink};
use query_engine_rust::precompute_engine::{HttpIngestConfig, HttpIngestSource, PrecomputeEngine};
use query_engine_rust::stores::SimpleMapStore;
@@ -65,6 +66,40 @@ struct Args {
/// Policy for handling late samples that arrive after their window has closed
#[arg(long, value_enum, default_value_t = LateDataPolicy::Drop)]
late_data_policy: LateDataPolicy,

// --- CSV file ingest (alternative to HTTP) ---
/// Path to a local CSV file to ingest instead of listening for HTTP writes
#[arg(long)]
input_file: Option<String>,

/// Metric name to assign to every row (required with --input-file)
#[arg(long)]
csv_metric_name: Option<String>,

/// CSV column to use as the float value (required with --input-file)
#[arg(long)]
csv_value_col: Option<String>,

/// Comma-separated CSV columns to include as labels (e.g. "id1,id2,id3")
#[arg(long, default_value = "")]
csv_label_cols: String,

/// CSV column containing timestamps in milliseconds; omit to synthesize timestamps
#[arg(long)]
csv_timestamp_col: Option<String>,

/// Start timestamp (ms) for synthesized timestamps when --csv-timestamp-col is absent
#[arg(long, default_value_t = 0)]
csv_start_ts_ms: i64,

/// Milliseconds between consecutive rows for synthesized timestamps
/// (required when --csv-timestamp-col is absent)
#[arg(long)]
csv_ts_step_ms: Option<i64>,

/// Number of CSV rows per batch sent to workers
#[arg(long, default_value_t = 1000)]
csv_batch_size: usize,
}
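
// Editor's sketch (not part of this diff): the timestamp synthesis the
// --csv-start-ts-ms and --csv-ts-step-ms flags imply when
// --csv-timestamp-col is omitted. Assumed semantics; the real source may
// compute this differently.
fn synthesized_ts_ms(start_ts_ms: i64, ts_step_ms: i64, row_index: i64) -> i64 {
    // Row 0 lands on start_ts_ms; each later row advances by the step,
    // e.g. synthesized_ts_ms(0, 1000, 2) == 2000.
    start_ts_ms + row_index * ts_step_ms
}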

#[tokio::main]
@@ -146,9 +181,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
};

let sources: Vec<Box<dyn query_engine_rust::precompute_engine::IngestSource>> =
- vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
- port: args.ingest_port,
- }))];
if let Some(path) = args.input_file {
let metric_name = args
.csv_metric_name
.ok_or("--csv-metric-name is required with --input-file")?;
let value_col = args
.csv_value_col
.ok_or("--csv-value-col is required with --input-file")?;
let ts_step_ms = if args.csv_timestamp_col.is_none() {
args.csv_ts_step_ms.ok_or(
"--csv-ts-step-ms is required when --csv-timestamp-col is not specified",
)?
} else {
args.csv_ts_step_ms.unwrap_or(0)
};
let label_cols = if args.csv_label_cols.is_empty() {
vec![]
} else {
args.csv_label_cols
.split(',')
.map(|s| s.trim().to_string())
.collect()
};
info!("File ingest mode: {}", path);
vec![Box::new(CsvFileIngestSource::new(CsvFileIngestConfig {
path,
metric_name,
value_col,
label_cols,
timestamp_col: args.csv_timestamp_col,
start_ts_ms: args.csv_start_ts_ms,
ts_step_ms,
batch_size: args.csv_batch_size,
}))]
} else {
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: args.ingest_port,
}))]
};

// Build and run the engine
let engine = PrecomputeEngine::new(engine_config, streaming_config, output_sink, sources);
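For readers without the new csv_ingest module in front of them, the shape of CsvFileIngestConfig can be reconstructed from the construction sites in this file and in main.rs below. This is an inferred sketch, not the authoritative definition.

// Field types inferred from the values passed at the construction sites;
// the real definition in precompute_engine::csv_ingest may differ in
// derives, visibility, or defaults.
pub struct CsvFileIngestConfig {
    pub path: String,
    pub metric_name: String,
    pub value_col: String,
    pub label_cols: Vec<String>,
    pub timestamp_col: Option<String>,
    pub start_ts_ms: i64,
    pub ts_step_ms: i64,
    pub batch_size: usize,
}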
86 changes: 79 additions & 7 deletions asap-query-engine/src/main.rs
@@ -13,6 +13,7 @@ use query_engine_rust::data_model::enums::{
};
use query_engine_rust::drivers::AdapterConfig;
use query_engine_rust::precompute_engine::config::LateDataPolicy;
use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
use query_engine_rust::InferenceConfig;
@@ -165,6 +166,40 @@ struct Args {
/// Query tracker: observation window in seconds before triggering planning
#[arg(long, default_value = "100")]
tracker_observation_window_secs: u64,

// --- CSV file ingest (alternative to HTTP remote write) ---
/// Path to a local CSV file to ingest instead of listening for HTTP writes
#[arg(long)]
input_file: Option<String>,

/// Metric name to assign to every row (required with --input-file)
#[arg(long)]
csv_metric_name: Option<String>,

/// CSV column to use as the float value (required with --input-file)
#[arg(long)]
csv_value_col: Option<String>,

/// Comma-separated CSV columns to include as labels (e.g. "id1,id2,id3")
#[arg(long, default_value = "")]
csv_label_cols: String,

/// CSV column containing timestamps in milliseconds; omit to synthesize timestamps
#[arg(long)]
csv_timestamp_col: Option<String>,

/// Start timestamp (ms) for synthesized timestamps when --csv-timestamp-col is absent
#[arg(long, default_value_t = 0)]
csv_start_ts_ms: i64,

/// Milliseconds between consecutive rows for synthesized timestamps
/// (required when --csv-timestamp-col is absent)
#[arg(long)]
csv_ts_step_ms: Option<i64>,

/// Number of CSV rows per batch sent to workers
#[arg(long, default_value_t = 1000)]
csv_batch_size: usize,
}
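
For concreteness, a file these flags could describe — column names are hypothetical, reusing the id1/id2 example from the help text — would be ingested with --csv-value-col value --csv-label-cols "id1,id2" --csv-timestamp-col ts_ms:

id1,id2,value,ts_ms
host-a,us-east,0.42,1700000000000
host-b,us-west,0.37,1700000001000

Omitting --csv-timestamp-col and passing --csv-ts-step-ms 1000 instead would stamp rows at 0, 1000, 2000, ... starting from --csv-start-ts-ms.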

#[tokio::main]
@@ -333,9 +368,10 @@ async fn main() -> Result<()> {
};

// Setup precompute engine (replaces standalone Prometheus remote write server)
- // Automatically enable when using precompute streaming engine
- let enable_precompute =
- args.enable_prometheus_remote_write || args.streaming_engine == StreamingEngine::Precompute;
// Automatically enable when using precompute streaming engine or ingesting from a file
let enable_precompute = args.enable_prometheus_remote_write
|| args.streaming_engine == StreamingEngine::Precompute
|| args.input_file.is_some();

// Handle extracted before run() so the applier task can call update_streaming_config.
let mut pe_engine_handle: Option<PrecomputeEngineHandle> = None;
@@ -352,10 +388,46 @@ async fn main() -> Result<()> {
late_data_policy: LateDataPolicy::Drop,
};
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
- let sources: Vec<Box<dyn IngestSource>> =
let sources: Vec<Box<dyn IngestSource>> = if let Some(ref path) = args.input_file {
let metric_name = args
.csv_metric_name
.clone()
.ok_or("--csv-metric-name is required with --input-file")?;
let value_col = args
.csv_value_col
.clone()
.ok_or("--csv-value-col is required with --input-file")?;
let ts_step_ms = if args.csv_timestamp_col.is_none() {
args.csv_ts_step_ms.ok_or(
"--csv-ts-step-ms is required when --csv-timestamp-col is not specified",
)?
} else {
args.csv_ts_step_ms.unwrap_or(0)
};
let label_cols = if args.csv_label_cols.is_empty() {
vec![]
} else {
args.csv_label_cols
.split(',')
.map(|s| s.trim().to_string())
.collect()
};
info!("File ingest mode: {}", path);
vec![Box::new(CsvFileIngestSource::new(CsvFileIngestConfig {
path: path.clone(),
metric_name,
value_col,
label_cols,
timestamp_col: args.csv_timestamp_col.clone(),
start_ts_ms: args.csv_start_ts_ms,
ts_step_ms,
batch_size: args.csv_batch_size,
}))]
} else {
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: args.prometheus_remote_write_port,
- }))];
}))]
};
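// Editor's sketch (not part of this diff): a plausible reading of
// csv_batch_size — rows are accumulated into Vecs of at most batch_size
// before being handed to workers. The real CsvFileIngestSource may stream
// batches differently.
fn into_batches<T>(rows: impl IntoIterator<Item = T>, batch_size: usize) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::with_capacity(batch_size);
    for row in rows {
        current.push(row);
        if current.len() == batch_size {
            // Swap out the full batch and start a fresh one.
            batches.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        batches.push(current); // final partial batch
    }
    batches
}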
let pe = PrecomputeEngine::new(
precompute_config,
streaming_config.clone(),
@@ -412,8 +484,8 @@ async fn main() -> Result<()> {
adapter_config,
};

- // Verify Prometheus is reachable before starting
- {
// Verify Prometheus is reachable before starting (only needed when forwarding queries)
if args.forward_unsupported_queries {
let client = reqwest::Client::new();
let health_url = format!(
"{}/api/v1/status/runtimeinfo",
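// Editor's sketch (not part of this diff): the overall shape of the now
// conditional health check, assuming the reqwest 0.11 API pinned in
// Cargo.toml above; the hidden remainder of the hunk may differ.
async fn prometheus_reachable(base_url: &str) -> bool {
    let url = format!("{}/api/v1/status/runtimeinfo", base_url);
    match reqwest::Client::new().get(url).send().await {
        Ok(resp) => resp.status().is_success(),
        Err(_) => false,
    }
}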