diff --git a/asap-query-engine/src/bin/precompute_engine.rs b/asap-query-engine/src/bin/precompute_engine.rs index d5944a0..3cf2459 100644 --- a/asap-query-engine/src/bin/precompute_engine.rs +++ b/asap-query-engine/src/bin/precompute_engine.rs @@ -116,7 +116,7 @@ async fn main() -> Result<(), Box> { fallback: None, }, }; - let http_server = HttpServer::new(http_config, query_engine, store.clone()); + let http_server = HttpServer::new(http_config, query_engine, store.clone(), None); tokio::spawn(async move { if let Err(e) = http_server.run().await { tracing::error!("Query server error: {}", e); diff --git a/asap-query-engine/src/bin/test_e2e_precompute.rs b/asap-query-engine/src/bin/test_e2e_precompute.rs index faf1fe1..1bdc2f5 100644 --- a/asap-query-engine/src/bin/test_e2e_precompute.rs +++ b/asap-query-engine/src/bin/test_e2e_precompute.rs @@ -131,7 +131,7 @@ async fn main() -> Result<(), Box> { fallback: None, }, }; - let http_server = HttpServer::new(http_config, query_engine, store.clone()); + let http_server = HttpServer::new(http_config, query_engine, store.clone(), None); tokio::spawn(async move { if let Err(e) = http_server.run().await { eprintln!("Query server error: {e}"); diff --git a/asap-query-engine/src/drivers/query/servers/http.rs b/asap-query-engine/src/drivers/query/servers/http.rs index 322f560..3866bbd 100644 --- a/asap-query-engine/src/drivers/query/servers/http.rs +++ b/asap-query-engine/src/drivers/query/servers/http.rs @@ -16,6 +16,7 @@ use tracing::{debug, info}; use crate::drivers::query::adapters::{create_http_adapter, AdapterConfig, HttpProtocolAdapter}; use crate::engines::SimpleEngine; +use crate::query_tracker::QueryTracker; use crate::stores::Store; #[derive(Debug, Clone)] @@ -30,6 +31,7 @@ pub struct HttpServer { config: HttpServerConfig, query_engine: Arc, store: Arc, + query_tracker: Option>, } #[derive(Clone)] @@ -37,6 +39,7 @@ struct AppState { config: HttpServerConfig, query_engine: Arc, store: Arc, + query_tracker: Option>, adapter: Arc, fallback: Option>, } @@ -46,11 +49,13 @@ impl HttpServer { config: HttpServerConfig, query_engine: Arc, store: Arc, + query_tracker: Option>, ) -> Self { Self { config, query_engine, store, + query_tracker, } } @@ -71,6 +76,7 @@ impl HttpServer { config: self.config.clone(), query_engine: self.query_engine, store: self.store, + query_tracker: self.query_tracker, adapter: adapter.clone(), fallback: self.config.adapter_config.fallback.clone(), }; @@ -108,6 +114,7 @@ impl HttpServer { config: self.config.clone(), query_engine: self.query_engine.clone(), store: self.store.clone(), + query_tracker: self.query_tracker.clone(), adapter: adapter.clone(), fallback: self.config.adapter_config.fallback.clone(), }; @@ -171,6 +178,11 @@ async fn process_query_request( } } + // Record query for passive auto-discovery (if tracker is enabled) + if let Some(tracker) = &state.query_tracker { + tracker.record_instant(&parsed_request.query, parsed_request.time); + } + // Step 2: Execute query with engine (using parsed request) let query_start_time = Instant::now(); debug!( @@ -448,6 +460,16 @@ async fn process_range_query_request( }; } + // Record query for passive auto-discovery (if tracker is enabled) + if let Some(tracker) = &state.query_tracker { + tracker.record_range( + &parsed_request.query, + parsed_request.start, + parsed_request.end, + parsed_request.step, + ); + } + // Execute range query with engine let query_start_time = Instant::now(); debug!( @@ -611,7 +633,7 @@ mod tests { crate::data_model::QueryLanguage::promql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); server .start_test_server() .await diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index 72d7841..37996c5 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -17,6 +17,7 @@ pub mod engines; pub mod planner_client; pub mod precompute_engine; pub mod precompute_operators; +pub mod query_tracker; pub mod stores; #[cfg(test)] @@ -48,6 +49,8 @@ pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig}; pub use precompute_engine::output_sink::StoreOutputSink; pub use precompute_engine::PrecomputeEngine; +pub use query_tracker::{QueryTracker, QueryTrackerConfig}; + pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config}; pub type Result = std::result::Result>; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 0e5cb8f..0be1d13 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -151,6 +151,14 @@ struct Args { /// Capacity of the channel between router and each worker #[arg(long, default_value = "10000")] precompute_channel_buffer_size: usize, + + /// Enable automatic query tracking and planning + #[arg(long)] + enable_query_tracker: bool, + + /// Query tracker: observation window in seconds before triggering planning + #[arg(long, default_value = "100")] + tracker_observation_window_secs: u64, } #[tokio::main] @@ -342,7 +350,38 @@ async fn main() -> Result<()> { adapter_config, }; - let server = HttpServer::new(http_config, engine, store); + let query_tracker = if args.enable_query_tracker { + use query_engine_rust::planner_client::LocalPlannerClient; + use query_engine_rust::QueryTrackerConfig; + + let tracker_config = QueryTrackerConfig { + observation_window_secs: args.tracker_observation_window_secs, + prometheus_scrape_interval: args.prometheus_scrape_interval, + }; + let runtime_options = asap_planner::RuntimeOptions { + prometheus_scrape_interval: args.prometheus_scrape_interval, + streaming_engine: asap_planner::StreamingEngine::Precompute, + enable_punting: false, + range_duration: 300, + step: args.prometheus_scrape_interval, + }; + let planner_client = Arc::new(LocalPlannerClient::new( + runtime_options, + args.query_language, + args.prometheus_server.clone(), + )); + let tracker = Arc::new(query_engine_rust::QueryTracker::new(tracker_config)); + let _tracker_handle = tracker.start_background_loop(planner_client); + info!( + "Query tracker enabled (observation window: {}s)", + args.tracker_observation_window_secs + ); + Some(tracker) + } else { + None + }; + + let server = HttpServer::new(http_config, engine, store, query_tracker); info!("Starting HTTP server on port {}", args.http_port); // Wait for shutdown signal diff --git a/asap-query-engine/src/planner_client.rs b/asap-query-engine/src/planner_client.rs index 48c56b5..982812f 100644 --- a/asap-query-engine/src/planner_client.rs +++ b/asap-query-engine/src/planner_client.rs @@ -1,8 +1,11 @@ use anyhow::Result; -use asap_planner::{Controller, ControllerConfig, PlannerOutput, RuntimeOptions}; +use asap_planner::{ + build_schema_from_prometheus, Controller, ControllerConfig, PlannerOutput, RuntimeOptions, +}; use sketch_db_common::enums::QueryLanguage; use sketch_db_common::inference_config::InferenceConfig; use sketch_db_common::streaming_config::StreamingConfig; +use tracing::warn; pub struct PlannerResult { pub streaming_config: StreamingConfig, @@ -18,13 +21,19 @@ pub trait PlannerClient: Send + Sync { pub struct LocalPlannerClient { runtime_options: RuntimeOptions, query_language: QueryLanguage, + prometheus_url: String, } impl LocalPlannerClient { - pub fn new(runtime_options: RuntimeOptions, query_language: QueryLanguage) -> Self { + pub fn new( + runtime_options: RuntimeOptions, + query_language: QueryLanguage, + prometheus_url: String, + ) -> Self { Self { runtime_options, query_language, + prometheus_url, } } } @@ -34,9 +43,35 @@ impl PlannerClient for LocalPlannerClient { async fn plan(&self, config: ControllerConfig) -> Result { let opts = self.runtime_options.clone(); let query_language = self.query_language; + let prometheus_url = self.prometheus_url.clone(); let output: PlannerOutput = tokio::task::spawn_blocking(move || { - let schema = config.schema_from_hints(); + let all_queries: Vec = config + .query_groups + .iter() + .flat_map(|qg| qg.queries.clone()) + .collect(); + let mut schema = match build_schema_from_prometheus(&prometheus_url, &all_queries) { + Ok(s) => s, + Err(e) => { + warn!( + "Prometheus metric discovery failed, falling back to config hints: {}", + e + ); + config.schema_from_hints() + } + }; + // Fall back to config-file hints for metrics not found in Prometheus. + if let Some(metric_hints) = &config.metrics { + for hint in metric_hints { + if !schema.config.contains_key(&hint.metric) { + schema = schema.add_metric( + hint.metric.clone(), + promql_utilities::data_model::KeyByLabelNames::new(hint.labels.clone()), + ); + } + } + } let controller = Controller::new(config, schema, opts); controller.generate() }) @@ -127,7 +162,13 @@ mod tests { #[tokio::test] async fn test_local_planner_client() { - let client = LocalPlannerClient::new(sample_runtime_options(), QueryLanguage::promql); + // Use a dummy URL; the test config has metric hints so Prometheus discovery + // failures are tolerated via the fallback path. + let client = LocalPlannerClient::new( + sample_runtime_options(), + QueryLanguage::promql, + "http://localhost:9090".to_string(), + ); let config = sample_controller_config(); let result = client.plan(config).await.expect("plan should succeed"); diff --git a/asap-query-engine/src/query_tracker/mod.rs b/asap-query-engine/src/query_tracker/mod.rs new file mode 100644 index 0000000..f0043cd --- /dev/null +++ b/asap-query-engine/src/query_tracker/mod.rs @@ -0,0 +1,3 @@ +mod tracker; + +pub use tracker::{QueryTracker, QueryTrackerConfig}; diff --git a/asap-query-engine/src/query_tracker/tracker.rs b/asap-query-engine/src/query_tracker/tracker.rs new file mode 100644 index 0000000..fc03b7a --- /dev/null +++ b/asap-query-engine/src/query_tracker/tracker.rs @@ -0,0 +1,241 @@ +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use asap_planner::query_log::{infer_queries, to_controller_config, LogEntry}; +use chrono::{DateTime, Utc}; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +use crate::planner_client::PlannerClient; + +#[derive(Debug, Clone)] +pub struct QueryTrackerConfig { + /// How often to evaluate and trigger planning (default: 600s = 10 min). + pub observation_window_secs: u64, + /// Prometheus scrape interval, passed through to `infer_queries`. + pub prometheus_scrape_interval: u64, +} + +pub struct QueryTracker { + entries: Mutex>, + config: QueryTrackerConfig, +} + +impl QueryTracker { + pub fn new(config: QueryTrackerConfig) -> Self { + Self { + entries: Mutex::new(Vec::new()), + config, + } + } + + /// Record an instant query (step=0, start==end==time). + pub fn record_instant(&self, query: &str, time: f64) { + let ts = Utc::now(); + let query_time = epoch_secs_to_datetime(time); + let entry = LogEntry { + query: query.to_string(), + start: query_time, + end: query_time, + step: 0, + ts, + }; + self.entries.lock().unwrap().push(entry); + } + + /// Record a range query with start/end/step from the request. + pub fn record_range(&self, query: &str, start: f64, end: f64, step: f64) { + let ts = Utc::now(); + let entry = LogEntry { + query: query.to_string(), + start: epoch_secs_to_datetime(start), + end: epoch_secs_to_datetime(end), + step: step as u64, + ts, + }; + self.entries.lock().unwrap().push(entry); + } + + /// Spawn a background task that periodically evaluates collected queries and calls the planner. + pub fn start_background_loop( + self: &Arc, + planner_client: Arc, + ) -> JoinHandle<()> { + let tracker = Arc::clone(self); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(Duration::from_secs(tracker.config.observation_window_secs)); + // The first tick completes immediately; skip it so we wait a full window first. + interval.tick().await; + + loop { + interval.tick().await; + Self::evaluate(&tracker, &planner_client).await; + } + }) + } + + async fn evaluate(tracker: &Arc, planner_client: &Arc) { + // Snapshot and drain collected entries. + let entries = { + let mut guard = tracker.entries.lock().unwrap(); + std::mem::take(&mut *guard) + }; + + if entries.is_empty() { + info!("query_tracker: no queries observed in this window, skipping"); + return; + } + + info!( + "query_tracker: evaluating {} observed query entries", + entries.len() + ); + + let scrape_interval = tracker.config.prometheus_scrape_interval; + let (instants, ranges) = infer_queries(&entries, scrape_interval); + + info!( + "query_tracker: inferred {} instant queries, {} range queries", + instants.len(), + ranges.len() + ); + + if instants.is_empty() && ranges.is_empty() { + info!("query_tracker: no queries met frequency threshold, skipping planner call"); + return; + } + + let controller_config = to_controller_config(instants, ranges); + + info!( + "query_tracker: calling planner with {} query groups", + controller_config.query_groups.len() + ); + + match planner_client.plan(controller_config).await { + Ok(result) => { + info!( + "query_tracker: planner succeeded — streaming aggregations: {}, inference queries: {}, punted: {}", + result.streaming_config.aggregation_configs.len(), + result.inference_config.query_configs.len(), + result.punted_queries.len(), + ); + } + Err(e) => { + warn!("query_tracker: planner failed: {}", e); + } + } + } +} + +fn epoch_secs_to_datetime(secs: f64) -> DateTime { + DateTime::from_timestamp(secs as i64, ((secs.fract()) * 1_000_000_000.0) as u32) + .unwrap_or_else(Utc::now) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::planner_client::PlannerResult; + use anyhow::Result; + use sketch_db_common::inference_config::InferenceConfig; + use sketch_db_common::streaming_config::StreamingConfig; + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct MockPlannerClient { + call_count: AtomicUsize, + } + + impl MockPlannerClient { + fn new() -> Self { + Self { + call_count: AtomicUsize::new(0), + } + } + } + + #[async_trait::async_trait] + impl PlannerClient for MockPlannerClient { + async fn plan(&self, _config: asap_planner::ControllerConfig) -> Result { + self.call_count.fetch_add(1, Ordering::SeqCst); + Ok(PlannerResult { + streaming_config: StreamingConfig::new(HashMap::new()), + inference_config: InferenceConfig::new( + sketch_db_common::enums::QueryLanguage::promql, + sketch_db_common::enums::CleanupPolicy::NoCleanup, + ), + punted_queries: vec![], + }) + } + } + + #[test] + fn record_instant_appends_entry() { + let tracker = QueryTracker::new(QueryTrackerConfig { + observation_window_secs: 600, + prometheus_scrape_interval: 15, + }); + tracker.record_instant("rate(http_requests_total[5m])", 1700000000.0); + tracker.record_instant("rate(http_requests_total[5m])", 1700000060.0); + let entries = tracker.entries.lock().unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].step, 0); + assert_eq!(entries[0].start, entries[0].end); + } + + #[test] + fn record_range_appends_entry() { + let tracker = QueryTracker::new(QueryTrackerConfig { + observation_window_secs: 600, + prometheus_scrape_interval: 15, + }); + tracker.record_range( + "rate(http_requests_total[5m])", + 1700000000.0, + 1700003600.0, + 30.0, + ); + let entries = tracker.entries.lock().unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].step, 30); + assert_ne!(entries[0].start, entries[0].end); + } + + #[tokio::test] + async fn evaluate_calls_planner_with_entries() { + let tracker = Arc::new(QueryTracker::new(QueryTrackerConfig { + observation_window_secs: 600, + prometheus_scrape_interval: 15, + })); + + // Record enough entries for infer_queries to produce results (need >=2 per query). + for i in 0..5 { + tracker.record_instant( + "rate(http_requests_total[5m])", + 1700000000.0 + (i as f64 * 60.0), + ); + } + + let mock_client = Arc::new(MockPlannerClient::new()); + QueryTracker::evaluate(&tracker, &(mock_client.clone() as Arc)).await; + + assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 1); + // Entries should be drained after evaluate. + assert!(tracker.entries.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn evaluate_skips_when_no_entries() { + let tracker = Arc::new(QueryTracker::new(QueryTrackerConfig { + observation_window_secs: 600, + prometheus_scrape_interval: 15, + })); + + let mock_client = Arc::new(MockPlannerClient::new()); + QueryTracker::evaluate(&tracker, &(mock_client.clone() as Arc)).await; + + assert_eq!(mock_client.call_count.load(Ordering::SeqCst), 0); + } +} diff --git a/asap-query-engine/src/tests/clickhouse_forwarding_tests.rs b/asap-query-engine/src/tests/clickhouse_forwarding_tests.rs index e05e93a..899e987 100644 --- a/asap-query-engine/src/tests/clickhouse_forwarding_tests.rs +++ b/asap-query-engine/src/tests/clickhouse_forwarding_tests.rs @@ -69,7 +69,7 @@ async fn setup_test_server(clickhouse_port: u16, database: &str) -> (HttpServer, QueryLanguage::sql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let actual_port = server .start_test_server() .await diff --git a/asap-query-engine/src/tests/elastic_forwarding_tests.rs b/asap-query-engine/src/tests/elastic_forwarding_tests.rs index c06a9d3..b1c7b2f 100644 --- a/asap-query-engine/src/tests/elastic_forwarding_tests.rs +++ b/asap-query-engine/src/tests/elastic_forwarding_tests.rs @@ -137,7 +137,7 @@ async fn setup_test_server(elasticsearch_port: u16, index: &str) -> (HttpServer, QueryLanguage::elastic_querydsl, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let actual_port = server .start_test_server() .await @@ -173,7 +173,7 @@ async fn setup_test_server_sql(elasticsearch_port: u16, index: &str) -> (HttpSer QueryLanguage::elastic_sql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let actual_port = server .start_test_server() .await @@ -408,7 +408,7 @@ async fn test_forwarding_disabled() { QueryLanguage::elastic_querydsl, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let server_port = server .start_test_server() .await @@ -576,7 +576,7 @@ async fn test_sql_forwarding_disabled() { QueryLanguage::elastic_sql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let server_port = server .start_test_server() .await diff --git a/asap-query-engine/src/tests/prometheus_forwarding_tests.rs b/asap-query-engine/src/tests/prometheus_forwarding_tests.rs index ab01b05..c65f5a6 100644 --- a/asap-query-engine/src/tests/prometheus_forwarding_tests.rs +++ b/asap-query-engine/src/tests/prometheus_forwarding_tests.rs @@ -88,7 +88,7 @@ async fn setup_test_server(prometheus_port: u16) -> (HttpServer, u16) { crate::data_model::QueryLanguage::promql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let actual_port = server .start_test_server() .await @@ -182,7 +182,7 @@ async fn test_forwarding_disabled() { crate::data_model::QueryLanguage::promql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let server_port = server .start_test_server() .await @@ -238,7 +238,7 @@ async fn test_prometheus_server_unreachable() { crate::data_model::QueryLanguage::promql, )); - let server = HttpServer::new(config, query_engine, store); + let server = HttpServer::new(config, query_engine, store, None); let server_port = server .start_test_server() .await