diff --git a/asap-query-engine/src/engines/simple_engine/elastic.rs b/asap-query-engine/src/engines/simple_engine/elastic.rs new file mode 100644 index 0000000..c9d7755 --- /dev/null +++ b/asap-query-engine/src/engines/simple_engine/elastic.rs @@ -0,0 +1,191 @@ +//! Elasticsearch DSL query language handler for SimpleEngine. +//! +//! Contains all Elastic DSL-specific context building and query dispatch. + +use super::SimpleEngine; +use super::{QueryExecutionContext, QueryMetadata, QueryTimestamps}; +use crate::engines::query_result::QueryResult; +use elastic_dsl_utilities::pattern::parse_and_classify; +use elastic_dsl_utilities::types::{EsDslQueryPattern, GroupBySpec, MetricAggType}; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::Statistic; +use std::collections::HashMap; +use tracing::{debug, warn}; + +impl SimpleEngine { + pub fn handle_query_elastic( + &self, + query: String, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + let context = self.build_query_execution_context_elastic(query, time)?; + debug!( + "Built execution context for ElasticSearch query {:?}", + context + ); + self.execute_context(context, false) + } + + pub fn build_query_execution_context_elastic( + &self, + query: String, + time: f64, + ) -> Option { + let query_time = Self::convert_query_time_to_data_time(time); + + // 1. Parse query DSL somehow. Elasticsearch DSL crate does not support deserializing, but maybe can use Opensearch instead? + // 2. Determine whether query is supported using some AST representation or hardcoded pattern matching. + let query_pattern: EsDslQueryPattern = + parse_and_classify(&query).unwrap_or(EsDslQueryPattern::Unknown); + match query_pattern { + EsDslQueryPattern::Unknown => { + debug!("Could not parse query into known pattern"); + return None; + } + _ => { + debug!("Parsed query pattern: {:?}", query_pattern); + } + } + + // 3. Convert parsed query into execution context components (labels, statistic, kwargs, metadata, store query plan, etc.) + + // TODO: Figure out how to handle query configuration for ElasticSearch queries. + let query_config = self.find_query_config(&query)?; + let agg_info = self + .get_aggregation_id_info(query_config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. + + let (metric, query_metadata) = self.build_query_metadata_elastic(&query_pattern)?; + + let spatial_filter = String::new(); // Placeholder - extract from query if applicable + + // TODO: Need way to parse ES DSL "date math". + let timestamps = self.resolve_query_time_range_elastic(query_time, query_pattern); + + let query_plan = self + .create_store_query_plan(&metric, ×tamps, &agg_info) + .map_err(|e| { + warn!("Failed to create store query plan: {}", e); + e + }) + .ok()?; + + let grouping_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_value) + .map(|config| config.grouping_labels.clone()) + .unwrap_or_else(|| query_metadata.query_output_labels.clone()); + + let aggregated_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_key) + .map(|config| config.aggregated_labels.clone()) + .unwrap_or_else(KeyByLabelNames::empty); + + Some(QueryExecutionContext { + metric, + metadata: query_metadata, + store_plan: query_plan.clone(), + agg_info: agg_info.clone(), + do_merge, + spatial_filter, + query_time, + grouping_labels, + aggregated_labels, + }) + } + + fn build_query_metadata_elastic( + &self, + query_pattern: &EsDslQueryPattern, + ) -> Option<(String, QueryMetadata)> { + // Constructs QueryMetadata based on the parsed ES DSL query pattern. This includes determining the + // metric to query, the statistic to compute, and any relevant query kwargs (e.g. quantile value for percentiles). + + // Figure out aggregation type and what labels are included in output. + // By default, we only include grouping labels in the output for ES DSL. + + // Take first aggregation by default since current engine doesn't support multiple aggregations in a single query. + let aggregation = query_pattern.get_metric_aggs()?.first()?.clone(); + + // By default, we only include grouping labels in the output for ES DSL. + let query_output_labels = match query_pattern.get_groupby_spec() { + Some(GroupBySpec::Terms { field }) => KeyByLabelNames::new(vec![field.clone()]), + Some(GroupBySpec::MultiTerms { fields }) => KeyByLabelNames::new(fields.to_vec()), + None => KeyByLabelNames::empty(), + }; + + let metric = aggregation.field.clone(); + + // Map ElasticSearch aggregation types to our internal Statistic enum. + let statistic_to_compute = match aggregation.agg_type { + MetricAggType::Percentiles => Statistic::Quantile, + MetricAggType::Avg => Statistic::Rate, + MetricAggType::Sum => Statistic::Sum, + MetricAggType::Min => Statistic::Min, + MetricAggType::Max => Statistic::Max, + }; + + let mut query_kwargs = HashMap::new(); // Placeholder - build based on query and statistic + if aggregation.agg_type == MetricAggType::Percentiles { + // Extract quantile value from aggregation parameters and add to query_kwargs + if let Some(params) = &aggregation.params { + if let Some(percents) = params.get("percents") { + // Get first value from percents array since we only support one quantile argument for now. + let quantile = percents + .as_array() + .and_then(|arr| arr.first()) + .and_then(|v| v.as_f64()); + // ES percentiles are specified as values between 0 and 100, but we want to convert to 0-1 range for our internal representation. + query_kwargs.insert("quantile".to_string(), (quantile? / 100.0).to_string()); + } + } + } + + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute, + query_kwargs: query_kwargs.clone(), + }; + Some((metric, metadata)) + } + + pub fn resolve_query_time_range_elastic( + &self, + query_time: u64, + query_pattern: EsDslQueryPattern, + ) -> QueryTimestamps { + // Resolves the actual start and end timestamps into milliseconds for an ElasticSearch query + // based on the provided query_time and the time range specified in the ES DSL query pattern (if any). + // If no time range is specified, default to entire history up to query_time. + + let mut start_timestamp: u64 = 0; + let mut end_timestamp: u64 = query_time; + + let time_range = query_pattern.get_time_range(); + if let Some(tr) = time_range { + if let Some(resolved_range) = tr.resolve_epoch_millis(query_time as i64) { + debug!( + "Parsed time range from query: start={} end={}", + resolved_range.gte_ms.unwrap_or(0), + resolved_range.lte_ms.unwrap_or(0) + ); + start_timestamp = resolved_range.gte_ms.unwrap_or(0) as u64; + end_timestamp = resolved_range.lte_ms.unwrap_or(query_time as i64) as u64; + } else { + debug!("Failed to resolve time range from query"); + } + }; + + QueryTimestamps { + start_timestamp, + end_timestamp, + } + } +} diff --git a/asap-query-engine/src/engines/simple_engine.rs b/asap-query-engine/src/engines/simple_engine/mod.rs similarity index 56% rename from asap-query-engine/src/engines/simple_engine.rs rename to asap-query-engine/src/engines/simple_engine/mod.rs index bc753b0..806971a 100644 --- a/asap-query-engine/src/engines/simple_engine.rs +++ b/asap-query-engine/src/engines/simple_engine/mod.rs @@ -1,16 +1,16 @@ +mod elastic; +mod promql; +mod sql; + use crate::data_model::{ - AggregationIdInfo, InferenceConfig, KeyByLabelValues, QueryConfig, QueryLanguage, SchemaConfig, + AggregationIdInfo, InferenceConfig, KeyByLabelValues, QueryConfig, QueryLanguage, StreamingConfig, }; -use crate::engines::query_result::{InstantVectorElement, QueryResult, RangeVectorElement}; +use crate::engines::query_result::{InstantVectorElement, QueryResult}; // use crate::stores::promsketch_store::{ // self, is_usampling_function, metrics as ps_metrics, PromSketchStore, // }; use crate::stores::{Store, TimestampedBucketsMap}; -use core::panic; -use promql_utilities::get_is_collapsable; -use promql_utilities::query_logics::enums::{AggregationOperator, AggregationType, PromQLFunction}; -use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -19,25 +19,12 @@ use tracing::{debug, warn}; use crate::AggregateCore; use asap_types::enums::WindowType; -use asap_types::query_requirements::QueryRequirements; -use asap_types::utils::normalize_spatial_filter; -use promql_utilities::ast_matching::{PromQLMatchResult, PromQLPattern, PromQLPatternBuilder}; +use promql_utilities::ast_matching::{PromQLPattern, PromQLPatternBuilder}; use promql_utilities::data_model::KeyByLabelNames; -use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; -use promql_utilities::query_logics::parsing::{ - get_metric_and_spatial_filter, get_spatial_aggregation_output_labels, get_statistics_to_compute, +use promql_utilities::query_logics::enums::{ + AggregationOperator, AggregationType, PromQLFunction, QueryPatternType, Statistic, }; - -use sql_utilities::ast_matching::QueryType; -use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; -use sql_utilities::sqlhelper::{AggregationInfo, SQLQueryData}; -use sqlparser::dialect::*; -use sqlparser::parser::Parser as parser; - -// SQL issue: refactor simpleengine to create matchresult similar to SQLquerydata - -use elastic_dsl_utilities::pattern::parse_and_classify; -use elastic_dsl_utilities::types::{EsDslQueryPattern, GroupBySpec, MetricAggType}; +use serde_json::Value; // Type alias for merged outputs (single aggregate per key after merging) type MergedOutputsMap = HashMap, Box>; @@ -310,33 +297,6 @@ impl SimpleEngine { .find(|config| config.query == query) } - /// Finds the query configuration for a SQL query using structural pattern matching. - /// - /// Unlike `find_query_config` (which does exact string comparison), this method parses - /// each template in query_configs and compares it structurally against the incoming - /// query_data — ignoring absolute timestamps and comparing only metric, aggregation, - /// labels, time column name, and duration. - fn find_query_config_sql(&self, query_data: &SQLQueryData) -> Option<&QueryConfig> { - let schema = match &self.inference_config.schema { - SchemaConfig::SQL(sql_schema) => sql_schema, - _ => return None, - }; - - self.inference_config.query_configs.iter().find(|config| { - let template_statements = - match parser::parse_sql(&GenericDialect {}, config.query.as_str()) { - Ok(stmts) => stmts, - Err(_) => return false, - }; - let template_data = - match SQLPatternParser::new(schema, 0.0).parse_query(&template_statements) { - Some(data) => data, - None => return false, - }; - query_data.matches_sql_pattern(&template_data) - }) - } - /// Validates and potentially aligns end timestamp based on query pattern fn validate_and_align_end_timestamp( &self, @@ -368,202 +328,6 @@ impl SimpleEngine { end_timestamp } - /// Calculates start timestamp for PromQL queries - fn calculate_start_timestamp_promql( - &self, - end_timestamp: u64, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - ) -> u64 { - match query_pattern_type { - QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { - let range_seconds = match_result.get_range_duration().unwrap().num_seconds() as u64; - end_timestamp - (range_seconds * 1000) - } - QueryPatternType::OnlySpatial => { - end_timestamp - (self.prometheus_scrape_interval * 1000) - } - } - } - - /// Calculates start timestamp for SQL queries - fn calculate_start_timestamp_sql( - &self, - end_timestamp: u64, - query_pattern_type: QueryPatternType, - match_result: &SQLQuery, - ) -> u64 { - match query_pattern_type { - QueryPatternType::OnlyTemporal => { - let scrape_intervals = match_result - .outer_data() - .expect("OnlyTemporal pattern guarantees outer_data is present") - .time_info - .clone() - .get_duration() as u64; - end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) - } - QueryPatternType::OneTemporalOneSpatial => { - let scrape_intervals = match_result - .inner_data() - .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") - .time_info - .clone() - .get_duration() as u64; - end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) - } - QueryPatternType::OnlySpatial => { - end_timestamp - (self.prometheus_scrape_interval * 1000) - } - } - } - - /// Calculates and validates query timestamps for PromQL - fn calculate_query_timestamps_promql( - &self, - query_time: u64, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - ) -> QueryTimestamps { - let mut end_timestamp = if let Some(at_modifier) = match_result - .tokens - .get("metric") - .and_then(|t| t.metric.as_ref()) - .and_then(|m| m.at_modifier) - { - at_modifier * 1000 - } else { - query_time - }; - - end_timestamp = self.validate_and_align_end_timestamp(end_timestamp, query_pattern_type); - let start_timestamp = - self.calculate_start_timestamp_promql(end_timestamp, query_pattern_type, match_result); - - QueryTimestamps { - start_timestamp, - end_timestamp, - } - } - - /// Calculates and validates query timestamps for SQL - fn calculate_query_timestamps_sql( - &self, - query_time: u64, - query_pattern_type: QueryPatternType, - match_result: &SQLQuery, - ) -> QueryTimestamps { - let mut end_timestamp = query_time; - end_timestamp = self.validate_and_align_end_timestamp(end_timestamp, query_pattern_type); - let start_timestamp = - self.calculate_start_timestamp_sql(end_timestamp, query_pattern_type, match_result); - - QueryTimestamps { - start_timestamp, - end_timestamp, - } - } - - /// Extracts quantile parameter from PromQL match result - fn extract_quantile_param_promql( - &self, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - ) -> Option { - let quantile_value = match query_pattern_type { - QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { - match_result - .tokens - .get("function_args") - .and_then(|token| token.function.as_ref()) - .and_then(|func| func.args.first()) - } - QueryPatternType::OnlySpatial => match_result - .tokens - .get("aggregation") - .and_then(|token| token.aggregation.as_ref()) - .and_then(|agg| agg.param.as_ref()), - }; - - quantile_value.map(|s| s.to_string()) - } - - /// Extracts quantile parameter from SQL match result - fn extract_quantile_param_sql(&self, match_result: &SQLQuery) -> Option { - match_result - .query_data - .first() - .map(|data| data.aggregation_info.get_args()[0].to_string()) - } - - /// Extracts topk k parameter from PromQL match result - fn extract_topk_param( - &self, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - ) -> Result { - match query_pattern_type { - QueryPatternType::OnlySpatial => match_result - .tokens - .get("aggregation") - .and_then(|token| token.aggregation.as_ref()) - .and_then(|agg| agg.param.as_ref()) - .map(|s| s.to_string()) - .ok_or_else(|| "Missing k parameter for top-k query".to_string()), - _ => Err(format!( - "Top-k statistic is only supported for OnlySpatial pattern, found {:?}", - query_pattern_type - )), - } - } - - /// Builds query kwargs (quantile, k, etc.) for PromQL queries - fn build_query_kwargs_promql( - &self, - statistic: &Statistic, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - ) -> Result, String> { - let mut query_kwargs = HashMap::new(); - - match statistic { - Statistic::Quantile => { - let quantile = self - .extract_quantile_param_promql(query_pattern_type, match_result) - .ok_or_else(|| "Missing quantile parameter for quantile query".to_string())?; - debug!("Extracted quantile value: {:?}", quantile); - query_kwargs.insert("quantile".to_string(), quantile); - } - Statistic::Topk => { - let k = self.extract_topk_param(query_pattern_type, match_result)?; - debug!("Extracted k value: {:?}", k); - query_kwargs.insert("k".to_string(), k); - } - _ => {} - } - - Ok(query_kwargs) - } - - /// Builds query kwargs for SQL queries - fn build_query_kwargs_sql( - &self, - statistic: &Statistic, - match_result: &SQLQuery, - ) -> Result, String> { - let mut query_kwargs = HashMap::new(); - - if *statistic == Statistic::Quantile { - let quantile = self - .extract_quantile_param_sql(match_result) - .ok_or_else(|| "Missing quantile parameter for quantile query".to_string())?; - query_kwargs.insert("quantile".to_string(), quantile); - } - // Note: SQL doesn't support topk limiting yet - - Ok(query_kwargs) - } - /// Creates query parameters for separate keys query fn create_keys_query_params( &self, @@ -1050,462 +814,6 @@ impl SimpleEngine { Ok(self.format_final_results(all_results, statistic, metric, false)) } - /// Finds a query config by structurally comparing `arm_ast` against each - /// config's parsed query. - /// - /// Both the arm AST and each config's query string are first normalized to - /// the canonical `Display` form produced by `promql_parser`. This ensures - /// that user-written variants like `"sum(x) by (lbl)"` and the parser's - /// canonical `"sum by (lbl) (x)"` compare equal. - pub fn find_query_config_promql_structural( - &self, - arm_ast: &promql_parser::parser::Expr, - ) -> Option<&QueryConfig> { - let arm_canonical = format!("{}", arm_ast); - self.inference_config.query_configs.iter().find(|config| { - let config_canonical = promql_parser::parser::parse(&config.query) - .map(|ast| format!("{}", ast)) - .unwrap_or_default(); - config_canonical == arm_canonical - }) - } - - /// Variant of `build_query_execution_context_promql` that accepts a pre-parsed - /// AST node and a pre-found `QueryConfig`, avoiding redundant parsing and lookup. - pub fn build_query_execution_context_from_ast( - &self, - arm_ast: &promql_parser::parser::Expr, - query_config: &QueryConfig, - time: f64, - ) -> Option { - let query_time = Self::convert_query_time_to_data_time(time); - - let mut found_match = None; - for (pattern_type, patterns) in &self.controller_patterns { - for pattern in patterns { - let match_result = pattern.matches(arm_ast); - if match_result.matches { - found_match = Some((*pattern_type, match_result)); - break; - } - } - if found_match.is_some() { - break; - } - } - - let (query_pattern_type, match_result) = found_match?; - - let agg_info = self - .get_aggregation_id_info(query_config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - self.build_promql_execution_context_tail( - &match_result, - query_pattern_type, - query_time, - agg_info, - ) - } - - /// Shared context-building tail for both PromQL context builders. - /// - /// Called by `build_query_execution_context_from_ast` and - /// `build_query_execution_context_promql` after pattern matching and - /// `agg_info` resolution are complete. Computes labels, statistics, - /// kwargs, metadata, query plan, and the final `QueryExecutionContext`. - fn build_promql_execution_context_tail( - &self, - match_result: &PromQLMatchResult, - query_pattern_type: QueryPatternType, - query_time: u64, - agg_info: AggregationIdInfo, - ) -> Option { - let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); - - let promql_schema = match &self.inference_config.schema { - SchemaConfig::PromQL(schema) => schema, - _ => return None, - }; - let all_labels = match promql_schema.get_labels(&metric).cloned() { - Some(labels) => labels, - None => { - warn!("No metric configuration found for '{}'", metric); - return None; - } - }; - - let mut query_output_labels = match query_pattern_type { - QueryPatternType::OnlyTemporal => all_labels.clone(), - QueryPatternType::OnlySpatial => { - get_spatial_aggregation_output_labels(match_result, &all_labels) - } - QueryPatternType::OneTemporalOneSpatial => { - let temporal_aggregation = match_result.get_function_name().unwrap(); - let spatial_aggregation = match_result.get_aggregation_op().unwrap(); - let collapsable = temporal_aggregation - .parse::() - .ok() - .zip(spatial_aggregation.parse::().ok()) - .is_some_and(|(f, o)| get_is_collapsable(f, o)); - if collapsable { - get_spatial_aggregation_output_labels(match_result, &all_labels) - } else { - all_labels.clone() - } - } - }; - - let timestamps = - self.calculate_query_timestamps_promql(query_time, query_pattern_type, match_result); - - let statistics_to_compute = get_statistics_to_compute(query_pattern_type, match_result); - if statistics_to_compute.len() != 1 { - warn!( - "Expected exactly one statistic to compute, found {}", - statistics_to_compute.len() - ); - return None; - } - let statistic_to_compute = statistics_to_compute.first().unwrap(); - - if *statistic_to_compute == Statistic::Topk { - let mut new_labels = vec!["__name__".to_string()]; - new_labels.extend(query_output_labels.labels); - query_output_labels = KeyByLabelNames::new(new_labels); - } - - let query_kwargs = self - .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, match_result) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - let metadata = QueryMetadata { - query_output_labels: query_output_labels.clone(), - statistic_to_compute: *statistic_to_compute, - query_kwargs, - }; - - let query_plan = self - .create_store_query_plan(&metric, ×tamps, &agg_info) - .map_err(|e| { - warn!("Failed to create store query plan: {}", e); - e - }) - .ok()?; - - let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal - || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - - let grouping_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_value) - .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| query_output_labels.clone()); - - let aggregated_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_key) - .map(|config| config.aggregated_labels.clone()) - .unwrap_or_else(KeyByLabelNames::empty); - - Some(QueryExecutionContext { - metric, - metadata, - store_plan: query_plan, - agg_info, - do_merge, - spatial_filter, - query_time, - grouping_labels, - aggregated_labels, - }) - } - - /// Recursively builds a DataFusion logical plan for one arm of a binary - /// arithmetic expression. - /// - /// - Leaf arm (supported PromQL pattern): look up config structurally, build - /// context, return its `to_logical_plan()` together with the output label names. - /// - Binary arm: recursively build both sub-arms and combine with - /// `build_binary_vector_plan`. - /// - Scalar literal: returns `None` (handled by the caller separately). - fn build_arm_logical_plan( - &self, - arm_ast: &promql_parser::parser::Expr, - time: f64, - ) -> Option<(datafusion::logical_expr::LogicalPlan, Vec)> { - use crate::engines::logical::plan_builder::build_binary_vector_plan; - use promql_parser::parser::Expr; - - match arm_ast { - Expr::NumberLiteral(_) => None, // caller handles scalars - Expr::Paren(paren) => self.build_arm_logical_plan(&paren.expr, time), - Expr::Binary(binary) => { - // Nested binary expression — recurse on both sides - let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(&binary.lhs, time)?; - let (rhs_plan, _) = self.build_arm_logical_plan(&binary.rhs, time)?; - let combined = - build_binary_vector_plan(lhs_plan, rhs_plan, &binary.op, lhs_labels.clone()) - .ok()?; - Some((combined, lhs_labels)) - } - other => { - // Leaf pattern: structural config lookup + context + plan - let config = self.find_query_config_promql_structural(other)?; - let ctx = self.build_query_execution_context_from_ast(other, config, time)?; - let label_names = ctx.metadata.query_output_labels.labels.clone(); - let plan = ctx.to_logical_plan().ok()?; - Some((plan, label_names)) - } - } - } - - /// Handles a binary arithmetic PromQL expression by building a combined - /// DataFusion plan (vector–vector join or scalar projection) and executing it. - /// - /// Returns `None` if any arm is not acceleratable (caller falls back to Prometheus). - fn handle_binary_expr_promql( - &self, - ast: &promql_parser::parser::Expr, - time: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - use crate::engines::logical::plan_builder::{build_binary_vector_plan, build_scalar_plan}; - use promql_parser::parser::Expr; - - let query_time = Self::convert_query_time_to_data_time(time); - - let binary = match ast { - Expr::Binary(b) => b, - _ => return None, - }; - - let lhs = binary.lhs.as_ref(); - let rhs = binary.rhs.as_ref(); - let op = &binary.op; - - // Scalar case: either side may be a numeric literal - let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { - (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), - (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), - _ => None, - }; - if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { - let (vector_plan, label_names) = self.build_arm_logical_plan(vector_arm, time)?; - let combined = - build_scalar_plan(vector_plan, scalar, op, scalar_on_left, label_names.clone()) - .ok()?; - let results = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(self.execute_logical_plan( - combined, - label_names.clone(), - "", - &Statistic::Sum, - )) - }) - .ok()?; - return Some(( - KeyByLabelNames::new(label_names), - QueryResult::vector(results, query_time), - )); - } - - // Vector–vector - let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(lhs, time)?; - let (rhs_plan, _) = self.build_arm_logical_plan(rhs, time)?; - let combined = build_binary_vector_plan(lhs_plan, rhs_plan, op, lhs_labels.clone()).ok()?; - let results = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(self.execute_logical_plan( - combined, - lhs_labels.clone(), - "", - &Statistic::Sum, - )) - }) - .ok()?; - let output_labels = KeyByLabelNames::new(lhs_labels); - Some((output_labels, QueryResult::vector(results, query_time))) - } - - /// Applies a PromQL binary arithmetic operator to two f64 values. - fn apply_range_binary_op( - op: &promql_parser::parser::token::TokenType, - lhs: f64, - rhs: f64, - ) -> f64 { - use promql_parser::parser::token::{T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; - match op.id() { - id if id == T_ADD => lhs + rhs, - id if id == T_SUB => lhs - rhs, - id if id == T_MUL => lhs * rhs, - id if id == T_DIV => lhs / rhs, - id if id == T_MOD => lhs % rhs, - id if id == T_POW => lhs.powf(rhs), - _ => f64::NAN, - } - } - - /// Recursively builds a range execution context for one arm of a binary arithmetic expression. - fn build_arm_range_context( - &self, - arm_ast: &promql_parser::parser::Expr, - start: f64, - end: f64, - step: f64, - ) -> Option<(RangeQueryExecutionContext, Vec)> { - use promql_parser::parser::Expr; - - match arm_ast { - Expr::NumberLiteral(_) => None, // caller handles scalars - Expr::Paren(paren) => self.build_arm_range_context(&paren.expr, start, end, step), - other => { - let config = self.find_query_config_promql_structural(other)?; - let base_context = - self.build_query_execution_context_from_ast(other, config, end)?; - let label_names = base_context.metadata.query_output_labels.labels.clone(); - - let start_ms = Self::convert_query_time_to_data_time(start); - let end_ms = Self::convert_query_time_to_data_time(end); - let step_ms = (step * 1000.0) as u64; - - let tumbling_window_ms = self - .streaming_config - .get_aggregation_config(base_context.agg_info.aggregation_id_for_value) - .map(|c| c.window_size * 1000)?; - - self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms) - .map_err(|e| { - warn!("Range arm query validation failed: {}", e); - e - }) - .ok()?; - - let lookback_ms = base_context.store_plan.values_query.end_timestamp - - base_context.store_plan.values_query.start_timestamp; - - let buckets_per_step = (step_ms / tumbling_window_ms) as usize; - let lookback_bucket_count = (lookback_ms / tumbling_window_ms) as usize; - - let mut extended_store_plan = base_context.store_plan.clone(); - extended_store_plan.values_query.start_timestamp = - start_ms.saturating_sub(lookback_ms); - extended_store_plan.values_query.end_timestamp = end_ms; - extended_store_plan.values_query.is_exact_query = false; - - let range_context = RangeQueryExecutionContext { - base: QueryExecutionContext { - store_plan: extended_store_plan, - ..base_context - }, - range_params: RangeQueryParams { - start: start_ms, - end: end_ms, - step: step_ms, - }, - buckets_per_step, - lookback_bucket_count, - tumbling_window_ms, - }; - - Some((range_context, label_names)) - } - } - } - - /// Handles a binary arithmetic PromQL expression for range queries. - /// - /// Evaluates each arm independently over the full range, then joins the - /// resulting series by label key and applies the arithmetic operator - /// sample-by-sample at matching timestamps. - fn handle_binary_expr_range_promql( - &self, - ast: &promql_parser::parser::Expr, - start: f64, - end: f64, - step: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - use promql_parser::parser::Expr; - - let binary = match ast { - Expr::Binary(b) => b, - _ => return None, - }; - - let lhs = binary.lhs.as_ref(); - let rhs = binary.rhs.as_ref(); - let op = &binary.op; - - // Scalar case: either side may be a numeric literal - let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { - (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), - (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), - _ => None, - }; - if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { - let (ctx, labels) = self.build_arm_range_context(vector_arm, start, end, step)?; - let results = self.execute_range_query_pipeline(&ctx).ok()?; - let combined: Vec = results - .into_iter() - .map(|mut elem| { - for s in &mut elem.samples { - s.value = if scalar_on_left { - Self::apply_range_binary_op(op, scalar, s.value) - } else { - Self::apply_range_binary_op(op, s.value, scalar) - }; - } - elem - }) - .collect(); - return Some((KeyByLabelNames::new(labels), QueryResult::matrix(combined))); - } - - // Vector-vector: evaluate both arms, join by label key, apply op per matching timestamp - let (lhs_ctx, lhs_labels) = self.build_arm_range_context(lhs, start, end, step)?; - let (rhs_ctx, _) = self.build_arm_range_context(rhs, start, end, step)?; - let lhs_results = self.execute_range_query_pipeline(&lhs_ctx).ok()?; - let rhs_results = self.execute_range_query_pipeline(&rhs_ctx).ok()?; - - // Build lookup: label_key -> {timestamp -> value} for rhs - let mut rhs_map: HashMap> = HashMap::new(); - for elem in rhs_results { - let ts_map: HashMap = elem - .samples - .iter() - .map(|s| (s.timestamp, s.value)) - .collect(); - rhs_map.insert(elem.labels, ts_map); - } - - let mut combined: Vec = Vec::new(); - for lhs_elem in lhs_results { - if let Some(rhs_ts_map) = rhs_map.get(&lhs_elem.labels) { - let mut new_elem = RangeVectorElement::new(lhs_elem.labels.clone()); - for s in &lhs_elem.samples { - if let Some(&rhs_val) = rhs_ts_map.get(&s.timestamp) { - new_elem.add_sample( - s.timestamp, - Self::apply_range_binary_op(op, s.value, rhs_val), - ); - } - } - if !new_elem.samples.is_empty() { - combined.push(new_elem); - } - } - } - - let output_labels = KeyByLabelNames::new(lhs_labels); - Some((output_labels, QueryResult::matrix(combined))) - } - /// Formats unformatted results into final InstantVectorElement format /// For topk queries (when enabled), sorts by value and prepends metric name to keys fn format_final_results( @@ -1544,64 +852,6 @@ impl SimpleEngine { .collect() } - fn sql_get_is_collapsable( - &self, - temporal_aggregation: &AggregationInfo, - spatial_aggregation: &AggregationInfo, - ) -> bool { - match spatial_aggregation.get_name() { - "SUM" => matches!( - temporal_aggregation.get_name(), - "SUM" | "COUNT" // Note: "increase" and "rate" are commented out in Python - ), - "MIN" => temporal_aggregation.get_name() == "MIN", - "MAX" => temporal_aggregation.get_name() == "MAX", - _ => false, - } - } - - /// Extract QueryRequirements from a parsed PromQL match result. - /// Used as the fallback path when no query_configs entry is found. - fn build_query_requirements_promql( - &self, - match_result: &PromQLMatchResult, - query_pattern_type: QueryPatternType, - ) -> QueryRequirements { - let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); - - let statistics = get_statistics_to_compute(query_pattern_type, match_result); - - let data_range_ms = match query_pattern_type { - QueryPatternType::OnlySpatial => None, - _ => match_result - .get_range_duration() - .map(|d| d.num_seconds() as u64 * 1000), - }; - - let all_labels = match &self.inference_config.schema { - SchemaConfig::PromQL(schema) => schema - .get_labels(&metric) - .cloned() - .unwrap_or_else(KeyByLabelNames::empty), - _ => KeyByLabelNames::empty(), - }; - - let grouping_labels = match query_pattern_type { - QueryPatternType::OnlyTemporal => all_labels, - QueryPatternType::OnlySpatial | QueryPatternType::OneTemporalOneSpatial => { - get_spatial_aggregation_output_labels(match_result, &all_labels) - } - }; - - QueryRequirements { - metric, - statistics, - data_range_ms, - grouping_labels, - spatial_filter_normalized: normalize_spatial_filter(&spatial_filter), - } - } - /// Parse a lowercase aggregation name into exactly one `Statistic`. /// /// Returns `None` (with a warning) if the name is not a recognised @@ -1628,60 +878,6 @@ impl SimpleEngine { stats.into_iter().next() } - /// Extract QueryRequirements from a parsed SQL match result. - /// Used as the fallback path when no query_configs entry is found. - fn build_query_requirements_sql( - &self, - match_result: &SQLQuery, - query_pattern_type: QueryPatternType, - ) -> QueryRequirements { - let query_data = match_result - .outer_data() - .expect("build_query_requirements_sql called on valid SQLQuery"); - let metric = query_data.metric.clone(); - - let statistic_name = match query_pattern_type { - QueryPatternType::OneTemporalOneSpatial => match_result - .inner_data() - .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") - .aggregation_info - .get_name() - .to_lowercase(), - _ => query_data.aggregation_info.get_name().to_lowercase(), - }; - - let statistics: Vec = Self::parse_single_statistic(&statistic_name) - .into_iter() - .collect(); - - let data_range_ms = match query_pattern_type { - QueryPatternType::OnlySpatial => None, - QueryPatternType::OnlyTemporal => { - let scrape_intervals = query_data.time_info.clone().get_duration() as u64; - Some(scrape_intervals * self.prometheus_scrape_interval * 1000) - } - QueryPatternType::OneTemporalOneSpatial => { - let scrape_intervals = match_result - .inner_data() - .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") - .time_info - .clone() - .get_duration() as u64; - Some(scrape_intervals * self.prometheus_scrape_interval * 1000) - } - }; - - let grouping_labels = KeyByLabelNames::new(query_data.labels.clone().into_iter().collect()); - - QueryRequirements { - metric, - statistics, - data_range_ms, - grouping_labels, - spatial_filter_normalized: normalize_spatial_filter(""), - } - } - fn get_aggregation_id_info( &self, query_config: &QueryConfig, @@ -1762,15 +958,6 @@ impl SimpleEngine { }) } - pub fn handle_query_sql( - &self, - query: String, - time: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - let context = self.build_query_execution_context_sql(query, time)?; - self.execute_context(context, false) - } - /// Execute the query pipeline for an already-built context. /// /// Shared by `handle_query_sql`, `handle_query_elastic`, and `handle_query_promql`. @@ -1792,358 +979,6 @@ impl SimpleEngine { )) } - pub fn build_query_execution_context_sql( - &self, - query: String, - time: f64, - ) -> Option { - // Get SQL schema from inference config - let schema = match &self.inference_config.schema { - SchemaConfig::SQL(sql_schema) => sql_schema.clone(), - SchemaConfig::PromQL(_) => { - warn!("SQL query requested but config has PromQL schema"); - return None; - } - &SchemaConfig::ElasticQueryDSL => todo!(), - SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), - }; - - let statements = parser::parse_sql(&GenericDialect {}, query.as_str()).unwrap(); - let query_data = SQLPatternParser::new(&schema, time).parse_query(&statements); - - let query_data = match query_data { - Some(data) => data, - None => { - debug!("Could not parse query"); - return None; - } - }; - - let matcher = SQLPatternMatcher::new(schema, self.prometheus_scrape_interval as f64); - let match_result = matcher.query_info_to_pattern(&query_data); - - debug!("Match result: {:?}", match_result); - debug!("Validity: {}", match_result.is_valid()); - - if !match_result.is_valid() { - return None; - } - - // Handle SpatioTemporal queries separately - they bypass QueryPatternType mapping - if match_result.query_type == vec![QueryType::SpatioTemporal] { - let query_time = Self::convert_query_time_to_data_time( - query_data.time_info.get_start() + query_data.time_info.get_duration(), - ); - return self.build_spatiotemporal_context(&match_result, query_time, &query_data); - } - - let query_pattern_type = match &match_result.query_type[..] { - [x] => match x { - QueryType::Spatial => QueryPatternType::OnlySpatial, - QueryType::TemporalGeneric => QueryPatternType::OnlyTemporal, - QueryType::TemporalQuantile => QueryPatternType::OnlyTemporal, - QueryType::SpatioTemporal => unreachable!("SpatioTemporal handled above"), - }, - [x, y] => match (x, y) { - (QueryType::Spatial, QueryType::TemporalGeneric) => { - QueryPatternType::OneTemporalOneSpatial - } - (QueryType::Spatial, QueryType::TemporalQuantile) => { - QueryPatternType::OneTemporalOneSpatial - } - _ => panic!("Unsupported query type found"), - }, - _ => panic!("Unsupported query type found"), - }; - - // For nested queries (spatial of temporal), the outer query has no time clause, - // so we need to use the inner (temporal) query's time_info to compute query_time - let query_time = match query_pattern_type { - QueryPatternType::OneTemporalOneSpatial => { - let inner_time_info = &match_result.inner_data()?.time_info; - Self::convert_query_time_to_data_time( - inner_time_info.get_start() + inner_time_info.get_duration(), - ) - } - _ => Self::convert_query_time_to_data_time( - query_data.time_info.get_start() + query_data.time_info.get_duration(), - ), - }; - - // self.handle_sql_temporal_aggregation( - // query_config, - // &match_result, - // query_time, - // query_pattern_type, - // ) - // } - - // fn handle_sql_temporal_aggregation( - // &self, - // query_config: &QueryConfig, - // match_result: &SQLQuery, - // query_time: u64, - // query_pattern_type: QueryPatternType, - // ) -> Option<(KeyByLabelNames, QueryResult)> { - // Labels - - let query_output_labels = match &match_result.query_type.len() { - // Potentially change SQLQueryType - 1 => { - // For non-nested queries, output associated labels - let labels = &match_result.outer_data()?.labels; - - KeyByLabelNames::new(labels.clone().into_iter().collect()) - } - 2 => { - // Extract spatial aggregation output labels using AST-based approach - let temporal_labels = &match_result.inner_data()?.labels; - let spatial_labels = &match_result.outer_data()?.labels; - - let temporal_aggregation = &match_result.inner_data()?.aggregation_info; - let spatial_aggregation = &match_result.outer_data()?.aggregation_info; - - match self.sql_get_is_collapsable(temporal_aggregation, spatial_aggregation) { - // If false: get all labels, which are all temporal labels. If true, get only spatial labels - false => KeyByLabelNames::new(temporal_labels.clone().into_iter().collect()), - true => KeyByLabelNames::new(spatial_labels.clone().into_iter().collect()), - } - } - _ => { - warn!("Invalid query type: {}", query_pattern_type); - KeyByLabelNames::new(Vec::new()) - } - }; - - // Statistic - determine based on query pattern type - let statistic_name = match query_pattern_type { - QueryPatternType::OnlyTemporal => { - // Use the temporal aggregation (first subquery) - match_result - .outer_data()? - .aggregation_info - .get_name() - .to_lowercase() - } - QueryPatternType::OneTemporalOneSpatial => { - // Use the temporal aggregation (second subquery contains temporal) - match_result - .inner_data()? - .aggregation_info - .get_name() - .to_lowercase() - } - QueryPatternType::OnlySpatial => { - // Use the spatial aggregation (first subquery) - match_result - .outer_data()? - .aggregation_info - .get_name() - .to_lowercase() - } - }; - - let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; - - let query_kwargs = self - .build_query_kwargs_sql(&statistic_to_compute, &match_result) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - // Create query metadata - let metadata = QueryMetadata { - query_output_labels: query_output_labels.clone(), - statistic_to_compute, - query_kwargs: query_kwargs.clone(), - }; - - // Time - let timestamps = - self.calculate_query_timestamps_sql(query_time, query_pattern_type, &match_result); - - // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. - let agg_info: AggregationIdInfo = if let Some(config) = - self.find_query_config_sql(&query_data) - { - self.get_aggregation_id_info(config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()? - } else { - warn!("No query_config entry for SQL query. Attempting capability-based matching."); - let requirements = self.build_query_requirements_sql(&match_result, query_pattern_type); - self.streaming_config - .find_compatible_aggregation(&requirements)? - }; - - let metric = &match_result.outer_data()?.metric; - - let spatial_filter = if query_pattern_type == QueryPatternType::OneTemporalOneSpatial { - match_result - .outer_data()? - .labels - .iter() - .cloned() - .collect::>() - .join(",") - } else { - String::new() - }; - - let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal - || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; - - self.build_sql_execution_context_tail( - metric, - ×tamps, - metadata, - agg_info, - do_merge, - spatial_filter, - query_time, - ) - } - - /// Shared context-building tail for both SQL context builders. - /// - /// Called by `build_query_execution_context_sql` and `build_spatiotemporal_context` - /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. - /// Builds the query plan, derives grouping/aggregated labels, and returns the - /// final `QueryExecutionContext`. - #[allow(clippy::too_many_arguments)] - fn build_sql_execution_context_tail( - &self, - metric: &str, - timestamps: &QueryTimestamps, - metadata: QueryMetadata, - agg_info: AggregationIdInfo, - do_merge: bool, - spatial_filter: String, - query_time: u64, - ) -> Option { - let query_plan = self - .create_store_query_plan(metric, timestamps, &agg_info) - .map_err(|e| { - warn!("Failed to create store query plan: {}", e); - e - }) - .ok()?; - - let grouping_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_value) - .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| metadata.query_output_labels.clone()); - - let aggregated_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_key) - .map(|config| config.aggregated_labels.clone()) - .unwrap_or_else(KeyByLabelNames::empty); - - Some(QueryExecutionContext { - metric: metric.to_string(), - metadata, - store_plan: query_plan, - agg_info, - do_merge, - spatial_filter, - query_time, - grouping_labels, - aggregated_labels, - }) - } - - /// Build execution context for SpatioTemporal queries. - /// These queries span multiple scrape intervals but GROUP BY a subset of labels. - fn build_spatiotemporal_context( - &self, - match_result: &SQLQuery, - query_time: u64, - query_data: &SQLQueryData, - ) -> Option { - // Output labels are the GROUP BY columns (subset of all labels) - let query_output_labels = KeyByLabelNames::new( - match_result - .outer_data()? - .labels - .clone() - .into_iter() - .collect(), - ); - - // Get the statistic from the aggregation - let statistic_name = match_result - .outer_data()? - .aggregation_info - .get_name() - .to_lowercase(); - - let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; - - let query_kwargs = self - .build_query_kwargs_sql(&statistic_to_compute, match_result) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - let metadata = QueryMetadata { - query_output_labels: query_output_labels.clone(), - statistic_to_compute, - query_kwargs: query_kwargs.clone(), - }; - - // Calculate timestamps - similar to OnlyTemporal - let end_timestamp = - self.validate_and_align_end_timestamp(query_time, QueryPatternType::OnlyTemporal); - let scrape_intervals = match_result.outer_data()?.time_info.get_duration() as u64; - let start_timestamp = - end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000); - - let timestamps = QueryTimestamps { - start_timestamp, - end_timestamp, - }; - - // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. - let agg_info: AggregationIdInfo = if let Some(config) = - self.find_query_config_sql(query_data) - { - self.get_aggregation_id_info(config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()? - } else { - warn!( - "No query_config entry for SQL spatio-temporal query. Attempting capability-based matching." - ); - let requirements = - self.build_query_requirements_sql(match_result, QueryPatternType::OnlyTemporal); - self.streaming_config - .find_compatible_aggregation(&requirements)? - }; - let metric = &match_result.outer_data()?.metric; - - self.build_sql_execution_context_tail( - metric, - ×tamps, - metadata, - agg_info, - true, - String::new(), - query_time, - ) - } - /// Handle a query following Python's unified architecture // pub async fn handle_query( pub fn handle_query(&self, query: String, time: f64) -> Option<(KeyByLabelNames, QueryResult)> { @@ -2155,625 +990,6 @@ impl SimpleEngine { } } - pub fn handle_query_elastic( - &self, - query: String, - time: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - let context = self.build_query_execution_context_elastic(query, time)?; - debug!( - "Built execution context for ElasticSearch query {:?}", - context - ); - self.execute_context(context, false) - } - - pub fn build_query_execution_context_elastic( - &self, - query: String, - time: f64, - ) -> Option { - let query_time = Self::convert_query_time_to_data_time(time); - - // 1. Parse query DSL somehow. Elasticsearch DSL crate does not support deserializing, but maybe can use Opensearch instead? - // 2. Determine whether query is supported using some AST representation or hardcoded pattern matching. - let query_pattern: EsDslQueryPattern = - parse_and_classify(&query).unwrap_or(EsDslQueryPattern::Unknown); - match query_pattern { - EsDslQueryPattern::Unknown => { - debug!("Could not parse query into known pattern"); - return None; - } - _ => { - debug!("Parsed query pattern: {:?}", query_pattern); - } - } - - // 3. Convert parsed query into execution context components (labels, statistic, kwargs, metadata, store query plan, etc.) - - // TODO: Figure out how to handle query configuration for ElasticSearch queries. - let query_config = self.find_query_config(&query)?; - let agg_info = self - .get_aggregation_id_info(query_config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()?; - - let do_merge = true; // No "instant" queries in ElasticSearch supported for now, so we always need to merge. - - let (metric, query_metadata) = self.build_query_metadata_elastic(&query_pattern)?; - - let spatial_filter = String::new(); // Placeholder - extract from query if applicable - - // TODO: Need way to parse ES DSL "date math". - let timestamps = self.resolve_query_time_range_elastic(query_time, query_pattern); - - let query_plan = self - .create_store_query_plan(&metric, ×tamps, &agg_info) - .map_err(|e| { - warn!("Failed to create store query plan: {}", e); - e - }) - .ok()?; - - let grouping_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_value) - .map(|config| config.grouping_labels.clone()) - .unwrap_or_else(|| query_metadata.query_output_labels.clone()); - - let aggregated_labels = self - .streaming_config - .get_aggregation_config(agg_info.aggregation_id_for_key) - .map(|config| config.aggregated_labels.clone()) - .unwrap_or_else(KeyByLabelNames::empty); - - Some(QueryExecutionContext { - metric, - metadata: query_metadata, - store_plan: query_plan.clone(), - agg_info: agg_info.clone(), - do_merge, - spatial_filter, - query_time, - grouping_labels, - aggregated_labels, - }) - } - - fn build_query_metadata_elastic( - &self, - query_pattern: &EsDslQueryPattern, - ) -> Option<(String, QueryMetadata)> { - // Constructs QueryMetadata based on the parsed ES DSL query pattern. This includes determining the - // metric to query, the statistic to compute, and any relevant query kwargs (e.g. quantile value for percentiles). - - // Figure out aggregation type and what labels are included in output. - // By default, we only include grouping labels in the output for ES DSL. - - // Take first aggregation by default since current engine doesn't support multiple aggregations in a single query. - let aggregation = query_pattern.get_metric_aggs()?.first()?.clone(); - - // By default, we only include grouping labels in the output for ES DSL. - let query_output_labels = match query_pattern.get_groupby_spec() { - Some(GroupBySpec::Terms { field }) => KeyByLabelNames::new(vec![field.clone()]), - Some(GroupBySpec::MultiTerms { fields }) => KeyByLabelNames::new(fields.to_vec()), - None => KeyByLabelNames::empty(), - }; - - let metric = aggregation.field.clone(); - - // Map ElasticSearch aggregation types to our internal Statistic enum. - let statistic_to_compute = match aggregation.agg_type { - MetricAggType::Percentiles => Statistic::Quantile, - MetricAggType::Avg => Statistic::Rate, - MetricAggType::Sum => Statistic::Sum, - MetricAggType::Min => Statistic::Min, - MetricAggType::Max => Statistic::Max, - }; - - let mut query_kwargs = HashMap::new(); // Placeholder - build based on query and statistic - if aggregation.agg_type == MetricAggType::Percentiles { - // Extract quantile value from aggregation parameters and add to query_kwargs - if let Some(params) = &aggregation.params { - if let Some(percents) = params.get("percents") { - // Get first value from percents array since we only support one quantile argument for now. - let quantile = percents - .as_array() - .and_then(|arr| arr.first()) - .and_then(|v| v.as_f64()); - // ES percentiles are specified as values between 0 and 100, but we want to convert to 0-1 range for our internal representation. - query_kwargs.insert("quantile".to_string(), (quantile? / 100.0).to_string()); - } - } - } - - let metadata = QueryMetadata { - query_output_labels: query_output_labels.clone(), - statistic_to_compute, - query_kwargs: query_kwargs.clone(), - }; - Some((metric, metadata)) - } - - pub fn resolve_query_time_range_elastic( - &self, - query_time: u64, - query_pattern: EsDslQueryPattern, - ) -> QueryTimestamps { - // Resolves the actual start and end timestamps into milliseconds for an ElasticSearch query - // based on the provided query_time and the time range specified in the ES DSL query pattern (if any). - // If no time range is specified, default to entire history up to query_time. - - let mut start_timestamp: u64 = 0; - let mut end_timestamp: u64 = query_time; - - let time_range = query_pattern.get_time_range(); - if let Some(tr) = time_range { - if let Some(resolved_range) = tr.resolve_epoch_millis(query_time as i64) { - debug!( - "Parsed time range from query: start={} end={}", - resolved_range.gte_ms.unwrap_or(0), - resolved_range.lte_ms.unwrap_or(0) - ); - start_timestamp = resolved_range.gte_ms.unwrap_or(0) as u64; - end_timestamp = resolved_range.lte_ms.unwrap_or(query_time as i64) as u64; - } else { - debug!("Failed to resolve time range from query"); - } - }; - - QueryTimestamps { - start_timestamp, - end_timestamp, - } - } - - // /// Try to extract sketch query components from a PromQL query string. - // /// - // /// Attempts the standard AST parser first. If that fails (e.g. for custom - // /// sketch-only functions), falls back to a lightweight regex extraction for - // /// patterns like `func(metric[range])` and `func(number, metric[range])`. - // /// Extract just the sketch function name from a query without full evaluation. - // fn extract_sketch_func_name(&self, query: &str) -> Option { - // self.parse_sketch_query_components(query) - // .map(|c| c.func_name) - // } - - // fn parse_sketch_query_components(&self, query: &str) -> Option { - // // --- Path A: standard PromQL parser + pattern matching --- - // if let Some(components) = self.parse_sketch_via_ast(query) { - // return Some(components); - // } - - // // --- Path B: regex fallback for custom sketch functions --- - // self.parse_sketch_via_regex(query) - // } - - // /// Parse sketch components using the standard PromQL AST parser. - // fn parse_sketch_via_ast(&self, query: &str) -> Option { - // let ast = match promql_parser::parser::parse(query) { - // Ok(ast) => ast, - // Err(_) => return None, - // }; - - // let mut found_match = None; - // for (pattern_type, patterns) in &self.controller_patterns { - // for pattern in patterns { - // let match_result = pattern.matches(&ast); - // if match_result.matches { - // found_match = Some((*pattern_type, match_result)); - // break; - // } - // } - // if found_match.is_some() { - // break; - // } - // } - - // let (query_pattern_type, match_result) = found_match?; - - // if query_pattern_type != QueryPatternType::OnlyTemporal { - // debug!( - // "Sketch query (AST): pattern type {:?} is not OnlyTemporal, skipping for '{}'", - // query_pattern_type, query - // ); - // return None; - // } - - // let func_name = match_result.get_function_name()?; - // promsketch_store::promsketch_func_map(&func_name)?; - - // let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); - // let metric = if spatial_filter.is_empty() { - // metric - // } else { - // format!("{}{{{}}}", metric, spatial_filter) - // }; - - // let range_seconds = match_result.get_range_duration()?.num_seconds() as u64; - - // let args = if func_name == "quantile_over_time" { - // self.extract_quantile_param_promql(query_pattern_type, &match_result) - // .and_then(|s| s.parse::().ok()) - // .unwrap_or(0.5) - // } else { - // 0.0 - // }; - - // Some(SketchQueryComponents { - // func_name, - // metric, - // range_seconds, - // args, - // }) - // } - - // /// Regex fallback for custom sketch functions the PromQL parser doesn't know. - // /// - // /// Matches two forms: - // /// - `func_name(metric[duration])` (generic) - // /// - `func_name(number, metric[duration])` (quantile) - // /// - `func_name(metric{filter}[duration])` (with label filter) - // fn parse_sketch_via_regex(&self, query: &str) -> Option { - // use regex::Regex; - - // // quantile form: quantile_over_time(0.5, metric{...}[5m]) - // let quantile_re = - // Regex::new(r"^(\w+)\(\s*([0-9.]+)\s*,\s*(\w+(?:\{[^}]*\})?)\[(\d+)([smhd])\]\s*\)$") - // .ok()?; - - // // generic form: func(metric{...}[5m]) - // let generic_re = - // Regex::new(r"^(\w+)\(\s*(\w+(?:\{[^}]*\})?)\[(\d+)([smhd])\]\s*\)$").ok()?; - - // if let Some(caps) = quantile_re.captures(query.trim()) { - // let func_name = caps[1].to_string(); - // promsketch_store::promsketch_func_map(&func_name)?; - // let args: f64 = caps[2].parse().ok()?; - // let metric = caps[3].to_string(); - // let range_seconds = Self::parse_duration_to_seconds(&caps[4], &caps[5])?; - // debug!( - // "Sketch query (regex/quantile): parsed {} with metric={}, range={}s, args={}", - // func_name, metric, range_seconds, args - // ); - // return Some(SketchQueryComponents { - // func_name, - // metric, - // range_seconds, - // args, - // }); - // } - - // if let Some(caps) = generic_re.captures(query.trim()) { - // let func_name = caps[1].to_string(); - // promsketch_store::promsketch_func_map(&func_name)?; - // let metric = caps[2].to_string(); - // let range_seconds = Self::parse_duration_to_seconds(&caps[3], &caps[4])?; - // debug!( - // "Sketch query (regex/generic): parsed {} with metric={}, range={}s", - // func_name, metric, range_seconds - // ); - // return Some(SketchQueryComponents { - // func_name, - // metric, - // range_seconds, - // args: 0.0, - // }); - // } - - // None - // } - - // /// Convert a numeric value + unit suffix into seconds. - // fn parse_duration_to_seconds(value: &str, unit: &str) -> Option { - // let n: u64 = value.parse().ok()?; - // let multiplier = match unit { - // "s" => 1, - // "m" => 60, - // "h" => 3600, - // "d" => 86400, - // _ => return None, - // }; - // Some(n * multiplier) - // } - - // /// Try to handle a PromQL query via the sketch shortcut path. - // /// Returns Some if the query is sketch-backed and PromSketchStore is available. - // /// Returns None to fall through to the precomputed pipeline. - // fn handle_sketch_query_promql( - // &self, - // query: &str, - // time: f64, - // ) -> Option<(KeyByLabelNames, QueryResult)> { - // let ps = self.promsketch_store.as_ref()?; - - // let components = match self.parse_sketch_query_components(query) { - // Some(c) => c, - // None => { - // debug!( - // "Sketch query: could not parse sketch components from '{}'", - // query - // ); - // return None; - // } - // }; - - // let eval_start = Instant::now(); - - // let query_time = Self::convert_query_time_to_data_time(time); - // let end = query_time; - // let start = end.saturating_sub(components.range_seconds * 1000); - - // debug!( - // "Sketch query: evaluating {}({}) range=[{}, {}] args={}", - // components.func_name, components.metric, start, end, components.args - // ); - - // let results = match ps.eval_matching( - // &components.func_name, - // &components.metric, - // components.args, - // start, - // end, - // ) { - // Ok(r) => r, - // Err(e) => { - // warn!( - // "Sketch query: eval_matching failed for {}({}): {}", - // components.func_name, components.metric, e - // ); - // ps_metrics::SKETCH_QUERIES_TOTAL - // .with_label_values(&["miss"]) - // .inc(); - // return None; - // } - // }; - - // if results.is_empty() { - // debug!( - // "Sketch query: no matching series with data for {}({}), falling through", - // components.func_name, components.metric - // ); - // ps_metrics::SKETCH_QUERIES_TOTAL - // .with_label_values(&["miss"]) - // .inc(); - // return None; - // } - - // ps_metrics::SKETCH_QUERIES_TOTAL - // .with_label_values(&["hit"]) - // .inc(); - // ps_metrics::SKETCH_QUERY_DURATION.observe(eval_start.elapsed().as_secs_f64()); - - // info!( - // "Sketch query: {}({}) returned {} series results", - // components.func_name, - // components.metric, - // results.len() - // ); - - // let elements: Vec = results - // .into_iter() - // .map(|(labels_str, value)| { - // let labels = KeyByLabelValues::new_with_labels(vec![labels_str]); - // InstantVectorElement::new(labels, value) - // }) - // .collect(); - - // let output_labels = KeyByLabelNames::new(vec!["__name__".to_string()]); - // Some((output_labels, QueryResult::vector(elements, query_time))) - // } - - pub fn handle_query_promql( - &self, - query: String, - time: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - let query_start_time = Instant::now(); - debug!("Handling query: {} at time {}", query, time); - - // Check for binary arithmetic before attempting single-query dispatch. - // Binary expressions won't have a matching query_config, so we handle them here. - if let Ok(ast) = promql_parser::parser::parse(&query) { - if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { - let result = self.handle_binary_expr_promql(&ast, time); - let total_query_duration = query_start_time.elapsed(); - debug!( - "Binary arithmetic query handling took: {:.2}ms", - total_query_duration.as_secs_f64() * 1000.0 - ); - return result; - } - } - - let context = self.build_query_execution_context_promql(query, time)?; - - debug!( - "Querying store for metric: {}, aggregation_id: {}, range: [{}, {}]", - context.metric, - context.agg_info.aggregation_id_for_value, - context.store_plan.values_query.start_timestamp, - context.store_plan.values_query.end_timestamp - ); - - let result = self.execute_context(context, true); - - // Determine query routing order based on function type. - // USampling functions prefer the precomputed path first (sketch fallback), - // while EHUniv/EHKLL functions prefer the sketch path first. - // let prefer_precomputed = self - // .extract_sketch_func_name(&query) - // .is_some_and(|name| is_usampling_function(&name)); - - // if !prefer_precomputed { - // // Non-USampling sketch functions: try sketch path first - // if let Some(result) = self.handle_sketch_query_promql(&query, time) { - // let total_query_duration = query_start_time.elapsed(); - // debug!( - // "Sketch query handling took: {:.2}ms", - // total_query_duration.as_secs_f64() * 1000.0 - // ); - // return Some(result); - // } - // } - - // // Precomputed pipeline - // let precomputed_result = (|| -> Option<(KeyByLabelNames, QueryResult)> { - // let context = self.build_query_execution_context_promql(query.clone(), time)?; - - // debug!( - // "Querying store for metric: {}, aggregation_id: {}, range: [{}, {}]", - // context.metric, - // context.agg_info.aggregation_id_for_value, - // context.store_plan.values_query.start_timestamp, - // context.store_plan.values_query.end_timestamp - // ); - - // let results = self - // .execute_query_pipeline(&context, true) // PromQL: topk enabled - // .map_err(|e| { - // warn!("Query execution failed: {}", e); - // e - // }) - // .ok()?; - - // Some(( - // context.metadata.query_output_labels, - // QueryResult::vector(results, context.query_time), - // )) - // })(); - - // if precomputed_result.is_some() { - // let total_query_duration = query_start_time.elapsed(); - // debug!( - // "Total query handling took: {:.2}ms", - // total_query_duration.as_secs_f64() * 1000.0 - // ); - // return precomputed_result; - // } - - // // Fallback: USampling functions try sketch if precomputed had no data - // if prefer_precomputed { - // if let Some(result) = self.handle_sketch_query_promql(&query, time) { - // let total_query_duration = query_start_time.elapsed(); - // debug!( - // "Sketch fallback query handling took: {:.2}ms", - // total_query_duration.as_secs_f64() * 1000.0 - // ); - // return Some(result); - // } - // } - - let total_query_duration = query_start_time.elapsed(); - debug!( - "Total query handling took: {:.2}ms (no results)", - total_query_duration.as_secs_f64() * 1000.0 - ); - result - } - - pub fn build_query_execution_context_promql( - &self, - query: String, - time: f64, - ) -> Option { - let query_time = Self::convert_query_time_to_data_time(time); - - // Parse PromQL AST using promql-parser crate - let parse_start_time = Instant::now(); - let ast = match promql_parser::parser::parse(&query) { - Ok(ast) => { - let parse_duration = parse_start_time.elapsed(); - debug!( - "PromQL parsing took: {:.2}ms", - parse_duration.as_secs_f64() * 1000.0 - ); - ast - } - Err(e) => { - warn!("Failed to parse PromQL query '{}': {}", query, e); - return None; - } - }; - - let pattern_match_start_time = Instant::now(); - - let mut found_match = None; - for (pattern_type, patterns) in &self.controller_patterns { - for pattern in patterns { - debug!( - "Trying pattern type: {:?} for query: {}", - pattern_type, query - ); - let match_result = pattern.matches(&ast); - debug!("Match result: {:?}", match_result); - if match_result.matches { - found_match = Some((*pattern_type, match_result)); - break; - } - } - if found_match.is_some() { - break; - } - } - - let (query_pattern_type, match_result) = match found_match { - Some((pt, result)) => { - let pattern_match_duration = pattern_match_start_time.elapsed(); - debug!( - "Pattern matching took: {:.2}ms", - pattern_match_duration.as_secs_f64() * 1000.0 - ); - (pt, result) - } - None => { - warn!("No matching pattern found for query: {}", query); - return None; - } - }; - - debug!("Found matching query config for: {}", query); - - let query_context_start_time = Instant::now(); - - // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. - let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { - self.get_aggregation_id_info(config) - .map_err(|e| { - warn!("{}", e); - e - }) - .ok()? - } else { - warn!( - "No query_config entry for PromQL query '{}'. Attempting capability-based matching.", - query - ); - let requirements = - self.build_query_requirements_promql(&match_result, query_pattern_type); - self.streaming_config - .find_compatible_aggregation(&requirements)? - }; - - let result = self.build_promql_execution_context_tail( - &match_result, - query_pattern_type, - query_time, - agg_info, - ); - - let query_context_duration = query_context_start_time.elapsed(); - debug!( - "[LATENCY] Query context build: {:.2}ms", - query_context_duration.as_secs_f64() * 1000.0 - ); - - result - } - /// Merge precomputed outputs (extracts buckets from timestamped data) fn merge_precomputed_outputs( &self, @@ -3049,67 +1265,6 @@ impl SimpleEngine { Ok(()) } - /// Build execution context for range query - pub fn build_range_query_execution_context_promql( - &self, - query: String, - start: f64, - end: f64, - step: f64, - ) -> Option { - // First, build the base instant query context (reuse existing logic) - // Use 'end' as the reference time for parsing - let base_context = self.build_query_execution_context_promql(query, end)?; - - // Convert to milliseconds - let start_ms = Self::convert_query_time_to_data_time(start); - let end_ms = Self::convert_query_time_to_data_time(end); - let step_ms = (step * 1000.0) as u64; - - // Get window size - let tumbling_window_ms = self - .streaming_config - .get_aggregation_config(base_context.agg_info.aggregation_id_for_value) - .map(|config| config.window_size * 1000)?; - - // Validate parameters - self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms) - .map_err(|e| { - warn!("Range query validation failed: {}", e); - e - }) - .ok()?; - - // Calculate lookback from the base context's store plan - let lookback_ms = base_context.store_plan.values_query.end_timestamp - - base_context.store_plan.values_query.start_timestamp; - - let buckets_per_step = (step_ms / tumbling_window_ms) as usize; - let lookback_bucket_count = (lookback_ms / tumbling_window_ms) as usize; - - // Modify the store plan to cover the entire range - let mut extended_store_plan = base_context.store_plan.clone(); - extended_store_plan.values_query.start_timestamp = start_ms.saturating_sub(lookback_ms); - extended_store_plan.values_query.end_timestamp = end_ms; - // Range queries always use range fetch, not exact - extended_store_plan.values_query.is_exact_query = false; - - Some(RangeQueryExecutionContext { - base: QueryExecutionContext { - store_plan: extended_store_plan, - ..base_context - }, - range_params: RangeQueryParams { - start: start_ms, - end: end_ms, - step: step_ms, - }, - buckets_per_step, - lookback_bucket_count, - tumbling_window_ms, - }) - } - // /// Try to handle a PromQL range query via the sketch shortcut path. // /// Returns Some if the query is sketch-backed and PromSketchStore is available. // /// Returns None to fall through to the precomputed pipeline. @@ -3225,106 +1380,6 @@ impl SimpleEngine { // Some((output_labels, QueryResult::matrix(range_elements))) // } - /// Main entry point for range queries - pub fn handle_range_query_promql( - &self, - query: String, - start: f64, - end: f64, - step: f64, - ) -> Option<(KeyByLabelNames, QueryResult)> { - let query_start_time = Instant::now(); - debug!( - "Handling range query: {} from {} to {} step {}", - query, start, end, step - ); - - // Check for binary arithmetic before attempting single-query dispatch. - if let Ok(ast) = promql_parser::parser::parse(&query) { - if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { - let result = self.handle_binary_expr_range_promql(&ast, start, end, step); - let total_duration = query_start_time.elapsed(); - debug!( - "Binary arithmetic range query handling took: {:.2}ms", - total_duration.as_secs_f64() * 1000.0 - ); - return result; - } - } - - let context = self.build_range_query_execution_context_promql(query, start, end, step)?; - - // Execute range query pipeline - let results: Vec = self - .execute_range_query_pipeline(&context) - .map_err(|e| { - warn!("Range query execution failed: {}", e); - e - }) - .ok()?; - - // // Determine query routing order based on function type. - // // USampling functions prefer the precomputed path first (sketch fallback), - // // while EHUniv/EHKLL functions prefer the sketch path first. - // let prefer_precomputed = self - // .extract_sketch_func_name(&query) - // .is_some_and(|name| is_usampling_function(&name)); - - // if !prefer_precomputed { - // // Non-USampling sketch functions: try sketch path first - // if let Some(result) = self.handle_sketch_range_query_promql(&query, start, end, step) { - // let total_duration = query_start_time.elapsed(); - // debug!( - // "Sketch range query handling took: {:.2}ms", - // total_duration.as_secs_f64() * 1000.0 - // ); - // return Some(result); - // } - // } - - // // Precomputed pipeline - // let precomputed_result = (|| -> Option<(KeyByLabelNames, QueryResult)> { - // let context = - // self.build_range_query_execution_context_promql(query.clone(), start, end, step)?; - - // let results: Vec = self - // .execute_range_query_pipeline(&context) - // .map_err(|e| { - // warn!("Range query execution failed: {}", e); - // e - // }) - // .ok()?; - - // Some(( - // context.base.metadata.query_output_labels, - // QueryResult::matrix(results), - // )) - // })(); - - // // Fallback: USampling functions try sketch if precomputed had no data - // if prefer_precomputed { - // if let Some(result) = self.handle_sketch_range_query_promql(&query, start, end, step) { - // let total_duration = query_start_time.elapsed(); - // debug!( - // "Sketch fallback range query handling took: {:.2}ms", - // total_duration.as_secs_f64() * 1000.0 - // ); - // return Some(result); - // } - // } - - let total_duration = query_start_time.elapsed(); - debug!( - "Total range query handling took: {:.2}ms", - total_duration.as_secs_f64() * 1000.0 - ); - - Some(( - context.base.metadata.query_output_labels, - QueryResult::matrix(results), - )) - } - /// Execute the range query pipeline fn execute_range_query_pipeline( &self, diff --git a/asap-query-engine/src/engines/simple_engine/promql.rs b/asap-query-engine/src/engines/simple_engine/promql.rs new file mode 100644 index 0000000..c49e7f0 --- /dev/null +++ b/asap-query-engine/src/engines/simple_engine/promql.rs @@ -0,0 +1,1249 @@ +//! PromQL query language handler for SimpleEngine. +//! +//! Contains all PromQL-specific context building, pattern matching, binary arithmetic +//! dispatch, range-query handling, and query dispatch. + +use super::SimpleEngine; +use super::{ + QueryExecutionContext, QueryMetadata, QueryTimestamps, RangeQueryExecutionContext, + RangeQueryParams, +}; +use crate::data_model::{AggregationIdInfo, KeyByLabelValues, QueryConfig, SchemaConfig}; +use crate::engines::query_result::{QueryResult, RangeVectorElement}; +use asap_types::query_requirements::QueryRequirements; +use asap_types::utils::normalize_spatial_filter; +use promql_utilities::ast_matching::PromQLMatchResult; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::get_is_collapsable; +use promql_utilities::query_logics::enums::{ + AggregationOperator, PromQLFunction, QueryPatternType, Statistic, +}; +use promql_utilities::query_logics::parsing::{ + get_metric_and_spatial_filter, get_spatial_aggregation_output_labels, get_statistics_to_compute, +}; +use std::collections::HashMap; +use std::time::Instant; +use tracing::{debug, warn}; + +impl SimpleEngine { + /// Calculates start timestamp for PromQL queries + fn calculate_start_timestamp_promql( + &self, + end_timestamp: u64, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> u64 { + match query_pattern_type { + QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { + let range_seconds = match_result.get_range_duration().unwrap().num_seconds() as u64; + end_timestamp - (range_seconds * 1000) + } + QueryPatternType::OnlySpatial => { + end_timestamp - (self.prometheus_scrape_interval * 1000) + } + } + } + + /// Calculates and validates query timestamps for PromQL + fn calculate_query_timestamps_promql( + &self, + query_time: u64, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> QueryTimestamps { + let mut end_timestamp = if let Some(at_modifier) = match_result + .tokens + .get("metric") + .and_then(|t| t.metric.as_ref()) + .and_then(|m| m.at_modifier) + { + at_modifier * 1000 + } else { + query_time + }; + + end_timestamp = self.validate_and_align_end_timestamp(end_timestamp, query_pattern_type); + let start_timestamp = + self.calculate_start_timestamp_promql(end_timestamp, query_pattern_type, match_result); + + QueryTimestamps { + start_timestamp, + end_timestamp, + } + } + + /// Extracts quantile parameter from PromQL match result + fn extract_quantile_param_promql( + &self, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> Option { + let quantile_value = match query_pattern_type { + QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { + match_result + .tokens + .get("function_args") + .and_then(|token| token.function.as_ref()) + .and_then(|func| func.args.first()) + } + QueryPatternType::OnlySpatial => match_result + .tokens + .get("aggregation") + .and_then(|token| token.aggregation.as_ref()) + .and_then(|agg| agg.param.as_ref()), + }; + + quantile_value.map(|s| s.to_string()) + } + + /// Extracts topk k parameter from PromQL match result + fn extract_topk_param( + &self, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> Result { + match query_pattern_type { + QueryPatternType::OnlySpatial => match_result + .tokens + .get("aggregation") + .and_then(|token| token.aggregation.as_ref()) + .and_then(|agg| agg.param.as_ref()) + .map(|s| s.to_string()) + .ok_or_else(|| "Missing k parameter for top-k query".to_string()), + _ => Err(format!( + "Top-k statistic is only supported for OnlySpatial pattern, found {:?}", + query_pattern_type + )), + } + } + + /// Builds query kwargs (quantile, k, etc.) for PromQL queries + fn build_query_kwargs_promql( + &self, + statistic: &Statistic, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + ) -> Result, String> { + let mut query_kwargs = HashMap::new(); + + match statistic { + Statistic::Quantile => { + let quantile = self + .extract_quantile_param_promql(query_pattern_type, match_result) + .ok_or_else(|| "Missing quantile parameter for quantile query".to_string())?; + debug!("Extracted quantile value: {:?}", quantile); + query_kwargs.insert("quantile".to_string(), quantile); + } + Statistic::Topk => { + let k = self.extract_topk_param(query_pattern_type, match_result)?; + debug!("Extracted k value: {:?}", k); + query_kwargs.insert("k".to_string(), k); + } + _ => {} + } + + Ok(query_kwargs) + } + + /// Finds a query config by structurally comparing `arm_ast` against each + /// config's parsed query. + /// + /// Both the arm AST and each config's query string are first normalized to + /// the canonical `Display` form produced by `promql_parser`. This ensures + /// that user-written variants like `"sum(x) by (lbl)"` and the parser's + /// canonical `"sum by (lbl) (x)"` compare equal. + pub fn find_query_config_promql_structural( + &self, + arm_ast: &promql_parser::parser::Expr, + ) -> Option<&QueryConfig> { + let arm_canonical = format!("{}", arm_ast); + self.inference_config.query_configs.iter().find(|config| { + let config_canonical = promql_parser::parser::parse(&config.query) + .map(|ast| format!("{}", ast)) + .unwrap_or_default(); + config_canonical == arm_canonical + }) + } + + /// Variant of `build_query_execution_context_promql` that accepts a pre-parsed + /// AST node and a pre-found `QueryConfig`, avoiding redundant parsing and lookup. + pub fn build_query_execution_context_from_ast( + &self, + arm_ast: &promql_parser::parser::Expr, + query_config: &QueryConfig, + time: f64, + ) -> Option { + let query_time = Self::convert_query_time_to_data_time(time); + + let mut found_match = None; + for (pattern_type, patterns) in &self.controller_patterns { + for pattern in patterns { + let match_result = pattern.matches(arm_ast); + if match_result.matches { + found_match = Some((*pattern_type, match_result)); + break; + } + } + if found_match.is_some() { + break; + } + } + + let (query_pattern_type, match_result) = found_match?; + + let agg_info = self + .get_aggregation_id_info(query_config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + self.build_promql_execution_context_tail( + &match_result, + query_pattern_type, + query_time, + agg_info, + ) + } + + /// Shared context-building tail for both PromQL context builders. + /// + /// Called by `build_query_execution_context_from_ast` and + /// `build_query_execution_context_promql` after pattern matching and + /// `agg_info` resolution are complete. Computes labels, statistics, + /// kwargs, metadata, query plan, and the final `QueryExecutionContext`. + fn build_promql_execution_context_tail( + &self, + match_result: &PromQLMatchResult, + query_pattern_type: QueryPatternType, + query_time: u64, + agg_info: AggregationIdInfo, + ) -> Option { + let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); + + let promql_schema = match &self.inference_config.schema { + SchemaConfig::PromQL(schema) => schema, + _ => return None, + }; + let all_labels = match promql_schema.get_labels(&metric).cloned() { + Some(labels) => labels, + None => { + warn!("No metric configuration found for '{}'", metric); + return None; + } + }; + + let mut query_output_labels = match query_pattern_type { + QueryPatternType::OnlyTemporal => all_labels.clone(), + QueryPatternType::OnlySpatial => { + get_spatial_aggregation_output_labels(match_result, &all_labels) + } + QueryPatternType::OneTemporalOneSpatial => { + let temporal_aggregation = match_result.get_function_name().unwrap(); + let spatial_aggregation = match_result.get_aggregation_op().unwrap(); + let collapsable = temporal_aggregation + .parse::() + .ok() + .zip(spatial_aggregation.parse::().ok()) + .is_some_and(|(f, o)| get_is_collapsable(f, o)); + if collapsable { + get_spatial_aggregation_output_labels(match_result, &all_labels) + } else { + all_labels.clone() + } + } + }; + + let timestamps = + self.calculate_query_timestamps_promql(query_time, query_pattern_type, match_result); + + let statistics_to_compute = get_statistics_to_compute(query_pattern_type, match_result); + if statistics_to_compute.len() != 1 { + warn!( + "Expected exactly one statistic to compute, found {}", + statistics_to_compute.len() + ); + return None; + } + let statistic_to_compute = statistics_to_compute.first().unwrap(); + + if *statistic_to_compute == Statistic::Topk { + let mut new_labels = vec!["__name__".to_string()]; + new_labels.extend(query_output_labels.labels); + query_output_labels = KeyByLabelNames::new(new_labels); + } + + let query_kwargs = self + .build_query_kwargs_promql(statistic_to_compute, query_pattern_type, match_result) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute: *statistic_to_compute, + query_kwargs, + }; + + let query_plan = self + .create_store_query_plan(&metric, ×tamps, &agg_info) + .map_err(|e| { + warn!("Failed to create store query plan: {}", e); + e + }) + .ok()?; + + let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal + || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; + + let grouping_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_value) + .map(|config| config.grouping_labels.clone()) + .unwrap_or_else(|| query_output_labels.clone()); + + let aggregated_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_key) + .map(|config| config.aggregated_labels.clone()) + .unwrap_or_else(KeyByLabelNames::empty); + + Some(QueryExecutionContext { + metric, + metadata, + store_plan: query_plan, + agg_info, + do_merge, + spatial_filter, + query_time, + grouping_labels, + aggregated_labels, + }) + } + + /// Recursively builds a DataFusion logical plan for one arm of a binary + /// arithmetic expression. + /// + /// - Leaf arm (supported PromQL pattern): look up config structurally, build + /// context, return its `to_logical_plan()` together with the output label names. + /// - Binary arm: recursively build both sub-arms and combine with + /// `build_binary_vector_plan`. + /// - Scalar literal: returns `None` (handled by the caller separately). + fn build_arm_logical_plan( + &self, + arm_ast: &promql_parser::parser::Expr, + time: f64, + ) -> Option<(datafusion::logical_expr::LogicalPlan, Vec)> { + use crate::engines::logical::plan_builder::build_binary_vector_plan; + use promql_parser::parser::Expr; + + match arm_ast { + Expr::NumberLiteral(_) => None, // caller handles scalars + Expr::Paren(paren) => self.build_arm_logical_plan(&paren.expr, time), + Expr::Binary(binary) => { + // Nested binary expression — recurse on both sides + let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(&binary.lhs, time)?; + let (rhs_plan, _) = self.build_arm_logical_plan(&binary.rhs, time)?; + let combined = + build_binary_vector_plan(lhs_plan, rhs_plan, &binary.op, lhs_labels.clone()) + .ok()?; + Some((combined, lhs_labels)) + } + other => { + // Leaf pattern: structural config lookup + context + plan + let config = self.find_query_config_promql_structural(other)?; + let ctx = self.build_query_execution_context_from_ast(other, config, time)?; + let label_names = ctx.metadata.query_output_labels.labels.clone(); + let plan = ctx.to_logical_plan().ok()?; + Some((plan, label_names)) + } + } + } + + /// Handles a binary arithmetic PromQL expression by building a combined + /// DataFusion plan (vector–vector join or scalar projection) and executing it. + /// + /// Returns `None` if any arm is not acceleratable (caller falls back to Prometheus). + fn handle_binary_expr_promql( + &self, + ast: &promql_parser::parser::Expr, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + use crate::engines::logical::plan_builder::{build_binary_vector_plan, build_scalar_plan}; + use promql_parser::parser::Expr; + + let query_time = Self::convert_query_time_to_data_time(time); + + let binary = match ast { + Expr::Binary(b) => b, + _ => return None, + }; + + let lhs = binary.lhs.as_ref(); + let rhs = binary.rhs.as_ref(); + let op = &binary.op; + + // Scalar case: either side may be a numeric literal + let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { + (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), + (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), + _ => None, + }; + if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { + let (vector_plan, label_names) = self.build_arm_logical_plan(vector_arm, time)?; + let combined = + build_scalar_plan(vector_plan, scalar, op, scalar_on_left, label_names.clone()) + .ok()?; + let results = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.execute_logical_plan( + combined, + label_names.clone(), + "", + &Statistic::Sum, + )) + }) + .ok()?; + return Some(( + KeyByLabelNames::new(label_names), + QueryResult::vector(results, query_time), + )); + } + + // Vector–vector + let (lhs_plan, lhs_labels) = self.build_arm_logical_plan(lhs, time)?; + let (rhs_plan, _) = self.build_arm_logical_plan(rhs, time)?; + let combined = build_binary_vector_plan(lhs_plan, rhs_plan, op, lhs_labels.clone()).ok()?; + let results = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.execute_logical_plan( + combined, + lhs_labels.clone(), + "", + &Statistic::Sum, + )) + }) + .ok()?; + let output_labels = KeyByLabelNames::new(lhs_labels); + Some((output_labels, QueryResult::vector(results, query_time))) + } + + /// Applies a PromQL binary arithmetic operator to two f64 values. + fn apply_range_binary_op( + op: &promql_parser::parser::token::TokenType, + lhs: f64, + rhs: f64, + ) -> f64 { + use promql_parser::parser::token::{T_ADD, T_DIV, T_MOD, T_MUL, T_POW, T_SUB}; + match op.id() { + id if id == T_ADD => lhs + rhs, + id if id == T_SUB => lhs - rhs, + id if id == T_MUL => lhs * rhs, + id if id == T_DIV => lhs / rhs, + id if id == T_MOD => lhs % rhs, + id if id == T_POW => lhs.powf(rhs), + _ => f64::NAN, + } + } + + /// Recursively builds a range execution context for one arm of a binary arithmetic expression. + fn build_arm_range_context( + &self, + arm_ast: &promql_parser::parser::Expr, + start: f64, + end: f64, + step: f64, + ) -> Option<(RangeQueryExecutionContext, Vec)> { + use promql_parser::parser::Expr; + + match arm_ast { + Expr::NumberLiteral(_) => None, // caller handles scalars + Expr::Paren(paren) => self.build_arm_range_context(&paren.expr, start, end, step), + other => { + let config = self.find_query_config_promql_structural(other)?; + let base_context = + self.build_query_execution_context_from_ast(other, config, end)?; + let label_names = base_context.metadata.query_output_labels.labels.clone(); + + let start_ms = Self::convert_query_time_to_data_time(start); + let end_ms = Self::convert_query_time_to_data_time(end); + let step_ms = (step * 1000.0) as u64; + + let tumbling_window_ms = self + .streaming_config + .get_aggregation_config(base_context.agg_info.aggregation_id_for_value) + .map(|c| c.window_size * 1000)?; + + self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms) + .map_err(|e| { + warn!("Range arm query validation failed: {}", e); + e + }) + .ok()?; + + let lookback_ms = base_context.store_plan.values_query.end_timestamp + - base_context.store_plan.values_query.start_timestamp; + + let buckets_per_step = (step_ms / tumbling_window_ms) as usize; + let lookback_bucket_count = (lookback_ms / tumbling_window_ms) as usize; + + let mut extended_store_plan = base_context.store_plan.clone(); + extended_store_plan.values_query.start_timestamp = + start_ms.saturating_sub(lookback_ms); + extended_store_plan.values_query.end_timestamp = end_ms; + extended_store_plan.values_query.is_exact_query = false; + + let range_context = RangeQueryExecutionContext { + base: QueryExecutionContext { + store_plan: extended_store_plan, + ..base_context + }, + range_params: RangeQueryParams { + start: start_ms, + end: end_ms, + step: step_ms, + }, + buckets_per_step, + lookback_bucket_count, + tumbling_window_ms, + }; + + Some((range_context, label_names)) + } + } + } + + /// Handles a binary arithmetic PromQL expression for range queries. + /// + /// Evaluates each arm independently over the full range, then joins the + /// resulting series by label key and applies the arithmetic operator + /// sample-by-sample at matching timestamps. + fn handle_binary_expr_range_promql( + &self, + ast: &promql_parser::parser::Expr, + start: f64, + end: f64, + step: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + use promql_parser::parser::Expr; + + let binary = match ast { + Expr::Binary(b) => b, + _ => return None, + }; + + let lhs = binary.lhs.as_ref(); + let rhs = binary.rhs.as_ref(); + let op = &binary.op; + + // Scalar case: either side may be a numeric literal + let scalar_case: Option<(f64, &Expr, bool)> = match (lhs, rhs) { + (_, Expr::NumberLiteral(nl)) => Some((nl.val, lhs, false)), + (Expr::NumberLiteral(nl), _) => Some((nl.val, rhs, true)), + _ => None, + }; + if let Some((scalar, vector_arm, scalar_on_left)) = scalar_case { + let (ctx, labels) = self.build_arm_range_context(vector_arm, start, end, step)?; + let results = self.execute_range_query_pipeline(&ctx).ok()?; + let combined: Vec = results + .into_iter() + .map(|mut elem| { + for s in &mut elem.samples { + s.value = if scalar_on_left { + Self::apply_range_binary_op(op, scalar, s.value) + } else { + Self::apply_range_binary_op(op, s.value, scalar) + }; + } + elem + }) + .collect(); + return Some((KeyByLabelNames::new(labels), QueryResult::matrix(combined))); + } + + // Vector-vector: evaluate both arms, join by label key, apply op per matching timestamp + let (lhs_ctx, lhs_labels) = self.build_arm_range_context(lhs, start, end, step)?; + let (rhs_ctx, _) = self.build_arm_range_context(rhs, start, end, step)?; + let lhs_results = self.execute_range_query_pipeline(&lhs_ctx).ok()?; + let rhs_results = self.execute_range_query_pipeline(&rhs_ctx).ok()?; + + // Build lookup: label_key -> {timestamp -> value} for rhs + let mut rhs_map: HashMap> = HashMap::new(); + for elem in rhs_results { + let ts_map: HashMap = elem + .samples + .iter() + .map(|s| (s.timestamp, s.value)) + .collect(); + rhs_map.insert(elem.labels, ts_map); + } + + let mut combined: Vec = Vec::new(); + for lhs_elem in lhs_results { + if let Some(rhs_ts_map) = rhs_map.get(&lhs_elem.labels) { + let mut new_elem = RangeVectorElement::new(lhs_elem.labels.clone()); + for s in &lhs_elem.samples { + if let Some(&rhs_val) = rhs_ts_map.get(&s.timestamp) { + new_elem.add_sample( + s.timestamp, + Self::apply_range_binary_op(op, s.value, rhs_val), + ); + } + } + if !new_elem.samples.is_empty() { + combined.push(new_elem); + } + } + } + + let output_labels = KeyByLabelNames::new(lhs_labels); + Some((output_labels, QueryResult::matrix(combined))) + } + + /// Extract QueryRequirements from a parsed PromQL match result. + /// Used as the fallback path when no query_configs entry is found. + fn build_query_requirements_promql( + &self, + match_result: &PromQLMatchResult, + query_pattern_type: QueryPatternType, + ) -> QueryRequirements { + let (metric, spatial_filter) = get_metric_and_spatial_filter(match_result); + + let statistics = get_statistics_to_compute(query_pattern_type, match_result); + + let data_range_ms = match query_pattern_type { + QueryPatternType::OnlySpatial => None, + _ => match_result + .get_range_duration() + .map(|d| d.num_seconds() as u64 * 1000), + }; + + let all_labels = match &self.inference_config.schema { + SchemaConfig::PromQL(schema) => schema + .get_labels(&metric) + .cloned() + .unwrap_or_else(KeyByLabelNames::empty), + _ => KeyByLabelNames::empty(), + }; + + let grouping_labels = match query_pattern_type { + QueryPatternType::OnlyTemporal => all_labels, + QueryPatternType::OnlySpatial | QueryPatternType::OneTemporalOneSpatial => { + get_spatial_aggregation_output_labels(match_result, &all_labels) + } + }; + + QueryRequirements { + metric, + statistics, + data_range_ms, + grouping_labels, + spatial_filter_normalized: normalize_spatial_filter(&spatial_filter), + } + } + + // /// Try to extract sketch query components from a PromQL query string. + // /// + // /// Attempts the standard AST parser first. If that fails (e.g. for custom + // /// sketch-only functions), falls back to a lightweight regex extraction for + // /// patterns like `func(metric[range])` and `func(number, metric[range])`. + // /// Extract just the sketch function name from a query without full evaluation. + // fn extract_sketch_func_name(&self, query: &str) -> Option { + // self.parse_sketch_query_components(query) + // .map(|c| c.func_name) + // } + + // fn parse_sketch_query_components(&self, query: &str) -> Option { + // // --- Path A: standard PromQL parser + pattern matching --- + // if let Some(components) = self.parse_sketch_via_ast(query) { + // return Some(components); + // } + + // // --- Path B: regex fallback for custom sketch functions --- + // self.parse_sketch_via_regex(query) + // } + + // /// Parse sketch components using the standard PromQL AST parser. + // fn parse_sketch_via_ast(&self, query: &str) -> Option { + // let ast = match promql_parser::parser::parse(query) { + // Ok(ast) => ast, + // Err(_) => return None, + // }; + + // let mut found_match = None; + // for (pattern_type, patterns) in &self.controller_patterns { + // for pattern in patterns { + // let match_result = pattern.matches(&ast); + // if match_result.matches { + // found_match = Some((*pattern_type, match_result)); + // break; + // } + // } + // if found_match.is_some() { + // break; + // } + // } + + // let (query_pattern_type, match_result) = found_match?; + + // if query_pattern_type != QueryPatternType::OnlyTemporal { + // debug!( + // "Sketch query (AST): pattern type {:?} is not OnlyTemporal, skipping for '{}'", + // query_pattern_type, query + // ); + // return None; + // } + + // let func_name = match_result.get_function_name()?; + // promsketch_store::promsketch_func_map(&func_name)?; + + // let (metric, spatial_filter) = get_metric_and_spatial_filter(&match_result); + // let metric = if spatial_filter.is_empty() { + // metric + // } else { + // format!("{}{{{}}}", metric, spatial_filter) + // }; + + // let range_seconds = match_result.get_range_duration()?.num_seconds() as u64; + + // let args = if func_name == "quantile_over_time" { + // self.extract_quantile_param_promql(query_pattern_type, &match_result) + // .and_then(|s| s.parse::().ok()) + // .unwrap_or(0.5) + // } else { + // 0.0 + // }; + + // Some(SketchQueryComponents { + // func_name, + // metric, + // range_seconds, + // args, + // }) + // } + + // /// Regex fallback for custom sketch functions the PromQL parser doesn't know. + // /// + // /// Matches two forms: + // /// - `func_name(metric[duration])` (generic) + // /// - `func_name(number, metric[duration])` (quantile) + // /// - `func_name(metric{filter}[duration])` (with label filter) + // fn parse_sketch_via_regex(&self, query: &str) -> Option { + // use regex::Regex; + + // // quantile form: quantile_over_time(0.5, metric{...}[5m]) + // let quantile_re = + // Regex::new(r"^(\w+)\(\s*([0-9.]+)\s*,\s*(\w+(?:\{[^}]*\})?)\[(\d+)([smhd])\]\s*\)$") + // .ok()?; + + // // generic form: func(metric{...}[5m]) + // let generic_re = + // Regex::new(r"^(\w+)\(\s*(\w+(?:\{[^}]*\})?)\[(\d+)([smhd])\]\s*\)$").ok()?; + + // if let Some(caps) = quantile_re.captures(query.trim()) { + // let func_name = caps[1].to_string(); + // promsketch_store::promsketch_func_map(&func_name)?; + // let args: f64 = caps[2].parse().ok()?; + // let metric = caps[3].to_string(); + // let range_seconds = Self::parse_duration_to_seconds(&caps[4], &caps[5])?; + // debug!( + // "Sketch query (regex/quantile): parsed {} with metric={}, range={}s, args={}", + // func_name, metric, range_seconds, args + // ); + // return Some(SketchQueryComponents { + // func_name, + // metric, + // range_seconds, + // args, + // }); + // } + + // if let Some(caps) = generic_re.captures(query.trim()) { + // let func_name = caps[1].to_string(); + // promsketch_store::promsketch_func_map(&func_name)?; + // let metric = caps[2].to_string(); + // let range_seconds = Self::parse_duration_to_seconds(&caps[3], &caps[4])?; + // debug!( + // "Sketch query (regex/generic): parsed {} with metric={}, range={}s", + // func_name, metric, range_seconds + // ); + // return Some(SketchQueryComponents { + // func_name, + // metric, + // range_seconds, + // args: 0.0, + // }); + // } + + // None + // } + + // /// Convert a numeric value + unit suffix into seconds. + // fn parse_duration_to_seconds(value: &str, unit: &str) -> Option { + // let n: u64 = value.parse().ok()?; + // let multiplier = match unit { + // "s" => 1, + // "m" => 60, + // "h" => 3600, + // "d" => 86400, + // _ => return None, + // }; + // Some(n * multiplier) + // } + + // /// Try to handle a PromQL query via the sketch shortcut path. + // /// Returns Some if the query is sketch-backed and PromSketchStore is available. + // /// Returns None to fall through to the precomputed pipeline. + // fn handle_sketch_query_promql( + // &self, + // query: &str, + // time: f64, + // ) -> Option<(KeyByLabelNames, QueryResult)> { + // let ps = self.promsketch_store.as_ref()?; + + // let components = match self.parse_sketch_query_components(query) { + // Some(c) => c, + // None => { + // debug!( + // "Sketch query: could not parse sketch components from '{}'", + // query + // ); + // return None; + // } + // }; + + // let eval_start = Instant::now(); + + // let query_time = Self::convert_query_time_to_data_time(time); + // let end = query_time; + // let start = end.saturating_sub(components.range_seconds * 1000); + + // debug!( + // "Sketch query: evaluating {}({}) range=[{}, {}] args={}", + // components.func_name, components.metric, start, end, components.args + // ); + + // let results = match ps.eval_matching( + // &components.func_name, + // &components.metric, + // components.args, + // start, + // end, + // ) { + // Ok(r) => r, + // Err(e) => { + // warn!( + // "Sketch query: eval_matching failed for {}({}): {}", + // components.func_name, components.metric, e + // ); + // ps_metrics::SKETCH_QUERIES_TOTAL + // .with_label_values(&["miss"]) + // .inc(); + // return None; + // } + // }; + + // if results.is_empty() { + // debug!( + // "Sketch query: no matching series with data for {}({}), falling through", + // components.func_name, components.metric + // ); + // ps_metrics::SKETCH_QUERIES_TOTAL + // .with_label_values(&["miss"]) + // .inc(); + // return None; + // } + + // ps_metrics::SKETCH_QUERIES_TOTAL + // .with_label_values(&["hit"]) + // .inc(); + // ps_metrics::SKETCH_QUERY_DURATION.observe(eval_start.elapsed().as_secs_f64()); + + // info!( + // "Sketch query: {}({}) returned {} series results", + // components.func_name, + // components.metric, + // results.len() + // ); + + // let elements: Vec = results + // .into_iter() + // .map(|(labels_str, value)| { + // let labels = KeyByLabelValues::new_with_labels(vec![labels_str]); + // InstantVectorElement::new(labels, value) + // }) + // .collect(); + + // let output_labels = KeyByLabelNames::new(vec!["__name__".to_string()]); + // Some((output_labels, QueryResult::vector(elements, query_time))) + // } + + pub fn handle_query_promql( + &self, + query: String, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + let query_start_time = Instant::now(); + debug!("Handling query: {} at time {}", query, time); + + // Check for binary arithmetic before attempting single-query dispatch. + // Binary expressions won't have a matching query_config, so we handle them here. + if let Ok(ast) = promql_parser::parser::parse(&query) { + if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { + let result = self.handle_binary_expr_promql(&ast, time); + let total_query_duration = query_start_time.elapsed(); + debug!( + "Binary arithmetic query handling took: {:.2}ms", + total_query_duration.as_secs_f64() * 1000.0 + ); + return result; + } + } + + let context = self.build_query_execution_context_promql(query, time)?; + + debug!( + "Querying store for metric: {}, aggregation_id: {}, range: [{}, {}]", + context.metric, + context.agg_info.aggregation_id_for_value, + context.store_plan.values_query.start_timestamp, + context.store_plan.values_query.end_timestamp + ); + + let result = self.execute_context(context, true); + + // Determine query routing order based on function type. + // USampling functions prefer the precomputed path first (sketch fallback), + // while EHUniv/EHKLL functions prefer the sketch path first. + // let prefer_precomputed = self + // .extract_sketch_func_name(&query) + // .is_some_and(|name| is_usampling_function(&name)); + + // if !prefer_precomputed { + // // Non-USampling sketch functions: try sketch path first + // if let Some(result) = self.handle_sketch_query_promql(&query, time) { + // let total_query_duration = query_start_time.elapsed(); + // debug!( + // "Sketch query handling took: {:.2}ms", + // total_query_duration.as_secs_f64() * 1000.0 + // ); + // return Some(result); + // } + // } + + // // Precomputed pipeline + // let precomputed_result = (|| -> Option<(KeyByLabelNames, QueryResult)> { + // let context = self.build_query_execution_context_promql(query.clone(), time)?; + + // debug!( + // "Querying store for metric: {}, aggregation_id: {}, range: [{}, {}]", + // context.metric, + // context.agg_info.aggregation_id_for_value, + // context.store_plan.values_query.start_timestamp, + // context.store_plan.values_query.end_timestamp + // ); + + // let results = self + // .execute_query_pipeline(&context, true) // PromQL: topk enabled + // .map_err(|e| { + // warn!("Query execution failed: {}", e); + // e + // }) + // .ok()?; + + // Some(( + // context.metadata.query_output_labels, + // QueryResult::vector(results, context.query_time), + // )) + // })(); + + // if precomputed_result.is_some() { + // let total_query_duration = query_start_time.elapsed(); + // debug!( + // "Total query handling took: {:.2}ms", + // total_query_duration.as_secs_f64() * 1000.0 + // ); + // return precomputed_result; + // } + + // // Fallback: USampling functions try sketch if precomputed had no data + // if prefer_precomputed { + // if let Some(result) = self.handle_sketch_query_promql(&query, time) { + // let total_query_duration = query_start_time.elapsed(); + // debug!( + // "Sketch fallback query handling took: {:.2}ms", + // total_query_duration.as_secs_f64() * 1000.0 + // ); + // return Some(result); + // } + // } + + let total_query_duration = query_start_time.elapsed(); + debug!( + "Total query handling took: {:.2}ms (no results)", + total_query_duration.as_secs_f64() * 1000.0 + ); + result + } + + pub fn build_query_execution_context_promql( + &self, + query: String, + time: f64, + ) -> Option { + let query_time = Self::convert_query_time_to_data_time(time); + + // Parse PromQL AST using promql-parser crate + let parse_start_time = Instant::now(); + let ast = match promql_parser::parser::parse(&query) { + Ok(ast) => { + let parse_duration = parse_start_time.elapsed(); + debug!( + "PromQL parsing took: {:.2}ms", + parse_duration.as_secs_f64() * 1000.0 + ); + ast + } + Err(e) => { + warn!("Failed to parse PromQL query '{}': {}", query, e); + return None; + } + }; + + let pattern_match_start_time = Instant::now(); + + let mut found_match = None; + for (pattern_type, patterns) in &self.controller_patterns { + for pattern in patterns { + debug!( + "Trying pattern type: {:?} for query: {}", + pattern_type, query + ); + let match_result = pattern.matches(&ast); + debug!("Match result: {:?}", match_result); + if match_result.matches { + found_match = Some((*pattern_type, match_result)); + break; + } + } + if found_match.is_some() { + break; + } + } + + let (query_pattern_type, match_result) = match found_match { + Some((pt, result)) => { + let pattern_match_duration = pattern_match_start_time.elapsed(); + debug!( + "Pattern matching took: {:.2}ms", + pattern_match_duration.as_secs_f64() * 1000.0 + ); + (pt, result) + } + None => { + warn!("No matching pattern found for query: {}", query); + return None; + } + }; + + debug!("Found matching query config for: {}", query); + + let query_context_start_time = Instant::now(); + + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = self.find_query_config(&query) { + self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? + } else { + warn!( + "No query_config entry for PromQL query '{}'. Attempting capability-based matching.", + query + ); + let requirements = + self.build_query_requirements_promql(&match_result, query_pattern_type); + self.streaming_config + .find_compatible_aggregation(&requirements)? + }; + + let result = self.build_promql_execution_context_tail( + &match_result, + query_pattern_type, + query_time, + agg_info, + ); + + let query_context_duration = query_context_start_time.elapsed(); + debug!( + "[LATENCY] Query context build: {:.2}ms", + query_context_duration.as_secs_f64() * 1000.0 + ); + + result + } + + /// Build execution context for range query + pub fn build_range_query_execution_context_promql( + &self, + query: String, + start: f64, + end: f64, + step: f64, + ) -> Option { + // First, build the base instant query context (reuse existing logic) + // Use 'end' as the reference time for parsing + let base_context = self.build_query_execution_context_promql(query, end)?; + + // Convert to milliseconds + let start_ms = Self::convert_query_time_to_data_time(start); + let end_ms = Self::convert_query_time_to_data_time(end); + let step_ms = (step * 1000.0) as u64; + + // Get window size + let tumbling_window_ms = self + .streaming_config + .get_aggregation_config(base_context.agg_info.aggregation_id_for_value) + .map(|config| config.window_size * 1000)?; + + // Validate parameters + self.validate_range_query_params(start_ms, end_ms, step_ms, tumbling_window_ms) + .map_err(|e| { + warn!("Range query validation failed: {}", e); + e + }) + .ok()?; + + // Calculate lookback from the base context's store plan + let lookback_ms = base_context.store_plan.values_query.end_timestamp + - base_context.store_plan.values_query.start_timestamp; + + let buckets_per_step = (step_ms / tumbling_window_ms) as usize; + let lookback_bucket_count = (lookback_ms / tumbling_window_ms) as usize; + + // Modify the store plan to cover the entire range + let mut extended_store_plan = base_context.store_plan.clone(); + extended_store_plan.values_query.start_timestamp = start_ms.saturating_sub(lookback_ms); + extended_store_plan.values_query.end_timestamp = end_ms; + // Range queries always use range fetch, not exact + extended_store_plan.values_query.is_exact_query = false; + + Some(RangeQueryExecutionContext { + base: QueryExecutionContext { + store_plan: extended_store_plan, + ..base_context + }, + range_params: RangeQueryParams { + start: start_ms, + end: end_ms, + step: step_ms, + }, + buckets_per_step, + lookback_bucket_count, + tumbling_window_ms, + }) + } + + /// Main entry point for range queries + pub fn handle_range_query_promql( + &self, + query: String, + start: f64, + end: f64, + step: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + let query_start_time = Instant::now(); + debug!( + "Handling range query: {} from {} to {} step {}", + query, start, end, step + ); + + // Check for binary arithmetic before attempting single-query dispatch. + if let Ok(ast) = promql_parser::parser::parse(&query) { + if matches!(&ast, promql_parser::parser::Expr::Binary(_)) { + let result = self.handle_binary_expr_range_promql(&ast, start, end, step); + let total_duration = query_start_time.elapsed(); + debug!( + "Binary arithmetic range query handling took: {:.2}ms", + total_duration.as_secs_f64() * 1000.0 + ); + return result; + } + } + + let context = self.build_range_query_execution_context_promql(query, start, end, step)?; + + // Execute range query pipeline + let results: Vec = self + .execute_range_query_pipeline(&context) + .map_err(|e| { + warn!("Range query execution failed: {}", e); + e + }) + .ok()?; + + // // Determine query routing order based on function type. + // // USampling functions prefer the precomputed path first (sketch fallback), + // // while EHUniv/EHKLL functions prefer the sketch path first. + // let prefer_precomputed = self + // .extract_sketch_func_name(&query) + // .is_some_and(|name| is_usampling_function(&name)); + + // if !prefer_precomputed { + // // Non-USampling sketch functions: try sketch path first + // if let Some(result) = self.handle_sketch_range_query_promql(&query, start, end, step) { + // let total_duration = query_start_time.elapsed(); + // debug!( + // "Sketch range query handling took: {:.2}ms", + // total_duration.as_secs_f64() * 1000.0 + // ); + // return Some(result); + // } + // } + + // // Precomputed pipeline + // let precomputed_result = (|| -> Option<(KeyByLabelNames, QueryResult)> { + // let context = + // self.build_range_query_execution_context_promql(query.clone(), start, end, step)?; + + // let results: Vec = self + // .execute_range_query_pipeline(&context) + // .map_err(|e| { + // warn!("Range query execution failed: {}", e); + // e + // }) + // .ok()?; + + // Some(( + // context.base.metadata.query_output_labels, + // QueryResult::matrix(results), + // )) + // })(); + + // // Fallback: USampling functions try sketch if precomputed had no data + // if prefer_precomputed { + // if let Some(result) = self.handle_sketch_range_query_promql(&query, start, end, step) { + // let total_duration = query_start_time.elapsed(); + // debug!( + // "Sketch fallback range query handling took: {:.2}ms", + // total_duration.as_secs_f64() * 1000.0 + // ); + // return Some(result); + // } + // } + + let total_duration = query_start_time.elapsed(); + debug!( + "Total range query handling took: {:.2}ms", + total_duration.as_secs_f64() * 1000.0 + ); + + Some(( + context.base.metadata.query_output_labels, + QueryResult::matrix(results), + )) + } +} diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs new file mode 100644 index 0000000..388954d --- /dev/null +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -0,0 +1,556 @@ +//! SQL query language handler for SimpleEngine. +//! +//! Contains all SQL-specific context building, pattern matching, and query dispatch. + +use super::SimpleEngine; +use super::{QueryExecutionContext, QueryMetadata, QueryTimestamps}; +use crate::data_model::{AggregationIdInfo, QueryConfig, SchemaConfig}; +use crate::engines::query_result::QueryResult; +use asap_types::query_requirements::QueryRequirements; +use asap_types::utils::normalize_spatial_filter; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{QueryPatternType, Statistic}; +use sql_utilities::ast_matching::QueryType; +use sql_utilities::ast_matching::{SQLPatternMatcher, SQLPatternParser, SQLQuery}; +use sql_utilities::sqlhelper::{AggregationInfo, SQLQueryData}; +use sqlparser::dialect::*; +use sqlparser::parser::Parser as parser; +use std::collections::HashMap; +use tracing::{debug, warn}; + +impl SimpleEngine { + /// Finds the query configuration for a SQL query using structural pattern matching. + /// + /// Unlike `find_query_config` (which does exact string comparison), this method parses + /// each template in query_configs and compares it structurally against the incoming + /// query_data — ignoring absolute timestamps and comparing only metric, aggregation, + /// labels, time column name, and duration. + fn find_query_config_sql(&self, query_data: &SQLQueryData) -> Option<&QueryConfig> { + let schema = match &self.inference_config.schema { + SchemaConfig::SQL(sql_schema) => sql_schema, + _ => return None, + }; + + self.inference_config.query_configs.iter().find(|config| { + let template_statements = + match parser::parse_sql(&GenericDialect {}, config.query.as_str()) { + Ok(stmts) => stmts, + Err(_) => return false, + }; + let template_data = + match SQLPatternParser::new(schema, 0.0).parse_query(&template_statements) { + Some(data) => data, + None => return false, + }; + query_data.matches_sql_pattern(&template_data) + }) + } + + /// Calculates start timestamp for SQL queries + fn calculate_start_timestamp_sql( + &self, + end_timestamp: u64, + query_pattern_type: QueryPatternType, + match_result: &SQLQuery, + ) -> u64 { + match query_pattern_type { + QueryPatternType::OnlyTemporal => { + let scrape_intervals = match_result + .outer_data() + .expect("OnlyTemporal pattern guarantees outer_data is present") + .time_info + .clone() + .get_duration() as u64; + end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) + } + QueryPatternType::OneTemporalOneSpatial => { + let scrape_intervals = match_result + .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") + .time_info + .clone() + .get_duration() as u64; + end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000) + } + QueryPatternType::OnlySpatial => { + end_timestamp - (self.prometheus_scrape_interval * 1000) + } + } + } + + /// Calculates and validates query timestamps for SQL + fn calculate_query_timestamps_sql( + &self, + query_time: u64, + query_pattern_type: QueryPatternType, + match_result: &SQLQuery, + ) -> QueryTimestamps { + let mut end_timestamp = query_time; + end_timestamp = self.validate_and_align_end_timestamp(end_timestamp, query_pattern_type); + let start_timestamp = + self.calculate_start_timestamp_sql(end_timestamp, query_pattern_type, match_result); + + QueryTimestamps { + start_timestamp, + end_timestamp, + } + } + + /// Extracts quantile parameter from SQL match result + fn extract_quantile_param_sql(&self, match_result: &SQLQuery) -> Option { + match_result + .query_data + .first() + .map(|data| data.aggregation_info.get_args()[0].to_string()) + } + + /// Builds query kwargs for SQL queries + fn build_query_kwargs_sql( + &self, + statistic: &Statistic, + match_result: &SQLQuery, + ) -> Result, String> { + let mut query_kwargs = HashMap::new(); + + if *statistic == Statistic::Quantile { + let quantile = self + .extract_quantile_param_sql(match_result) + .ok_or_else(|| "Missing quantile parameter for quantile query".to_string())?; + query_kwargs.insert("quantile".to_string(), quantile); + } + // Note: SQL doesn't support topk limiting yet + + Ok(query_kwargs) + } + + fn sql_get_is_collapsable( + &self, + temporal_aggregation: &AggregationInfo, + spatial_aggregation: &AggregationInfo, + ) -> bool { + match spatial_aggregation.get_name() { + "SUM" => matches!( + temporal_aggregation.get_name(), + "SUM" | "COUNT" // Note: "increase" and "rate" are commented out in Python + ), + "MIN" => temporal_aggregation.get_name() == "MIN", + "MAX" => temporal_aggregation.get_name() == "MAX", + _ => false, + } + } + + /// Extract QueryRequirements from a parsed SQL match result. + /// Used as the fallback path when no query_configs entry is found. + fn build_query_requirements_sql( + &self, + match_result: &SQLQuery, + query_pattern_type: QueryPatternType, + ) -> QueryRequirements { + let query_data = match_result + .outer_data() + .expect("build_query_requirements_sql called on valid SQLQuery"); + let metric = query_data.metric.clone(); + + let statistic_name = match query_pattern_type { + QueryPatternType::OneTemporalOneSpatial => match_result + .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") + .aggregation_info + .get_name() + .to_lowercase(), + _ => query_data.aggregation_info.get_name().to_lowercase(), + }; + + let statistics: Vec = Self::parse_single_statistic(&statistic_name) + .into_iter() + .collect(); + + let data_range_ms = match query_pattern_type { + QueryPatternType::OnlySpatial => None, + QueryPatternType::OnlyTemporal => { + let scrape_intervals = query_data.time_info.clone().get_duration() as u64; + Some(scrape_intervals * self.prometheus_scrape_interval * 1000) + } + QueryPatternType::OneTemporalOneSpatial => { + let scrape_intervals = match_result + .inner_data() + .expect("OneTemporalOneSpatial pattern guarantees inner_data is present") + .time_info + .clone() + .get_duration() as u64; + Some(scrape_intervals * self.prometheus_scrape_interval * 1000) + } + }; + + let grouping_labels = KeyByLabelNames::new(query_data.labels.clone().into_iter().collect()); + + QueryRequirements { + metric, + statistics, + data_range_ms, + grouping_labels, + spatial_filter_normalized: normalize_spatial_filter(""), + } + } + + pub fn handle_query_sql( + &self, + query: String, + time: f64, + ) -> Option<(KeyByLabelNames, QueryResult)> { + let context = self.build_query_execution_context_sql(query, time)?; + self.execute_context(context, false) + } + + pub fn build_query_execution_context_sql( + &self, + query: String, + time: f64, + ) -> Option { + // Get SQL schema from inference config + let schema = match &self.inference_config.schema { + SchemaConfig::SQL(sql_schema) => sql_schema.clone(), + SchemaConfig::PromQL(_) => { + warn!("SQL query requested but config has PromQL schema"); + return None; + } + &SchemaConfig::ElasticQueryDSL => todo!(), + SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), + }; + + let statements = parser::parse_sql(&GenericDialect {}, query.as_str()).unwrap(); + let query_data = SQLPatternParser::new(&schema, time).parse_query(&statements); + + let query_data = match query_data { + Some(data) => data, + None => { + debug!("Could not parse query"); + return None; + } + }; + + let matcher = SQLPatternMatcher::new(schema, self.prometheus_scrape_interval as f64); + let match_result = matcher.query_info_to_pattern(&query_data); + + debug!("Match result: {:?}", match_result); + debug!("Validity: {}", match_result.is_valid()); + + if !match_result.is_valid() { + return None; + } + + // Handle SpatioTemporal queries separately - they bypass QueryPatternType mapping + if match_result.query_type == vec![QueryType::SpatioTemporal] { + let query_time = Self::convert_query_time_to_data_time( + query_data.time_info.get_start() + query_data.time_info.get_duration(), + ); + return self.build_spatiotemporal_context(&match_result, query_time, &query_data); + } + + let query_pattern_type = match &match_result.query_type[..] { + [x] => match x { + QueryType::Spatial => QueryPatternType::OnlySpatial, + QueryType::TemporalGeneric => QueryPatternType::OnlyTemporal, + QueryType::TemporalQuantile => QueryPatternType::OnlyTemporal, + QueryType::SpatioTemporal => unreachable!("SpatioTemporal handled above"), + }, + [x, y] => match (x, y) { + (QueryType::Spatial, QueryType::TemporalGeneric) => { + QueryPatternType::OneTemporalOneSpatial + } + (QueryType::Spatial, QueryType::TemporalQuantile) => { + QueryPatternType::OneTemporalOneSpatial + } + _ => panic!("Unsupported query type found"), + }, + _ => panic!("Unsupported query type found"), + }; + + // For nested queries (spatial of temporal), the outer query has no time clause, + // so we need to use the inner (temporal) query's time_info to compute query_time + let query_time = match query_pattern_type { + QueryPatternType::OneTemporalOneSpatial => { + let inner_time_info = &match_result.inner_data()?.time_info; + Self::convert_query_time_to_data_time( + inner_time_info.get_start() + inner_time_info.get_duration(), + ) + } + _ => Self::convert_query_time_to_data_time( + query_data.time_info.get_start() + query_data.time_info.get_duration(), + ), + }; + + // self.handle_sql_temporal_aggregation( + // query_config, + // &match_result, + // query_time, + // query_pattern_type, + // ) + // } + + // fn handle_sql_temporal_aggregation( + // &self, + // query_config: &QueryConfig, + // match_result: &SQLQuery, + // query_time: u64, + // query_pattern_type: QueryPatternType, + // ) -> Option<(KeyByLabelNames, QueryResult)> { + // Labels + + let query_output_labels = match &match_result.query_type.len() { + // Potentially change SQLQueryType + 1 => { + // For non-nested queries, output associated labels + let labels = &match_result.outer_data()?.labels; + + KeyByLabelNames::new(labels.clone().into_iter().collect()) + } + 2 => { + // Extract spatial aggregation output labels using AST-based approach + let temporal_labels = &match_result.inner_data()?.labels; + let spatial_labels = &match_result.outer_data()?.labels; + + let temporal_aggregation = &match_result.inner_data()?.aggregation_info; + let spatial_aggregation = &match_result.outer_data()?.aggregation_info; + + match self.sql_get_is_collapsable(temporal_aggregation, spatial_aggregation) { + // If false: get all labels, which are all temporal labels. If true, get only spatial labels + false => KeyByLabelNames::new(temporal_labels.clone().into_iter().collect()), + true => KeyByLabelNames::new(spatial_labels.clone().into_iter().collect()), + } + } + _ => { + warn!("Invalid query type: {}", query_pattern_type); + KeyByLabelNames::new(Vec::new()) + } + }; + + // Statistic - determine based on query pattern type + let statistic_name = match query_pattern_type { + QueryPatternType::OnlyTemporal => { + // Use the temporal aggregation (first subquery) + match_result + .outer_data()? + .aggregation_info + .get_name() + .to_lowercase() + } + QueryPatternType::OneTemporalOneSpatial => { + // Use the temporal aggregation (second subquery contains temporal) + match_result + .inner_data()? + .aggregation_info + .get_name() + .to_lowercase() + } + QueryPatternType::OnlySpatial => { + // Use the spatial aggregation (first subquery) + match_result + .outer_data()? + .aggregation_info + .get_name() + .to_lowercase() + } + }; + + let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; + + let query_kwargs = self + .build_query_kwargs_sql(&statistic_to_compute, &match_result) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + // Create query metadata + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute, + query_kwargs: query_kwargs.clone(), + }; + + // Time + let timestamps = + self.calculate_query_timestamps_sql(query_time, query_pattern_type, &match_result); + + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = + self.find_query_config_sql(&query_data) + { + self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? + } else { + warn!("No query_config entry for SQL query. Attempting capability-based matching."); + let requirements = self.build_query_requirements_sql(&match_result, query_pattern_type); + self.streaming_config + .find_compatible_aggregation(&requirements)? + }; + + let metric = &match_result.outer_data()?.metric; + + let spatial_filter = if query_pattern_type == QueryPatternType::OneTemporalOneSpatial { + match_result + .outer_data()? + .labels + .iter() + .cloned() + .collect::>() + .join(",") + } else { + String::new() + }; + + let do_merge = query_pattern_type == QueryPatternType::OnlyTemporal + || query_pattern_type == QueryPatternType::OneTemporalOneSpatial; + + self.build_sql_execution_context_tail( + metric, + ×tamps, + metadata, + agg_info, + do_merge, + spatial_filter, + query_time, + ) + } + + /// Shared context-building tail for both SQL context builders. + /// + /// Called by `build_query_execution_context_sql` and `build_spatiotemporal_context` + /// after labels, statistic, metadata, timestamps, and `agg_info` are resolved. + /// Builds the query plan, derives grouping/aggregated labels, and returns the + /// final `QueryExecutionContext`. + #[allow(clippy::too_many_arguments)] + fn build_sql_execution_context_tail( + &self, + metric: &str, + timestamps: &QueryTimestamps, + metadata: QueryMetadata, + agg_info: AggregationIdInfo, + do_merge: bool, + spatial_filter: String, + query_time: u64, + ) -> Option { + let query_plan = self + .create_store_query_plan(metric, timestamps, &agg_info) + .map_err(|e| { + warn!("Failed to create store query plan: {}", e); + e + }) + .ok()?; + + let grouping_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_value) + .map(|config| config.grouping_labels.clone()) + .unwrap_or_else(|| metadata.query_output_labels.clone()); + + let aggregated_labels = self + .streaming_config + .get_aggregation_config(agg_info.aggregation_id_for_key) + .map(|config| config.aggregated_labels.clone()) + .unwrap_or_else(KeyByLabelNames::empty); + + Some(QueryExecutionContext { + metric: metric.to_string(), + metadata, + store_plan: query_plan, + agg_info, + do_merge, + spatial_filter, + query_time, + grouping_labels, + aggregated_labels, + }) + } + + /// Build execution context for SpatioTemporal queries. + /// These queries span multiple scrape intervals but GROUP BY a subset of labels. + fn build_spatiotemporal_context( + &self, + match_result: &SQLQuery, + query_time: u64, + query_data: &SQLQueryData, + ) -> Option { + // Output labels are the GROUP BY columns (subset of all labels) + let query_output_labels = KeyByLabelNames::new( + match_result + .outer_data()? + .labels + .clone() + .into_iter() + .collect(), + ); + + // Get the statistic from the aggregation + let statistic_name = match_result + .outer_data()? + .aggregation_info + .get_name() + .to_lowercase(); + + let statistic_to_compute = Self::parse_single_statistic(&statistic_name)?; + + let query_kwargs = self + .build_query_kwargs_sql(&statistic_to_compute, match_result) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()?; + + let metadata = QueryMetadata { + query_output_labels: query_output_labels.clone(), + statistic_to_compute, + query_kwargs: query_kwargs.clone(), + }; + + // Calculate timestamps - similar to OnlyTemporal + let end_timestamp = + self.validate_and_align_end_timestamp(query_time, QueryPatternType::OnlyTemporal); + let scrape_intervals = match_result.outer_data()?.time_info.get_duration() as u64; + let start_timestamp = + end_timestamp - (scrape_intervals * self.prometheus_scrape_interval * 1000); + + let timestamps = QueryTimestamps { + start_timestamp, + end_timestamp, + }; + + // Resolve aggregation: try pre-configured query_configs first, fall back to capability matching. + let agg_info: AggregationIdInfo = if let Some(config) = + self.find_query_config_sql(query_data) + { + self.get_aggregation_id_info(config) + .map_err(|e| { + warn!("{}", e); + e + }) + .ok()? + } else { + warn!( + "No query_config entry for SQL spatio-temporal query. Attempting capability-based matching." + ); + let requirements = + self.build_query_requirements_sql(match_result, QueryPatternType::OnlyTemporal); + self.streaming_config + .find_compatible_aggregation(&requirements)? + }; + let metric = &match_result.outer_data()?.metric; + + self.build_sql_execution_context_tail( + metric, + ×tamps, + metadata, + agg_info, + true, + String::new(), + query_time, + ) + } +}