From cadf2a7a048dc1077ac59aa1fd6ace0eb991e55b Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 10 Apr 2026 10:21:01 -0400 Subject: [PATCH 1/4] ref 1 --- asap-query-engine/src/data_model/traits.rs | 14 + .../src/engines/simple_engine.rs | 338 +++++++----------- .../src/engines/window_merger.rs | 9 + .../count_min_sketch_accumulator.rs | 13 + .../count_min_sketch_with_heap_accumulator.rs | 13 + .../datasketches_kll_accumulator.rs | 10 + .../delta_set_aggregator_accumulator.rs | 13 + .../hydra_kll_accumulator.rs | 13 + .../increase_accumulator.rs | 10 + .../min_max_accumulator.rs | 10 + .../multiple_increase_accumulator.rs | 13 + .../multiple_min_max_accumulator.rs | 13 + .../multiple_sum_accumulator.rs | 13 + .../set_aggregator_accumulator.rs | 13 + .../precompute_operators/sum_accumulator.rs | 10 + 15 files changed, 292 insertions(+), 213 deletions(-) diff --git a/asap-query-engine/src/data_model/traits.rs b/asap-query-engine/src/data_model/traits.rs index c47fcef..064619f 100644 --- a/asap-query-engine/src/data_model/traits.rs +++ b/asap-query-engine/src/data_model/traits.rs @@ -30,6 +30,20 @@ pub trait AggregateCore: SerializableToSink + Send + Sync { /// Get all keys stored in this accumulator fn get_keys(&self) -> Option>; + + /// Dispatch a statistic query without downcasting. + /// + /// Replaces the 12-arm `match get_accumulator_type()` in the engine. + /// Single-subpopulation types ignore `key`; multiple-subpopulation types + /// require it and return `Err` when it is `None`. + /// Special cases (DeltaSetAggregator, SetAggregator) fall back to a + /// cardinality value when `key` is `None`. + fn query_statistic( + &self, + statistic: Statistic, + key: &Option, + query_kwargs: &HashMap, + ) -> Result>; } /// Trait for accumulators that support a single subpopulation diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 64da6eb..eb4650b 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -1155,7 +1155,13 @@ impl SimpleEngine { query_kwargs, }; - let agg_info = self.get_aggregation_id_info(query_config); + let agg_info = self + .get_aggregation_id_info(query_config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; let query_plan = self .create_store_query_plan(&metric, ×tamps, &agg_info) @@ -1594,6 +1600,32 @@ impl SimpleEngine { } } + /// Parse a lowercase aggregation name into exactly one `Statistic`. + /// + /// Returns `None` (with a warning) if the name is not a recognised + /// `AggregationOperator` or if it maps to a number of statistics other + /// than one. Centralises the three previously-scattered copies of this + /// logic, which had inconsistent error handling (silent empty vec, panic, + /// and warn+return-None). + fn parse_single_statistic(statistic_name: &str) -> Option { + let stats = statistic_name + .parse::() + .map(|o| o.to_statistics()) + .unwrap_or_else(|_| { + warn!("Unsupported statistic name: '{}'", statistic_name); + vec![] + }); + if stats.len() != 1 { + warn!( + "Expected exactly one statistic for '{}', found {}", + statistic_name, + stats.len() + ); + return None; + } + stats.into_iter().next() + } + /// Extract QueryRequirements from a parsed SQL match result. /// Used as the fallback path when no query_configs entry is found. fn build_query_requirements_sql( @@ -1612,10 +1644,9 @@ impl SimpleEngine { _ => query_data.aggregation_info.get_name().to_lowercase(), }; - let statistics: Vec = statistic_name - .parse::() - .map(|o| o.to_statistics()) - .unwrap_or_default(); + let statistics: Vec = Self::parse_single_statistic(&statistic_name) + .into_iter() + .collect(); let data_range_ms = match query_pattern_type { QueryPatternType::OnlySpatial => None, @@ -1641,69 +1672,84 @@ impl SimpleEngine { } } - fn get_aggregation_id_info(&self, query_config: &QueryConfig) -> AggregationIdInfo { + fn get_aggregation_id_info( + &self, + query_config: &QueryConfig, + ) -> Result { let query_config_aggregations = &query_config.aggregations; + + if query_config_aggregations.is_empty() { + return Err("Query config has no aggregations defined".to_string()); + } + if query_config_aggregations.len() > 2 { + return Err("Query config with > 2 aggregations is not supported".to_string()); + } + let mut aggregation_id_for_key: Option = None; let mut aggregation_id_for_value: Option = None; let mut aggregation_type_for_key: Option = None; let mut aggregation_type_for_value: Option = None; - if query_config_aggregations.is_empty() { - panic!("Query config for query has no aggregations defined",); - } else if query_config_aggregations.len() > 2 { - panic!("Query config with > 2 aggregations is not supported"); - } else if query_config_aggregations.len() == 2 { + if query_config_aggregations.len() == 2 { for aggregation in query_config_aggregations { let aggregation_type = self .streaming_config .get_aggregation_config(aggregation.aggregation_id) - .map(|config| config.aggregation_type); + .map(|config| config.aggregation_type) + .ok_or_else(|| { + format!( + "No streaming config for aggregation_id {}", + aggregation.aggregation_id + ) + })?; if matches!( - aggregation_type.as_ref().unwrap(), + aggregation_type, AggregationType::DeltaSetAggregator | AggregationType::SetAggregator ) { if aggregation_id_for_key.is_some() { - panic!("Aggregation ID for key must be None"); - } - if aggregation_type_for_key.is_some() { - panic!("Aggregation type for key must be None"); + return Err( + "Query config has two key-type aggregations (expected at most one)" + .to_string(), + ); } aggregation_id_for_key = Some(aggregation.aggregation_id); - aggregation_type_for_key = aggregation_type; + aggregation_type_for_key = Some(aggregation_type); } else { if aggregation_id_for_value.is_some() { - panic!("Aggregation ID for value must be None"); + return Err( + "Query config has two value-type aggregations (expected at most one)" + .to_string(), + ); } aggregation_id_for_value = Some(aggregation.aggregation_id); - aggregation_type_for_value = aggregation_type; + aggregation_type_for_value = Some(aggregation_type); } } } else { - aggregation_id_for_key = Some(query_config_aggregations[0].aggregation_id); - aggregation_id_for_value = aggregation_id_for_key; - // aggregation_type_for_key = Some(query_config_aggregations[0].aggregation_type.clone()); - aggregation_type_for_key = self + // Single aggregation: key and value share the same aggregation + let id = query_config_aggregations[0].aggregation_id; + let agg_type = self .streaming_config - .get_aggregation_config(aggregation_id_for_key.unwrap()) - .map(|config| config.aggregation_type); - aggregation_type_for_value = self - .streaming_config - .get_aggregation_config(aggregation_id_for_value.unwrap()) - .map(|config| config.aggregation_type); - } - - // check for None - if aggregation_id_for_key.is_none() || aggregation_id_for_value.is_none() { - panic!("Aggregation IDs must not be None"); + .get_aggregation_config(id) + .map(|config| config.aggregation_type) + .ok_or_else(|| format!("No streaming config for aggregation_id {id}"))?; + aggregation_id_for_key = Some(id); + aggregation_id_for_value = Some(id); + aggregation_type_for_key = Some(agg_type); + aggregation_type_for_value = Some(agg_type); } - AggregationIdInfo { - aggregation_id_for_key: aggregation_id_for_key.unwrap(), - aggregation_id_for_value: aggregation_id_for_value.unwrap(), - aggregation_type_for_key: aggregation_type_for_key.unwrap(), - aggregation_type_for_value: aggregation_type_for_value.unwrap(), - } + Ok(AggregationIdInfo { + aggregation_id_for_key: aggregation_id_for_key + .ok_or("aggregation_id_for_key was not set")?, + aggregation_id_for_value: aggregation_id_for_value + .ok_or("aggregation_id_for_value was not set")?, + aggregation_type_for_key: aggregation_type_for_key + .ok_or("aggregation_type_for_key was not set")?, + aggregation_type_for_value: aggregation_type_for_value + .ok_or("aggregation_type_for_value was not set")?, + }) } pub fn handle_query_sql( @@ -1875,22 +1921,10 @@ impl SimpleEngine { } }; - let statistics_to_compute: Vec = statistic_name - .parse::() - .map(|o| o.to_statistics()) - .unwrap_or_else(|_| panic!("Unsupported statistic: {}", statistic_name)); - - if statistics_to_compute.len() != 1 { - warn!( - "Expected exactly one statistic to compute, found {}", - statistics_to_compute.len() - ); - return None; - } - let statistic_to_compute = statistics_to_compute.first().unwrap(); + let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; let query_kwargs = self - .build_query_kwargs_sql(statistic_to_compute, &match_result) + .build_query_kwargs_sql(&statistic_to_compute, &match_result) .map_err(|e| { warn!("{}", e); e @@ -1900,7 +1934,7 @@ impl SimpleEngine { // Create query metadata let metadata = QueryMetadata { query_output_labels: query_output_labels.clone(), - statistic_to_compute: *statistic_to_compute, + statistic_to_compute, query_kwargs: query_kwargs.clone(), }; @@ -1913,6 +1947,11 @@ impl SimpleEngine { self.find_query_config_sql(&query_data) { self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? } else { warn!("No query_config entry for SQL query. Attempting capability-based matching."); let requirements = self.build_query_requirements_sql(&match_result, query_pattern_type); @@ -1997,22 +2036,10 @@ impl SimpleEngine { .get_name() .to_lowercase(); - let statistics_to_compute: Vec = statistic_name - .parse::() - .map(|o| o.to_statistics()) - .unwrap_or_else(|_| panic!("Unsupported statistic: {}", statistic_name)); - - if statistics_to_compute.len() != 1 { - warn!( - "Expected exactly one statistic to compute, found {}", - statistics_to_compute.len() - ); - return None; - } - let statistic_to_compute = statistics_to_compute.first().unwrap(); + let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; let query_kwargs = self - .build_query_kwargs_sql(statistic_to_compute, match_result) + .build_query_kwargs_sql(&statistic_to_compute, match_result) .map_err(|e| { warn!("{}", e); e @@ -2021,7 +2048,7 @@ impl SimpleEngine { let metadata = QueryMetadata { query_output_labels: query_output_labels.clone(), - statistic_to_compute: *statistic_to_compute, + statistic_to_compute, query_kwargs: query_kwargs.clone(), }; @@ -2042,6 +2069,11 @@ impl SimpleEngine { self.find_query_config_sql(query_data) { self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? } else { warn!( "No query_config entry for SQL spatio-temporal query. Attempting capability-based matching." @@ -2148,7 +2180,13 @@ impl SimpleEngine { // TODO: Figure out how to handle query configuration for ElasticSearch queries. let query_config = self.find_query_config(&query)?; - let agg_info = self.get_aggregation_id_info(query_config); + let agg_info = self + .get_aggregation_id_info(query_config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. @@ -2809,6 +2847,11 @@ impl SimpleEngine { // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? } else { warn!( "No query_config entry for PromQL query '{}'. Attempting capability-based matching.", @@ -3109,8 +3152,6 @@ impl SimpleEngine { Ok(keys.into_iter().take(k).collect()) } - /// Query a precompute for a specific statistic - /// This follows the Python approach where precompute.query(statistic, key) is called fn query_precompute_for_statistic( &self, precompute: &dyn AggregateCore, @@ -3118,145 +3159,7 @@ impl SimpleEngine { key: &Option, query_kwargs: &HashMap, ) -> Result> { - // Handle different accumulator types and statistics using the trait methods - // TODO: change this logic to just check Single vs MultipleSubpopulationAggregate - match precompute.get_accumulator_type() { - AggregationType::Sum => { - if let Some(sum_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::SingleSubpopulationAggregate; - sum_acc.query(*statistic, None) - } else { - Err("Failed to downcast to SumAccumulator".into()) - } - } - AggregationType::MinMax => { - if let Some(minmax_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::SingleSubpopulationAggregate; - minmax_acc.query(*statistic, None) - } else { - Err("Failed to downcast to MinMaxAccumulator".into()) - } - } - AggregationType::Increase => { - if let Some(inc_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::SingleSubpopulationAggregate; - inc_acc.query(*statistic, None) - } else { - Err("Failed to downcast to IncreaseAccumulator".into()) - } - } - AggregationType::MultipleSum => { - if let Some(multi_sum_acc) = precompute.as_any().downcast_ref::() { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - multi_sum_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for MultipleSumAccumulator".into()) - } - } else { - Err("Failed to downcast to MultipleSumAccumulator".into()) - } - } - AggregationType::MultipleMinMax => { - if let Some(multi_minmax_acc) = precompute.as_any().downcast_ref::() { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - multi_minmax_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for MultipleMinMaxAccumulator".into()) - } - } else { - Err("Failed to downcast to MultipleMinMaxAccumulator".into()) - } - } - AggregationType::MultipleIncrease => { - if let Some(multi_inc_acc) = precompute.as_any().downcast_ref::() { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - multi_inc_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for MultipleIncreaseAccumulator".into()) - } - } else { - Err("Failed to downcast to MultipleIncreaseAccumulator".into()) - } - } - AggregationType::CountMinSketch => { - if let Some(cms_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::MultipleSubpopulationAggregate; - if let Some(key_val) = key { - cms_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for CountMinSketchAccumulator".into()) - } - } else { - Err("Failed to downcast to CountMinSketchAccumulator".into()) - } - } - AggregationType::CountMinSketchWithHeap => { - if let Some(cms_heap_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::MultipleSubpopulationAggregate; - if let Some(key_val) = key { - cms_heap_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for CountMinSketchWithHeapAccumulator".into()) - } - } else { - Err("Failed to downcast to CountMinSketchWithHeapAccumulator".into()) - } - } - AggregationType::DatasketchesKLL => { - if let Some(kll_acc) = precompute.as_any().downcast_ref::() { - use crate::data_model::SingleSubpopulationAggregate; - kll_acc.query(*statistic, Some(query_kwargs)) - } else { - Err("Failed to downcast to DatasketchesKLLAccumulator".into()) - } - } - AggregationType::HydraKLL => { - if let Some(hydra_kll_acc) = precompute.as_any() - .downcast_ref::() - { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - hydra_kll_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - Err("Key required for HydraKllSketchAccumulator".into()) - } - } else { - Err("Failed to downcast to HydraKllSketchAccumulator".into()) - } - } - AggregationType::DeltaSetAggregator => { - if let Some(delta_acc) = precompute.as_any().downcast_ref::() { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - delta_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - // For DeltaSetAggregatorAccumulator without a key, return the union size - Ok((delta_acc.added.union(&delta_acc.removed).count()) as f64) - } - } else { - Err("Failed to downcast to DeltaSetAggregatorAccumulator".into()) - } - } - AggregationType::SetAggregator => { - if let Some(set_acc) = precompute.as_any().downcast_ref::() { - if let Some(key_val) = key { - use crate::data_model::MultipleSubpopulationAggregate; - set_acc.query(*statistic, key_val, Some(query_kwargs)) - } else { - // For SetAggregatorAccumulator without a key, return the set size - Ok(set_acc.added.len() as f64) - } - } else { - Err("Failed to downcast to SetAggregatorAccumulator".into()) - } - } - other => { - Err(format!("Unknown accumulator type: {other:?}").into()) - } - } + precompute.query_statistic(*statistic, key, query_kwargs) } // ============================================================ @@ -3792,6 +3695,15 @@ mod range_query_tests { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + _statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + _query_kwargs: &std::collections::HashMap, + ) -> Result> { + Err("MockBucketAccumulator does not support query_statistic".into()) + } } /// Simulates the sliding window loop from execute_range_query_pipeline diff --git a/asap-query-engine/src/engines/window_merger.rs b/asap-query-engine/src/engines/window_merger.rs index 56e75b2..552249e 100644 --- a/asap-query-engine/src/engines/window_merger.rs +++ b/asap-query-engine/src/engines/window_merger.rs @@ -166,6 +166,15 @@ mod tests { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + _statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + _query_kwargs: &std::collections::HashMap, + ) -> Result> { + Err("MockSumAccumulator does not support query_statistic".into()) + } } // Basic structure tests diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index bb44540..fe3ec33 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -212,6 +212,19 @@ impl AggregateCore for CountMinSketchAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for CountMinSketchAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for CountMinSketchAccumulator { diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index f2751c3..76ece28 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -178,6 +178,19 @@ impl AggregateCore for CountMinSketchWithHeapAccumulator { fn get_keys(&self) -> Option> { Some(self.get_topk_keys()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for CountMinSketchWithHeapAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for CountMinSketchWithHeapAccumulator { diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 1781de0..52d9d8b 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -200,6 +200,16 @@ impl AggregateCore for DatasketchesKLLAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::SingleSubpopulationAggregate; + self.query(statistic, Some(query_kwargs)) + } } impl SingleSubpopulationAggregate for DatasketchesKLLAccumulator { diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index f0ae9f9..e8b1b1b 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -250,6 +250,19 @@ impl AggregateCore for DeltaSetAggregatorAccumulator { } Some(self.added.iter().cloned().collect()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for DeltaSetAggregatorAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for DeltaSetAggregatorAccumulator { diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index 96cf19d..0b2e924 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -116,6 +116,19 @@ impl AggregateCore for HydraKllSketchAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for HydraKllSketchAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for HydraKllSketchAccumulator { diff --git a/asap-query-engine/src/precompute_operators/increase_accumulator.rs b/asap-query-engine/src/precompute_operators/increase_accumulator.rs index 5fcbe01..6c8c029 100644 --- a/asap-query-engine/src/precompute_operators/increase_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/increase_accumulator.rs @@ -248,6 +248,16 @@ impl AggregateCore for IncreaseAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + _query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::SingleSubpopulationAggregate; + self.query(statistic, None) + } } impl SingleSubpopulationAggregate for IncreaseAccumulator { diff --git a/asap-query-engine/src/precompute_operators/min_max_accumulator.rs b/asap-query-engine/src/precompute_operators/min_max_accumulator.rs index 3b70087..b476362 100644 --- a/asap-query-engine/src/precompute_operators/min_max_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/min_max_accumulator.rs @@ -187,6 +187,16 @@ impl AggregateCore for MinMaxAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + _query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::SingleSubpopulationAggregate; + self.query(statistic, None) + } } impl SingleSubpopulationAggregate for MinMaxAccumulator { diff --git a/asap-query-engine/src/precompute_operators/multiple_increase_accumulator.rs b/asap-query-engine/src/precompute_operators/multiple_increase_accumulator.rs index 82b77bb..6ee7040 100644 --- a/asap-query-engine/src/precompute_operators/multiple_increase_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/multiple_increase_accumulator.rs @@ -285,6 +285,19 @@ impl AggregateCore for MultipleIncreaseAccumulator { fn get_keys(&self) -> Option> { Some(self.increases.keys().cloned().collect()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for MultipleIncreaseAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for MultipleIncreaseAccumulator { diff --git a/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs b/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs index a8f14a3..2984cf3 100644 --- a/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs @@ -243,6 +243,19 @@ impl AggregateCore for MultipleMinMaxAccumulator { fn get_keys(&self) -> Option> { Some(self.values.keys().cloned().collect()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for MultipleMinMaxAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for MultipleMinMaxAccumulator { diff --git a/asap-query-engine/src/precompute_operators/multiple_sum_accumulator.rs b/asap-query-engine/src/precompute_operators/multiple_sum_accumulator.rs index c2ce83a..1505b93 100644 --- a/asap-query-engine/src/precompute_operators/multiple_sum_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/multiple_sum_accumulator.rs @@ -233,6 +233,19 @@ impl AggregateCore for MultipleSumAccumulator { fn get_keys(&self) -> Option> { Some(self.sums.keys().cloned().collect()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for MultipleSumAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for MultipleSumAccumulator { diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index e93ae66..4ec46c5 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -182,6 +182,19 @@ impl AggregateCore for SetAggregatorAccumulator { fn get_keys(&self) -> Option> { Some(self.added.iter().cloned().collect()) } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + key: &Option, + query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::MultipleSubpopulationAggregate; + let key_val = key + .as_ref() + .ok_or("Key required for SetAggregatorAccumulator")?; + self.query(statistic, key_val, Some(query_kwargs)) + } } impl MultipleSubpopulationAggregate for SetAggregatorAccumulator { diff --git a/asap-query-engine/src/precompute_operators/sum_accumulator.rs b/asap-query-engine/src/precompute_operators/sum_accumulator.rs index 717bcd6..fa32ef2 100644 --- a/asap-query-engine/src/precompute_operators/sum_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/sum_accumulator.rs @@ -121,6 +121,16 @@ impl AggregateCore for SumAccumulator { fn get_keys(&self) -> Option> { None } + + fn query_statistic( + &self, + statistic: promql_utilities::query_logics::enums::Statistic, + _key: &Option, + _query_kwargs: &std::collections::HashMap, + ) -> Result> { + use crate::data_model::SingleSubpopulationAggregate; + self.query(statistic, None) + } } impl SingleSubpopulationAggregate for SumAccumulator { From 9295c8e0c5a9f6d23669cd57163f04fc38695b48 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 10 Apr 2026 10:40:27 -0400 Subject: [PATCH 2/4] refactor engine code --- .../src/ast_matching/sqlpattern_matcher.rs | 11 + .../src/engines/simple_engine.rs | 484 ++++++------------ 2 files changed, 181 insertions(+), 314 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs index 1aac0da..3e5f965 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs @@ -71,6 +71,17 @@ impl SQLQuery { pub fn is_valid(&self) -> bool { self.error.is_none() } + + /// The outer (spatial / single) query's data — always `query_data[0]`. + pub fn outer_data(&self) -> &SQLQueryData { + &self.query_data[0] + } + + /// The inner (temporal) query's data for nested queries — always `query_data[1]`. + /// Only valid for `OneTemporalOneSpatial` patterns. + pub fn inner_data(&self) -> &SQLQueryData { + &self.query_data[1] + } } pub struct SQLPatternMatcher { diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index eb4650b..7ce77df 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -396,12 +396,12 @@ impl SimpleEngine { match query_pattern_type { QueryPatternType::OnlyTemporal => { let scrape_intervals = - match_result.query_data[0].time_info.clone().get_duration() as u64; + match_result.outer_data().time_info.clone().get_duration() as u64; end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) } QueryPatternType::OneTemporalOneSpatial => { let scrape_intervals = - match_result.query_data[1].time_info.clone().get_duration() as u64; + match_result.inner_data().time_info.clone().get_duration() as u64; end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) } QueryPatternType::OnlySpatial => { @@ -1088,24 +1088,53 @@ impl SimpleEngine { let (query_pattern_type, match_result) = found_match?; - let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); + let agg_info = self + .get_aggregation_id_info(query_config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + self.build_promql_execution_context_tail( + &match_result, + query_pattern_type, + query_time, + agg_info, + ) + } + + /// Shared context-building tail for both PromQL context builders. + /// + /// Called by `build_query_execution_context_from_ast` and + /// `build_query_execution_context_promql` after pattern matching and + /// `agg_info` resolution are complete. Computes labels, statistics, + /// kwargs, metadata, query plan, and the final `QueryExecutionContext`. + fn build_promql_execution_context_tail( + &self, + match_result: &PromQLMatchResult, + query_pattern_type: QueryPatternType, + query_time: u64, + agg_info: AggregationIdInfo, + ) -> Option { + let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); let promql_schema = match &self.inference_config.schema { SchemaConfig::PromQL(schema) => schema, _ => return None, }; - let all_labels = promql_schema - .get_labels(&metric) - .cloned() - .unwrap_or_else(|| { + let all_labels = match promql_schema.get_labels(&metric).cloned() { + Some(labels) => labels, + None => { warn!("No metric configuration found for '{}'", metric); - panic!("No metric configuration found"); - }); + return None; + } + }; let mut query_output_labels = match query_pattern_type { QueryPatternType::OnlyTemporal => all_labels.clone(), QueryPatternType::OnlySpatial => { - get_spatial_aggregation_output_labels(&match_result, &all_labels) + get_spatial_aggregation_output_labels(match_result, &all_labels) } QueryPatternType::OneTemporalOneSpatial => { let temporal_aggregation = match_result.get_function_name().unwrap(); @@ -1116,7 +1145,7 @@ impl SimpleEngine { .zip(spatial_aggregation.parse::().ok()) .is_some_and(|(f, o)| get_is_collapsable(f, o)); if collapsable { - get_spatial_aggregation_output_labels(&match_result, &all_labels) + get_spatial_aggregation_output_labels(match_result, &all_labels) } else { all_labels.clone() } @@ -1124,14 +1153,15 @@ impl SimpleEngine { }; let timestamps = - self.calculate_query_timestamps_promql(query_time, query_pattern_type, &match_result); + self.calculate_query_timestamps_promql(query_time, query_pattern_type, match_result); - let statistics_to_compute = get_statistics_to_compute(query_pattern_type, &match_result); + let statistics_to_compute = get_statistics_to_compute(query_pattern_type, match_result); if statistics_to_compute.len() != 1 { - panic!( - "Expected exactly one statistic, found {}", + warn!( + "Expected exactly one statistic to compute, found {}", statistics_to_compute.len() ); + return None; } let statistic_to_compute = statistics_to_compute.first().unwrap(); @@ -1142,7 +1172,7 @@ impl SimpleEngine { } let query_kwargs = self - .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, &match_result) + .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, match_result) .map_err(|e| { warn!("{}", e); e @@ -1155,14 +1185,6 @@ impl SimpleEngine { query_kwargs, }; - let agg_info = self - .get_aggregation_id_info(query_config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - let query_plan = self .create_store_query_plan(&metric, ×tamps, &agg_info) .map_err(|e| { @@ -1187,7 +1209,7 @@ impl SimpleEngine { .unwrap_or_else(KeyByLabelNames::empty); Some(QueryExecutionContext { - metric: metric.clone(), + metric, metadata, store_plan: query_plan, agg_info, @@ -1261,29 +1283,17 @@ impl SimpleEngine { let rhs = binary.rhs.as_ref(); let op = &binary.op; - // Check for scalar on right - if let Expr::NumberLiteral(nl) = rhs { - let (vector_plan, label_names) = self.build_arm_logical_plan(lhs, time)?; - let combined = - build_scalar_plan(vector_plan, nl.val, op, false, label_names.clone()).ok()?; - let results = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(self.execute_logical_plan( - combined, - label_names.clone(), - "", - &Statistic::Sum, - )) - }) - .ok()?; - let output_labels = KeyByLabelNames::new(label_names); - return Some((output_labels, QueryResult::vector(results, query_time))); - } - - // Check for scalar on left - if let Expr::NumberLiteral(nl) = lhs { - let (vector_plan, label_names) = self.build_arm_logical_plan(rhs, time)?; + // Scalar case: either side may be a numeric literal + let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { + (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), + (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), + _ => None, + }; + if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { + let (vector_plan, label_names) = self.build_arm_logical_plan(vector_arm, time)?; let combined = - build_scalar_plan(vector_plan, nl.val, op, true, label_names.clone()).ok()?; + build_scalar_plan(vector_plan, scalar, op, scalar_on_left, label_names.clone()) + .ok()?; let results = tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(self.execute_logical_plan( combined, @@ -1293,8 +1303,10 @@ impl SimpleEngine { )) }) .ok()?; - let output_labels = KeyByLabelNames::new(label_names); - return Some((output_labels, QueryResult::vector(results, query_time))); + return Some(( + KeyByLabelNames::new(label_names), + QueryResult::vector(results, query_time), + )); } // Vector–vector @@ -1429,40 +1441,29 @@ impl SimpleEngine { let rhs = binary.rhs.as_ref(); let op = &binary.op; - // Scalar on right: evaluate vector arm, apply scalar per sample - if let Expr::NumberLiteral(nl) = rhs { - let (lhs_ctx, lhs_labels) = self.build_arm_range_context(lhs, start, end, step)?; - let lhs_results = self.execute_range_query_pipeline(&lhs_ctx).ok()?; - let scalar = nl.val; - let combined: Vec = lhs_results - .into_iter() - .map(|mut elem| { - for s in &mut elem.samples { - s.value = Self::apply_range_binary_op(op, s.value, scalar); - } - elem - }) - .collect(); - let output_labels = KeyByLabelNames::new(lhs_labels); - return Some((output_labels, QueryResult::matrix(combined))); - } - - // Scalar on left: evaluate vector arm, apply scalar per sample - if let Expr::NumberLiteral(nl) = lhs { - let (rhs_ctx, rhs_labels) = self.build_arm_range_context(rhs, start, end, step)?; - let rhs_results = self.execute_range_query_pipeline(&rhs_ctx).ok()?; - let scalar = nl.val; - let combined: Vec = rhs_results + // Scalar case: either side may be a numeric literal + let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { + (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), + (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), + _ => None, + }; + if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { + let (ctx, labels) = self.build_arm_range_context(vector_arm, start, end, step)?; + let results = self.execute_range_query_pipeline(&ctx).ok()?; + let combined: Vec = results .into_iter() .map(|mut elem| { for s in &mut elem.samples { - s.value = Self::apply_range_binary_op(op, scalar, s.value); + s.value = if scalar_on_left { + Self::apply_range_binary_op(op, scalar, s.value) + } else { + Self::apply_range_binary_op(op, s.value, scalar) + }; } elem }) .collect(); - let output_labels = KeyByLabelNames::new(rhs_labels); - return Some((output_labels, QueryResult::matrix(combined))); + return Some((KeyByLabelNames::new(labels), QueryResult::matrix(combined))); } // Vector-vector: evaluate both arms, join by label key, apply op per matching timestamp @@ -1633,11 +1634,12 @@ impl SimpleEngine { match_result: &SQLQuery, query_pattern_type: QueryPatternType, ) -> QueryRequirements { - let query_data = &match_result.query_data[0]; + let query_data = match_result.outer_data(); let metric = query_data.metric.clone(); let statistic_name = match query_pattern_type { - QueryPatternType::OneTemporalOneSpatial => match_result.query_data[1] + QueryPatternType::OneTemporalOneSpatial => match_result + .inner_data() .aggregation_info .get_name() .to_lowercase(), @@ -1656,7 +1658,7 @@ impl SimpleEngine { } QueryPatternType::OneTemporalOneSpatial => { let scrape_intervals = - match_result.query_data[1].time_info.clone().get_duration() as u64; + match_result.inner_data().time_info.clone().get_duration() as u64; Some(scrape_intervals * self.prometheus_scrape_interval * 1000) } }; @@ -1758,15 +1760,24 @@ impl SimpleEngine { time: f64, ) -> Option<(KeyByLabelNames, QueryResult)> { let context = self.build_query_execution_context_sql(query, time)?; - // Execute complete query pipeline + self.execute_context(context, false) + } + + /// Execute the query pipeline for an already-built context. + /// + /// Shared by `handle_query_sql`, `handle_query_elastic`, and `handle_query_promql`. + fn execute_context( + &self, + context: QueryExecutionContext, + enable_topk: bool, + ) -> Option<(KeyByLabelNames, QueryResult)> { let results = self - .execute_query_pipeline(&context, false) // SQL: topk disabled + .execute_query_pipeline(&context, enable_topk) .map_err(|e| { warn!("Query execution failed: {}", e); e }) .ok()?; - Some(( context.metadata.query_output_labels, QueryResult::vector(results, context.query_time), @@ -1841,7 +1852,7 @@ impl SimpleEngine { // so we need to use the inner (temporal) query's time_info to compute query_time let query_time = match query_pattern_type { QueryPatternType::OneTemporalOneSpatial => { - let inner_time_info = &match_result.query_data[1].time_info; + let inner_time_info = &match_result.inner_data().time_info; Self::convert_query_time_to_data_time( inner_time_info.get_start() + inner_time_info.get_duration(), ) @@ -1872,17 +1883,17 @@ impl SimpleEngine { // Potentially change SQLQueryType 1 => { // For non-nested queries, output associated labels - let labels = &match_result.query_data[0].labels; + let labels = &match_result.outer_data().labels; KeyByLabelNames::new(labels.clone().into_iter().collect()) } 2 => { // Extract spatial aggregation output labels using AST-based approach - let temporal_labels = &match_result.query_data[1].labels; - let spatial_labels = &match_result.query_data[0].labels; + let temporal_labels = &match_result.inner_data().labels; + let spatial_labels = &match_result.outer_data().labels; - let temporal_aggregation = &match_result.query_data[1].aggregation_info; - let spatial_aggregation = &match_result.query_data[0].aggregation_info; + let temporal_aggregation = &match_result.inner_data().aggregation_info; + let spatial_aggregation = &match_result.outer_data().aggregation_info; match self.sql_get_is_collapsable(temporal_aggregation, spatial_aggregation) { // If false: get all labels, which are all temporal labels. If true, get only spatial labels @@ -1900,21 +1911,24 @@ impl SimpleEngine { let statistic_name = match query_pattern_type { QueryPatternType::OnlyTemporal => { // Use the temporal aggregation (first subquery) - match_result.query_data[0] + match_result + .outer_data() .aggregation_info .get_name() .to_lowercase() } QueryPatternType::OneTemporalOneSpatial => { // Use the temporal aggregation (second subquery contains temporal) - match_result.query_data[1] + match_result + .inner_data() .aggregation_info .get_name() .to_lowercase() } QueryPatternType::OnlySpatial => { // Use the spatial aggregation (first subquery) - match_result.query_data[0] + match_result + .outer_data() .aggregation_info .get_name() .to_lowercase() @@ -1959,10 +1973,11 @@ impl SimpleEngine { .find_compatible_aggregation(&requirements)? }; - let metric = &match_result.query_data[0].metric; + let metric = &match_result.outer_data().metric; let spatial_filter = if query_pattern_type == QueryPatternType::OneTemporalOneSpatial { - match_result.query_data[0] + match_result + .outer_data() .labels .iter() .cloned() @@ -1972,25 +1987,50 @@ impl SimpleEngine { String::new() }; - // Create query plan and execute values query + let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal + || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; + + self.build_sql_execution_context_tail( + metric, + ×tamps, + metadata, + agg_info, + do_merge, + spatial_filter, + query_time, + ) + } + + /// Shared context-building tail for both SQL context builders. + /// + /// Called by `build_query_execution_context_sql` and `build_spatiotemporal_context` + /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. + /// Builds the query plan, derives grouping/aggregated labels, and returns the + /// final `QueryExecutionContext`. + #[allow(clippy::too_many_arguments)] + fn build_sql_execution_context_tail( + &self, + metric: &str, + timestamps: &QueryTimestamps, + metadata: QueryMetadata, + agg_info: AggregationIdInfo, + do_merge: bool, + spatial_filter: String, + query_time: u64, + ) -> Option { let query_plan = self - .create_store_query_plan(metric, ×tamps, &agg_info) + .create_store_query_plan(metric, timestamps, &agg_info) .map_err(|e| { warn!("Failed to create store query plan: {}", e); e }) .ok()?; - // Create execution context - // do_merge is true for temporal queries (OnlyTemporal or OneTemporalOneSpatial) - let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal - || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - let grouping_labels = self .streaming_config .get_aggregation_config(agg_info.aggregation_id_for_value) .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| query_output_labels.clone()); + .unwrap_or_else(|| metadata.query_output_labels.clone()); let aggregated_labels = self .streaming_config @@ -2001,16 +2041,14 @@ impl SimpleEngine { Some(QueryExecutionContext { metric: metric.to_string(), metadata, - store_plan: query_plan.clone(), - agg_info: agg_info.clone(), + store_plan: query_plan, + agg_info, do_merge, spatial_filter, query_time, grouping_labels, aggregated_labels, }) - - // TODO: Handle spatial aggregation for OneTemporalOneSpatial when not collapsable } /// Build execution context for SpatioTemporal queries. @@ -2023,7 +2061,8 @@ impl SimpleEngine { ) -> Option { // Output labels are the GROUP BY columns (subset of all labels) let query_output_labels = KeyByLabelNames::new( - match_result.query_data[0] + match_result + .outer_data() .labels .clone() .into_iter() @@ -2031,7 +2070,8 @@ impl SimpleEngine { ); // Get the statistic from the aggregation - let statistic_name = match_result.query_data[0] + let statistic_name = match_result + .outer_data() .aggregation_info .get_name() .to_lowercase(); @@ -2055,7 +2095,7 @@ impl SimpleEngine { // Calculate timestamps - similar to OnlyTemporal let end_timestamp = self.validate_and_align_end_timestamp(query_time, QueryPatternType::OnlyTemporal); - let scrape_intervals = match_result.query_data[0].time_info.get_duration() as u64; + let scrape_intervals = match_result.outer_data().time_info.get_duration() as u64; let start_timestamp = end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000); @@ -2083,40 +2123,17 @@ impl SimpleEngine { self.streaming_config .find_compatible_aggregation(&requirements)? }; - let metric = &match_result.query_data[0].metric; - - let query_plan = self - .create_store_query_plan(metric, ×tamps, &agg_info) - .map_err(|e| { - warn!("Failed to create store query plan: {}", e); - e - }) - .ok()?; - - // SpatioTemporal queries need merging (like temporal queries) - let grouping_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_value) - .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| query_output_labels.clone()); + let metric = &match_result.outer_data().metric; - let aggregated_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_key) - .map(|config| config.aggregated_labels.clone()) - .unwrap_or_else(KeyByLabelNames::empty); - - Some(QueryExecutionContext { - metric: metric.to_string(), + self.build_sql_execution_context_tail( + metric, + ×tamps, metadata, - store_plan: query_plan, agg_info, - do_merge: true, - spatial_filter: String::new(), + true, + String::new(), query_time, - grouping_labels, - aggregated_labels, - }) + ) } /// Handle a query following Python's unified architecture @@ -2140,19 +2157,7 @@ impl SimpleEngine { "Built execution context for ElasticSearch query {:?}", context ); - // Execute complete query pipeline - let results = self - .execute_query_pipeline(&context, false) // SQL: topk disabled - .map_err(|e| { - warn!("Query execution failed: {}", e); - e - }) - .ok()?; - - Some(( - context.metadata.query_output_labels, - QueryResult::vector(results, context.query_time), - )) + self.execute_context(context, false) } pub fn build_query_execution_context_elastic( @@ -2586,24 +2591,7 @@ impl SimpleEngine { context.store_plan.values_query.end_timestamp ); - // TODO: Make handle_query_promql (and handle_query) async and use .await directly - // instead of blocking. See execute_plan for the async implementation. - // Execute complete query pipeline - //let results = tokio::task::block_in_place(|| { - // tokio::runtime::Handle::current().block_on(self.execute_plan(&context)) - //}) - let results = self - .execute_query_pipeline(&context, true) // PromQL: topk enabled - .map_err(|e| { - warn!("Query execution failed: {}", e); - e - }) - .ok()?; - - let result = Some(( - context.metadata.query_output_labels, - QueryResult::vector(results, context.query_time), - )); + let result = self.execute_context(context, true); // Determine query routing order based on function type. // USampling functions prefer the precomputed path first (sketch fallback), @@ -2741,108 +2729,7 @@ impl SimpleEngine { debug!("Found matching query config for: {}", query); - // Track query metadata setup latency - let query_metadata_start_time = Instant::now(); - - // Extract metric and spatial filter using AST-based approach - // SQL issue: table name and filter label names, return empty filter for now but compute later - let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); - - // Get all labels from inference config for this metric - let promql_schema = match &self.inference_config.schema { - SchemaConfig::PromQL(schema) => schema, - SchemaConfig::SQL(_) => { - warn!("PromQL query requested but config has SQL schema"); - return None; - } - &SchemaConfig::ElasticQueryDSL => { - warn!("PromQL query requested but config has ElasticQueryDSL schema"); - return None; - } - SchemaConfig::ElasticSQL(_) => { - warn!("PromQL query requested but config has ElasticSQL schema"); - return None; - } - }; - let all_labels = match promql_schema.get_labels(&metric).cloned() { - Some(labels) => labels, - None => { - warn!("No metric configuration found for '{}'", metric); - return None; - } - }; - - // Determine query output labels based on pattern type - // TODO: should we be returning this and using it to convert to final HTTP response? - let mut query_output_labels = match query_pattern_type { - QueryPatternType::OnlyTemporal => { - // For temporal-only queries, output all labels - all_labels.clone() - } - QueryPatternType::OnlySpatial => { - // Extract spatial aggregation output labels using AST-based approach - get_spatial_aggregation_output_labels(&match_result, &all_labels) - } - QueryPatternType::OneTemporalOneSpatial => { - // Extract spatial aggregation output labels for combined queries - let temporal_aggregation = match_result.get_function_name().unwrap(); - let spatial_aggregation = match_result.get_aggregation_op().unwrap(); - // iff temporal outer labels issubset of spatial inner labels, collapse - // SQL issue: take into account labels from the query, not needed at present because only uses promql translations - let collapsable = temporal_aggregation - .parse::() - .ok() - .zip(spatial_aggregation.parse::().ok()) - .is_some_and(|(f, o)| get_is_collapsable(f, o)); - if collapsable { - get_spatial_aggregation_output_labels(&match_result, &all_labels) - } else { - all_labels.clone() - } - } - }; - - let timestamps = - self.calculate_query_timestamps_promql(query_time, query_pattern_type, &match_result); - - // Extract statistics to compute using AST-based approach - let statistics_to_compute = get_statistics_to_compute(query_pattern_type, &match_result); - if statistics_to_compute.len() != 1 { - warn!( - "Expected exactly one statistic to compute, found {}", - statistics_to_compute.len() - ); - return None; - } - let statistic_to_compute = statistics_to_compute.first().unwrap(); - - // For topk queries, prepend "__name__" to query_output_labels - if *statistic_to_compute == Statistic::Topk { - let mut new_labels = vec!["__name__".to_string()]; - new_labels.extend(query_output_labels.labels); - query_output_labels = KeyByLabelNames::new(new_labels); - } - - let query_kwargs = self - .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, &match_result) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - let query_metadata_duration = query_metadata_start_time.elapsed(); - debug!( - "[LATENCY] Query metadata calculation: {:.2}ms", - query_metadata_duration.as_secs_f64() * 1000.0 - ); - - // Create query metadata - let metadata = QueryMetadata { - query_output_labels: query_output_labels.clone(), - statistic_to_compute: *statistic_to_compute, - query_kwargs: query_kwargs.clone(), - }; + let query_context_start_time = Instant::now(); // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { @@ -2863,51 +2750,20 @@ impl SimpleEngine { .find_compatible_aggregation(&requirements)? }; - // Create query plan (determines window type and calculates timestamps) - let query_plan = self - .create_store_query_plan(&metric, ×tamps, &agg_info) - .map_err(|e| { - warn!("Failed to create store query plan: {}", e); - e - }) - .ok()?; - - // let window_type = if query_plan.values_query.is_exact_query { - // "sliding" - // } else { - // "tumbling" - // }; - - // Create execution context - // do_merge is true for temporal queries (OnlyTemporal or OneTemporalOneSpatial) - let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal - || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - - let grouping_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_value) - .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| query_output_labels.clone()); - - let aggregated_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_key) - .map(|config| config.aggregated_labels.clone()) - .unwrap_or_else(KeyByLabelNames::empty); - - Some(QueryExecutionContext { - metric: metric.clone(), - metadata, - store_plan: query_plan.clone(), - agg_info: agg_info.clone(), - do_merge, - spatial_filter, + let result = self.build_promql_execution_context_tail( + &match_result, + query_pattern_type, query_time, - grouping_labels, - aggregated_labels, - }) + agg_info, + ); - // TODO: Handle spatial aggregation for OneTemporalOneSpatial when not collapsable + let query_context_duration = query_context_start_time.elapsed(); + debug!( + "[LATENCY] Query context build: {:.2}ms", + query_context_duration.as_secs_f64() * 1000.0 + ); + + result } /// Merge precomputed outputs (extracts buckets from timestamped data) From fc0be7ef7e42154d24f48e40944b267aa3a15103 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 10 Apr 2026 10:47:34 -0400 Subject: [PATCH 3/4] more refactor --- .../src/engines/simple_engine.rs | 31 ++++++------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index 7ce77df..bc73109 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -1333,21 +1333,14 @@ impl SimpleEngine { rhs: f64, ) -> f64 { use promql_parser::parser::token::{T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; - let id = op.id(); - if id == T_ADD { - lhs + rhs - } else if id == T_SUB { - lhs - rhs - } else if id == T_MUL { - lhs * rhs - } else if id == T_DIV { - lhs / rhs - } else if id == T_MOD { - lhs % rhs - } else if id == T_POW { - lhs.powf(rhs) - } else { - f64::NAN + match op.id() { + id if id == T_ADD => lhs + rhs, + id if id == T_SUB => lhs - rhs, + id if id == T_MUL => lhs * rhs, + id if id == T_DIV => lhs / rhs, + id if id == T_MOD => lhs % rhs, + id if id == T_POW => lhs.powf(rhs), + _ => f64::NAN, } } @@ -2849,9 +2842,7 @@ impl SimpleEngine { } // Try to use optimized batch merge for KLL accumulators - if !accumulators.is_empty() - && accumulators[0].get_accumulator_type() == AggregationType::DatasketchesKLL - { + if accumulators[0].get_accumulator_type() == AggregationType::DatasketchesKLL { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; match DatasketchesKLLAccumulator::merge_multiple(accumulators) { @@ -2867,9 +2858,7 @@ impl SimpleEngine { } // Try to use optimized batch merge for CountMinSketch accumulators - if !accumulators.is_empty() - && accumulators[0].get_accumulator_type() == AggregationType::CountMinSketch - { + if accumulators[0].get_accumulator_type() == AggregationType::CountMinSketch { use crate::precompute_operators::count_min_sketch_accumulator::CountMinSketchAccumulator; match CountMinSketchAccumulator::merge_multiple(accumulators) { From e327e6cde4631a1e87d6d049f9fa09b619b4aee6 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 10 Apr 2026 14:12:31 -0400 Subject: [PATCH 4/4] some more refactor --- .../src/ast_matching/sqlpattern_matcher.rs | 8 +-- .../src/engines/simple_engine.rs | 59 ++++++++++++------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs index 3e5f965..fee6c70 100644 --- a/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs +++ b/asap-common/dependencies/rs/sql_utilities/src/ast_matching/sqlpattern_matcher.rs @@ -73,14 +73,14 @@ impl SQLQuery { } /// The outer (spatial / single) query's data — always `query_data[0]`. - pub fn outer_data(&self) -> &SQLQueryData { - &self.query_data[0] + pub fn outer_data(&self) -> Option<&SQLQueryData> { + self.query_data.first() } /// The inner (temporal) query's data for nested queries — always `query_data[1]`. /// Only valid for `OneTemporalOneSpatial` patterns. - pub fn inner_data(&self) -> &SQLQueryData { - &self.query_data[1] + pub fn inner_data(&self) -> Option<&SQLQueryData> { + self.query_data.get(1) } } diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine.rs index bc73109..bc753b0 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine.rs @@ -395,13 +395,21 @@ impl SimpleEngine { ) -> u64 { match query_pattern_type { QueryPatternType::OnlyTemporal => { - let scrape_intervals = - match_result.outer_data().time_info.clone().get_duration() as u64; + let scrape_intervals = match_result + .outer_data() + .expect("OnlyTemporal pattern guarantees outer_data is present") + .time_info + .clone() + .get_duration() as u64; end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) } QueryPatternType::OneTemporalOneSpatial => { - let scrape_intervals = - match_result.inner_data().time_info.clone().get_duration() as u64; + let scrape_intervals = match_result + .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") + .time_info + .clone() + .get_duration() as u64; end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) } QueryPatternType::OnlySpatial => { @@ -1627,12 +1635,15 @@ impl SimpleEngine { match_result: &SQLQuery, query_pattern_type: QueryPatternType, ) -> QueryRequirements { - let query_data = match_result.outer_data(); + let query_data = match_result + .outer_data() + .expect("build_query_requirements_sql called on valid SQLQuery"); let metric = query_data.metric.clone(); let statistic_name = match query_pattern_type { QueryPatternType::OneTemporalOneSpatial => match_result .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") .aggregation_info .get_name() .to_lowercase(), @@ -1650,8 +1661,12 @@ impl SimpleEngine { Some(scrape_intervals * self.prometheus_scrape_interval * 1000) } QueryPatternType::OneTemporalOneSpatial => { - let scrape_intervals = - match_result.inner_data().time_info.clone().get_duration() as u64; + let scrape_intervals = match_result + .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") + .time_info + .clone() + .get_duration() as u64; Some(scrape_intervals * self.prometheus_scrape_interval * 1000) } }; @@ -1845,7 +1860,7 @@ impl SimpleEngine { // so we need to use the inner (temporal) query's time_info to compute query_time let query_time = match query_pattern_type { QueryPatternType::OneTemporalOneSpatial => { - let inner_time_info = &match_result.inner_data().time_info; + let inner_time_info = &match_result.inner_data()?.time_info; Self::convert_query_time_to_data_time( inner_time_info.get_start() + inner_time_info.get_duration(), ) @@ -1876,17 +1891,17 @@ impl SimpleEngine { // Potentially change SQLQueryType 1 => { // For non-nested queries, output associated labels - let labels = &match_result.outer_data().labels; + let labels = &match_result.outer_data()?.labels; KeyByLabelNames::new(labels.clone().into_iter().collect()) } 2 => { // Extract spatial aggregation output labels using AST-based approach - let temporal_labels = &match_result.inner_data().labels; - let spatial_labels = &match_result.outer_data().labels; + let temporal_labels = &match_result.inner_data()?.labels; + let spatial_labels = &match_result.outer_data()?.labels; - let temporal_aggregation = &match_result.inner_data().aggregation_info; - let spatial_aggregation = &match_result.outer_data().aggregation_info; + let temporal_aggregation = &match_result.inner_data()?.aggregation_info; + let spatial_aggregation = &match_result.outer_data()?.aggregation_info; match self.sql_get_is_collapsable(temporal_aggregation, spatial_aggregation) { // If false: get all labels, which are all temporal labels. If true, get only spatial labels @@ -1905,7 +1920,7 @@ impl SimpleEngine { QueryPatternType::OnlyTemporal => { // Use the temporal aggregation (first subquery) match_result - .outer_data() + .outer_data()? .aggregation_info .get_name() .to_lowercase() @@ -1913,7 +1928,7 @@ impl SimpleEngine { QueryPatternType::OneTemporalOneSpatial => { // Use the temporal aggregation (second subquery contains temporal) match_result - .inner_data() + .inner_data()? .aggregation_info .get_name() .to_lowercase() @@ -1921,7 +1936,7 @@ impl SimpleEngine { QueryPatternType::OnlySpatial => { // Use the spatial aggregation (first subquery) match_result - .outer_data() + .outer_data()? .aggregation_info .get_name() .to_lowercase() @@ -1966,11 +1981,11 @@ impl SimpleEngine { .find_compatible_aggregation(&requirements)? }; - let metric = &match_result.outer_data().metric; + let metric = &match_result.outer_data()?.metric; let spatial_filter = if query_pattern_type == QueryPatternType::OneTemporalOneSpatial { match_result - .outer_data() + .outer_data()? .labels .iter() .cloned() @@ -2055,7 +2070,7 @@ impl SimpleEngine { // Output labels are the GROUP BY columns (subset of all labels) let query_output_labels = KeyByLabelNames::new( match_result - .outer_data() + .outer_data()? .labels .clone() .into_iter() @@ -2064,7 +2079,7 @@ impl SimpleEngine { // Get the statistic from the aggregation let statistic_name = match_result - .outer_data() + .outer_data()? .aggregation_info .get_name() .to_lowercase(); @@ -2088,7 +2103,7 @@ impl SimpleEngine { // Calculate timestamps - similar to OnlyTemporal let end_timestamp = self.validate_and_align_end_timestamp(query_time, QueryPatternType::OnlyTemporal); - let scrape_intervals = match_result.outer_data().time_info.get_duration() as u64; + let scrape_intervals = match_result.outer_data()?.time_info.get_duration() as u64; let start_timestamp = end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000); @@ -2116,7 +2131,7 @@ impl SimpleEngine { self.streaming_config .find_compatible_aggregation(&requirements)? }; - let metric = &match_result.outer_data().metric; + let metric = &match_result.outer_data()?.metric; self.build_sql_execution_context_tail( metric,