From 35e3cf0837c862e1f88b24ecc8306318f481058d Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Mon, 4 May 2026 22:43:04 -0400 Subject: [PATCH 1/5] Initial Elastic DSL path in planner. Logic almost 1-1 of SQL since we support semantically similar queries as of now. --- Cargo.lock | 1 + asap-planner-rs/Cargo.toml | 1 + asap-planner-rs/src/config/input.rs | 16 ++ asap-planner-rs/src/error.rs | 4 + asap-planner-rs/src/lib.rs | 47 +++++ asap-planner-rs/src/main.rs | 14 +- .../src/output/elastic_generator.rs | 140 +++++++++++++++ asap-planner-rs/src/output/mod.rs | 1 + .../src/planner/elastic_single_query.rs | 170 ++++++++++++++++++ asap-planner-rs/src/planner/mod.rs | 1 + 10 files changed, 393 insertions(+), 2 deletions(-) create mode 100644 asap-planner-rs/src/output/elastic_generator.rs create mode 100644 asap-planner-rs/src/planner/elastic_single_query.rs diff --git a/Cargo.lock b/Cargo.lock index cbbdde05..2da594de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,7 @@ dependencies = [ "asap_types", "chrono", "clap 4.6.0", + "elastic_dsl_utilities", "indexmap 2.13.1", "pretty_assertions", "promql-parser", diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml index abeaf025..53a32fc7 100644 --- a/asap-planner-rs/Cargo.toml +++ b/asap-planner-rs/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" asap_types.workspace = true promql_utilities.workspace = true sql_utilities.workspace = true +elastic_dsl_utilities.workspace = true sqlparser = "0.59.0" serde.workspace = true serde_json.workspace = true diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 55422bc3..75c599a7 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -137,3 +137,19 @@ pub struct TableDefinition { pub value_columns: Vec, pub metadata_columns: Vec, } + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLControllerConfig { + pub query_groups: Vec, + pub index: String, + pub sketch_parameters: Option, + pub aggregate_cleanup: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ElasticDSLQueryGroup { + pub id: Option, + pub queries: Vec, + pub repetition_delay: u64, + pub controller_options: ControllerOptions, +} diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs index 02748805..873c25c7 100644 --- a/asap-planner-rs/src/error.rs +++ b/asap-planner-rs/src/error.rs @@ -20,4 +20,8 @@ pub enum ControllerError { UnknownTable(String), #[error("Prometheus client error: {0}")] PrometheusClient(String), + #[error("Elasticsearch DSL parse error: {0}")] + ElasticDSLParse(String), + #[error("Unsupported Elasticsearch DSL query: {0}")] + UnsupportedElasticDSLQuery(String), } diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 5aa3196b..0c6c2145 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -16,6 +16,7 @@ use tracing::debug; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; +pub use config::input::ElasticDSLControllerConfig; pub use error::ControllerError; pub use output::generator::{GeneratorOutput, PuntedQuery}; use output::generator::{ @@ -24,6 +25,7 @@ use output::generator::{ KEY_WINDOW_SIZE, }; pub use output::sql_generator::SQLRuntimeOptions; +pub use output::elastic_generator::ElasticRuntimeOptions; pub use prometheus_client::build_schema_from_prometheus; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -294,6 +296,51 @@ impl SQLController { } } +pub struct ElasticController { + config: ElasticDSLControllerConfig, + options: ElasticRuntimeOptions, +} + +impl ElasticController { + pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result { + let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = output::elastic_generator::generate_elastic_plan(&self.config, &self.options)?; + Ok(PlannerOutput { + punted_queries: output.punted_queries, + streaming_yaml: output.streaming_yaml, + inference_yaml: output.inference_yaml, + aggregation_count: output.aggregation_count, + query_count: output.query_count, + }) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?; + let inference_str = serde_yaml::to_string(&output.inference_yaml)?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} + impl Controller { pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self { Self { diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 863f4d4e..3b88abd5 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,4 +1,4 @@ -use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine}; +use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, ElasticController, ElasticRuntimeOptions, StreamingEngine}; use asap_types::enums::QueryLanguage; use clap::Parser; use std::path::PathBuf; @@ -126,7 +126,17 @@ fn main() -> anyhow::Result<()> { SQLController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } QueryLanguage::elastic_querydsl => { - anyhow::bail!("ElasticQueryDSL is not yet supported"); + let interval = args.data_ingestion_interval.ok_or_else(|| { + anyhow::anyhow!("--data-ingestion-interval is required for Elasticsearch DSL mode") + })?; + let config_path = args + .input_config + .ok_or_else(|| anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode"))?; + let opts = ElasticRuntimeOptions { + streaming_engine: engine, + data_ingestion_interval: interval, + }; + ElasticController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?; } } diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/output/elastic_generator.rs new file mode 100644 index 00000000..8cc9b678 --- /dev/null +++ b/asap-planner-rs/src/output/elastic_generator.rs @@ -0,0 +1,140 @@ +use asap_types::enums::CleanupPolicy; +use indexmap::IndexMap; +use serde_yaml::Value as YamlValue; +use std::collections::HashMap; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::error::ControllerError; +use crate::output::generator::{ + build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, + KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, +}; +use crate::planner::single_query::IntermediateAggConfig; +use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; +use crate::StreamingEngine; + +pub struct ElasticRuntimeOptions { + pub streaming_engine: StreamingEngine, + pub data_ingestion_interval: u64, +} + +pub fn generate_elastic_plan( + config: &ElasticDSLControllerConfig, + opts: &ElasticRuntimeOptions, +) -> Result { + let cleanup_policy = config + .aggregate_cleanup + .as_ref() + .and_then(|c| c.policy) + .unwrap_or(CleanupPolicy::ReadBased); + + // Validate T % data_ingestion_interval == 0 + for qg in &config.query_groups { + if qg.repetition_delay % opts.data_ingestion_interval != 0 { + return Err(ControllerError::PlannerError(format!( + "repetition_delay {} is not a multiple of data_ingestion_interval {}", + qg.repetition_delay, opts.data_ingestion_interval + ))); + } + } + + // Check for duplicate queries + let mut seen_queries = std::collections::HashSet::new(); + for qg in &config.query_groups { + for q in &qg.queries { + if !seen_queries.insert(q.clone()) { + return Err(ControllerError::DuplicateQuery(q.clone())); + } + } + } + + // Dedup map: identifying_key -> IntermediateAggConfig + let mut dedup_map: IndexMap = IndexMap::new(); + // query_string -> Vec<(key, cleanup_param)> + let mut query_keys_map: IndexMap)>> = IndexMap::new(); + + for qg in &config.query_groups { + for query_string in &qg.queries { + let processor = ElasticSingleQueryProcessor::new( + query_string.clone(), + qg.repetition_delay, + opts.data_ingestion_interval, + config.index.clone(), + opts.streaming_engine, + config.sketch_parameters.clone(), + cleanup_policy, + ); + + let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?; + + let mut keys_for_query = Vec::new(); + for config_item in configs { + let key = config_item.identifying_key(); + keys_for_query.push((key.clone(), cleanup_param)); + dedup_map.entry(key).or_insert(config_item); + } + query_keys_map.insert(query_string.clone(), keys_for_query); + } + } + + // Assign sequential IDs + let mut id_map: HashMap = HashMap::new(); + for (idx, key) in dedup_map.keys().enumerate() { + id_map.insert(key.clone(), idx as u32 + 1); + } + + let streaming_yaml = build_elastic_streaming_yaml(config, &dedup_map, &id_map)?; + let inference_yaml = build_elastic_inference_yaml(config, cleanup_policy, &query_keys_map, &id_map)?; + + Ok(GeneratorOutput { + punted_queries: Vec::new(), + streaming_yaml, + inference_yaml, + aggregation_count: dedup_map.len(), + query_count: query_keys_map.len(), + }) +} + +fn build_elastic_streaming_yaml( + _config: &ElasticDSLControllerConfig, + dedup_map: &IndexMap, + id_map: &HashMap, +) -> Result { + let aggregations: Vec = dedup_map + .iter() + .map(|(key, cfg)| build_aggregation_entry(id_map[key], cfg)) + .collect(); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_AGGREGATIONS.to_string()), + YamlValue::Sequence(aggregations), + ); + + Ok(YamlValue::Mapping(root)) +} + +fn build_elastic_inference_yaml( + _config: &ElasticDSLControllerConfig, + cleanup_policy: CleanupPolicy, + query_keys_map: &IndexMap)>>, + id_map: &HashMap, +) -> Result { + let mut cleanup_map = serde_yaml::Mapping::new(); + cleanup_map.insert( + YamlValue::String(KEY_NAME.to_string()), + YamlValue::String(cleanup_policy.to_string()), + ); + + let mut root = serde_yaml::Mapping::new(); + root.insert( + YamlValue::String(KEY_CLEANUP_POLICY.to_string()), + YamlValue::Mapping(cleanup_map), + ); + root.insert( + YamlValue::String(KEY_QUERIES.to_string()), + YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), + ); + + Ok(YamlValue::Mapping(root)) +} diff --git a/asap-planner-rs/src/output/mod.rs b/asap-planner-rs/src/output/mod.rs index 63c14cbb..773c921f 100644 --- a/asap-planner-rs/src/output/mod.rs +++ b/asap-planner-rs/src/output/mod.rs @@ -1,3 +1,4 @@ pub mod generator; pub mod sql_generator; +pub mod elastic_generator; pub use generator::*; diff --git a/asap-planner-rs/src/planner/elastic_single_query.rs b/asap-planner-rs/src/planner/elastic_single_query.rs new file mode 100644 index 00000000..db258909 --- /dev/null +++ b/asap-planner-rs/src/planner/elastic_single_query.rs @@ -0,0 +1,170 @@ +use asap_types::enums::{CleanupPolicy, WindowType}; +use elastic_dsl_utilities::ast_parsing::{ + extract_query_info, AggregationType as ElasticAggregationType, +}; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; + +use crate::config::input::SketchParameterOverrides; +use crate::error::ControllerError; +use crate::planner::logics::{build_sketch_parameters, get_sql_cleanup_param, IntermediateWindowConfig}; +use crate::planner::single_query::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::StreamingEngine; + +pub struct ElasticSingleQueryProcessor { + query_string: String, + t_repeat: u64, + #[allow(dead_code)] + data_ingestion_interval: u64, + index: String, + #[allow(dead_code)] + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, +} + +impl ElasticSingleQueryProcessor { + #[allow(clippy::too_many_arguments)] + pub fn new( + query_string: String, + t_repeat: u64, + data_ingestion_interval: u64, + index: String, + streaming_engine: StreamingEngine, + sketch_parameters: Option, + cleanup_policy: CleanupPolicy, + ) -> Self { + Self { + query_string, + t_repeat, + data_ingestion_interval, + index, + streaming_engine, + sketch_parameters, + cleanup_policy, + } + } + + pub fn get_streaming_aggregation_configs( + &self, + ) -> Result<(Vec, Option), ControllerError> { + // Parse and extract query info using utilities + let query_info = extract_query_info(&self.query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + self.query_string + )) + })?; + + // Get aggregation type and statistics + let (treatment_type, statistics) = get_elastic_statistics(&query_info.aggregation)?; + + // Build window config (always tumbling for Elasticsearch queries) + let window_cfg = IntermediateWindowConfig { + window_size: self.t_repeat, + slide_interval: self.t_repeat, + window_type: WindowType::Tumbling, + }; + + // Extract target field and group by information + let target_field = query_info.target_field.clone(); + + // Determine spatial routing from group_by_buckets + let (spatial_output, rollup) = match &query_info.group_by_buckets { + Some(bucket_spec) => { + let group_fields = get_group_by_fields(bucket_spec); + let spatial = KeyByLabelNames::new(group_fields); + // For Elasticsearch, all potentially available fields become rollup + // when they're not in the group by + (spatial.clone(), spatial) + } + None => (KeyByLabelNames::empty(), KeyByLabelNames::empty()), + }; + + let configs = build_agg_configs_for_statistics( + &statistics, + treatment_type, + &spatial_output, + &rollup, + &window_cfg, + &query_info.target_field, + Some(&self.index), + Some(&target_field), + "", // Elasticsearch doesn't have spatial filters like SQL + |agg_type: AggregationType, agg_sub_type: &str| { + build_sketch_parameters( + agg_type, + agg_sub_type, + None, + self.sketch_parameters.as_ref(), + ) + }, + ) + .map_err(ControllerError::ElasticDSLParse)?; + + // Calculate cleanup param based on query's time window + let t_lookback = self.t_repeat; // Default to repetition delay + let cleanup_param = if self.cleanup_policy == CleanupPolicy::NoCleanup { + None + } else { + Some( + get_sql_cleanup_param(self.cleanup_policy, t_lookback, self.t_repeat) + .map_err(ControllerError::PlannerError)?, + ) + }; + + Ok((configs, cleanup_param)) + } +} + +/// Map Elasticsearch aggregation types to statistics and treatment types +fn get_elastic_statistics( + agg_type: &ElasticAggregationType, +) -> Result<(QueryTreatmentType, Vec), ControllerError> { + match agg_type { + ElasticAggregationType::Avg => { + // AVG requires SUM and COUNT + Ok((QueryTreatmentType::Exact, vec![Statistic::Sum, Statistic::Count])) + } + ElasticAggregationType::Sum => Ok((QueryTreatmentType::Approximate, vec![Statistic::Sum])), + ElasticAggregationType::Min => Ok((QueryTreatmentType::Exact, vec![Statistic::Min])), + ElasticAggregationType::Max => Ok((QueryTreatmentType::Exact, vec![Statistic::Max])), + ElasticAggregationType::Percentiles(percents) => { + // For percentiles, we use quantile statistic + // Check that we have valid percentiles + if percents.is_empty() { + return Err(ControllerError::UnsupportedElasticDSLQuery( + "Percentiles aggregation must specify percentile values".to_string(), + )); + } + Ok((QueryTreatmentType::Approximate, vec![Statistic::Quantile])) + } + } +} + +/// Extract field names from group by specification +fn get_group_by_fields(bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec) -> Vec { + use elastic_dsl_utilities::ast_parsing::GroupBySpec; + match bucket_spec { + GroupBySpec::Fields(fields) => fields.clone(), + GroupBySpec::Filters(predicates) => { + // For filter-based grouping, we extract field names from predicates + let mut fields = Vec::new(); + for predicate in predicates { + match predicate { + elastic_dsl_utilities::ast_parsing::Predicate::Term { field, .. } => { + if !fields.contains(field) { + fields.push(field.clone()); + } + } + elastic_dsl_utilities::ast_parsing::Predicate::Range { field, .. } => { + if !fields.contains(field) { + fields.push(field.clone()); + } + } + } + } + fields + } + } +} diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index 44475bec..cc763854 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -2,4 +2,5 @@ pub mod logics; pub mod patterns; pub mod single_query; pub mod sql_single_query; +pub mod elastic_single_query; pub use single_query::*; From 90e24e50de70bd5ea75359541ccf5aa92cda2e56 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Fri, 8 May 2026 20:48:10 -0400 Subject: [PATCH 2/5] Add index specific mappings (schemas) for Elastic DSL in planner (similar to SQLSchema). --- Cargo.lock | 1 + .../dependencies/rs/asap_types/Cargo.toml | 1 + .../rs/asap_types/src/inference_config.rs | 59 +++++- .../rs/asap_types/src/streaming_config.rs | 2 +- .../src/ast_parsing/extract_info.rs | 30 +++ .../src/ast_parsing/query_info.rs | 3 + .../rs/elastic_dsl_utilities/src/lib.rs | 68 +++++++ asap-planner-rs/src/config/input.rs | 3 +- .../src/output/elastic_generator.rs | 173 +++++++++++++++++- .../tests/elastic_dsl_integration.rs | 30 +++ asap-planner-rs/tests/elastic_example.yaml | 34 ++++ .../src/engines/simple_engine/sql.rs | 2 +- .../tests/test_utilities/config_builders.rs | 2 +- 13 files changed, 399 insertions(+), 9 deletions(-) create mode 100644 asap-planner-rs/tests/elastic_dsl_integration.rs create mode 100644 asap-planner-rs/tests/elastic_example.yaml diff --git a/Cargo.lock b/Cargo.lock index 2da594de..d6a70d2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,6 +434,7 @@ version = "0.3.0" dependencies = [ "anyhow", "clap 4.6.0", + "elastic_dsl_utilities", "promql_utilities", "serde", "serde_json", diff --git a/asap-common/dependencies/rs/asap_types/Cargo.toml b/asap-common/dependencies/rs/asap_types/Cargo.toml index 6ca70998..5d22cdaa 100644 --- a/asap-common/dependencies/rs/asap_types/Cargo.toml +++ b/asap-common/dependencies/rs/asap_types/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] promql_utilities.workspace = true +elastic_dsl_utilities.workspace = true tracing.workspace = true sql_utilities.workspace = true serde.workspace = true diff --git a/asap-common/dependencies/rs/asap_types/src/inference_config.rs b/asap-common/dependencies/rs/asap_types/src/inference_config.rs index 8ddb3ca6..ff8da7de 100644 --- a/asap-common/dependencies/rs/asap_types/src/inference_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/inference_config.rs @@ -10,13 +10,14 @@ use crate::promql_schema::PromQLSchema; use crate::query_config::QueryConfig; use promql_utilities::data_model::KeyByLabelNames; use sql_utilities::sqlhelper::{SQLSchema, Table}; +use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; /// Schema configuration that can be either PromQL or SQL format #[derive(Debug, Clone)] pub enum SchemaConfig { PromQL(PromQLSchema), SQL(SQLSchema), - ElasticQueryDSL, + ElasticQueryDSL(ElasticMappingSchema), ElasticSQL(SQLSchema), } @@ -32,7 +33,9 @@ impl InferenceConfig { let schema = match query_language { QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()), QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())), - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + SchemaConfig::ElasticQueryDSL(ElasticMappingSchema::new(Vec::new())) + } QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())), }; Self { @@ -60,7 +63,10 @@ impl InferenceConfig { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) } - QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL, + QueryLanguage::elastic_querydsl => { + let elastic_schema = Self::parse_elastic_querydsl_schema(data)?; + SchemaConfig::ElasticQueryDSL(elastic_schema) + } QueryLanguage::elastic_sql => { let sql_schema = Self::parse_sql_schema(data)?; SchemaConfig::SQL(sql_schema) @@ -153,6 +159,53 @@ impl InferenceConfig { Ok(SQLSchema::new(tables)) } + /// Parse Elasticsearch mapping schema from YAML data (indices: key at top level). + fn parse_elastic_querydsl_schema(data: &Value) -> Result { + let Some(indices_data) = data.get("indices").and_then(|v| v.as_sequence()) else { + return Ok(ElasticMappingSchema::new(Vec::new())); + }; + + let mut indices = Vec::new(); + for index_data in indices_data { + let name = index_data + .get("name") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing name field in elastic index"))? + .to_string(); + + let time_field = index_data + .get("time_field") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing time_field field in elastic index {name}"))? + .to_string(); + + let metric_columns: HashSet = index_data + .get("metric_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| anyhow::anyhow!("Missing metric_columns field in elastic index {name}"))? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + let metadata_columns: HashSet = index_data + .get("metadata_columns") + .and_then(|v| v.as_sequence()) + .ok_or_else(|| anyhow::anyhow!("Missing metadata_columns field in elastic index {name}"))? + .iter() + .filter_map(|v| v.as_str()) + .map(|s| s.to_string()) + .collect(); + + indices.push(( + name, + ElasticIndexSchema::new(time_field, metric_columns, metadata_columns), + )); + } + + Ok(ElasticMappingSchema::new(indices)) + } + /// Parse cleanup policy from YAML data. Errors if not specified. fn parse_cleanup_policy(data: &Value) -> Result { let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| { diff --git a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs index 5a7b800c..6833f81d 100644 --- a/asap-common/dependencies/rs/asap_types/src/streaming_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/streaming_config.rs @@ -72,7 +72,7 @@ impl StreamingConfig { .map(|ic| match &ic.schema { SchemaConfig::PromQL(_) => QueryLanguage::promql, SchemaConfig::SQL(_) => QueryLanguage::sql, - SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, + SchemaConfig::ElasticQueryDSL(_) => QueryLanguage::elastic_querydsl, SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, }) .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 299af3b1..5a4f7743 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -1,6 +1,7 @@ use crate::ast_parsing::query_info::{ AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue, }; +use crate::datemath::TimeRange; use crate::helpers::strip_keyword_suffix; use elasticsearch_dsl_ast::{self as dsl}; use serde_json; @@ -36,8 +37,10 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option Option { } } +fn infer_time_field(predicates: &[Predicate]) -> FieldName { + for predicate in predicates { + if let Predicate::Range { field, gte, lte } = predicate { + let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| { + matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some()) + }); + let looks_like_time_field = field == "@timestamp" + || field.contains("time") + || field.contains("timestamp") + || bound_is_time_like; + + if looks_like_time_field { + return field.clone(); + } + } + } + + for predicate in predicates { + if let Predicate::Range { field, .. } = predicate { + return field.clone(); + } + } + + "@timestamp".to_string() +} + #[cfg(test)] mod tests { use super::*; @@ -350,6 +379,7 @@ mod tests { let info = walk_ast_and_extract_info(&ast).expect("info should parse"); assert_eq!(info.target_field, "latency_ms"); + assert_eq!(info.time_field, "@timestamp"); assert_eq!(info.aggregation, AggregationType::Max); assert_eq!(info.predicates.len(), 2); assert_eq!( diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs index ce82d705..7905bf48 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -6,6 +6,7 @@ pub type FieldName = String; pub struct ElasticDSLQueryInfo { // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. pub target_field: FieldName, // List of metrics being queried + pub time_field: FieldName, // Time field used by the query's range filter pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) pub group_by_buckets: Option, // Grouping specification if the query includes a group by clause pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) @@ -16,12 +17,14 @@ impl ElasticDSLQueryInfo { pub fn new( target_field: FieldName, + time_field: FieldName, predicates: Vec, group_by_buckets: Option, aggregation: AggregationType, ) -> Self { Self { target_field, + time_field, predicates, group_by_buckets, aggregation, diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 4a86ba8b..695610f7 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -5,3 +5,71 @@ pub mod helpers; pub use ast_parsing::*; pub use datemath::*; pub use helpers::*; + +use std::collections::{HashMap, HashSet}; + + +#[derive(Debug, Clone)] +pub struct ElasticIndexSchema { + pub time_field: String, + pub metric_columns: HashSet, + pub metadata_columns: HashSet, +} + +impl ElasticIndexSchema { + pub fn new( + time_field: String, + metric_columns: HashSet, + metadata_columns: HashSet, + ) -> Self { + Self { + time_field, + metric_columns, + metadata_columns, + } + } +} + +#[derive(Debug, Clone)] +pub struct ElasticMappingSchema { + pub config: HashMap, +} + +impl ElasticMappingSchema { + pub fn new(indexes: Vec<(String, ElasticIndexSchema)>) -> Self { + let mut config = HashMap::new(); + for (index_name, index_schema) in indexes { + config.insert(index_name, index_schema); + } + Self { config } + } + + pub fn add_index(mut self, index: String, schema: ElasticIndexSchema) -> Self { + self.config.insert(index, schema); + self + } + + pub fn get_time_field(&self, index: &str) -> Option<&String> { + self.config.get(index).map(|schema| &schema.time_field) + } + + pub fn get_metric_columns(&self, index: &str) -> Option<&HashSet> { + self.config.get(index).map(|schema| &schema.metric_columns) + } + + pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet> { + self.config.get(index).map(|schema| &schema.metadata_columns) + } + + pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool { + self.get_metric_columns(index) + .map(|columns| columns.contains(metric_column)) + .unwrap_or(false) + } + + pub fn are_valid_metadata_columns(&self, index: &str, columns: &HashSet) -> bool { + self.get_metadata_columns(index) + .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c))) + .unwrap_or(false) + } +} \ No newline at end of file diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 75c599a7..12369088 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -141,7 +141,7 @@ pub struct TableDefinition { #[derive(Debug, Clone, Deserialize)] pub struct ElasticDSLControllerConfig { pub query_groups: Vec, - pub index: String, + pub index: Option, pub sketch_parameters: Option, pub aggregate_cleanup: Option, } @@ -151,5 +151,6 @@ pub struct ElasticDSLQueryGroup { pub id: Option, pub queries: Vec, pub repetition_delay: u64, + pub index: Option, pub controller_options: ControllerOptions, } diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/output/elastic_generator.rs index 8cc9b678..85788eca 100644 --- a/asap-planner-rs/src/output/elastic_generator.rs +++ b/asap-planner-rs/src/output/elastic_generator.rs @@ -1,5 +1,6 @@ use asap_types::enums::CleanupPolicy; use indexmap::IndexMap; +use indexmap::IndexSet; use serde_yaml::Value as YamlValue; use std::collections::HashMap; @@ -9,10 +10,44 @@ use crate::output::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; +use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; use crate::planner::single_query::IntermediateAggConfig; use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; use crate::StreamingEngine; +#[derive(Default)] +struct ElasticIndexSchemaBuilder { + time_field: Option, + metric_columns: IndexSet, + metadata_columns: IndexSet, +} + +impl ElasticIndexSchemaBuilder { + fn update_from_query_info( + &mut self, + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, + ) -> Result<(), ControllerError> { + match &self.time_field { + Some(existing) if existing != &query_info.time_field => { + return Err(ControllerError::PlannerError(format!( + "conflicting time fields for Elasticsearch index: '{}' vs '{}'", + existing, query_info.time_field + ))); + } + None => self.time_field = Some(query_info.time_field.clone()), + _ => {} + } + + self.metric_columns.insert(query_info.target_field.clone()); + + for field in collect_elastic_metadata_fields(query_info) { + self.metadata_columns.insert(field); + } + + Ok(()) + } +} + pub struct ElasticRuntimeOptions { pub streaming_engine: StreamingEngine, pub data_ingestion_interval: u64, @@ -52,14 +87,29 @@ pub fn generate_elastic_plan( let mut dedup_map: IndexMap = IndexMap::new(); // query_string -> Vec<(key, cleanup_param)> let mut query_keys_map: IndexMap)>> = IndexMap::new(); + // index -> schema builder derived from the queries targeting that index + let mut index_schema_builders: IndexMap = IndexMap::new(); for qg in &config.query_groups { + let index = resolve_elastic_index(config, qg)?; for query_string in &qg.queries { + let query_info = extract_query_info(query_string).ok_or_else(|| { + ControllerError::ElasticDSLParse(format!( + "Failed to parse Elasticsearch DSL query: {}", + query_string + )) + })?; + + index_schema_builders + .entry(index.clone()) + .or_default() + .update_from_query_info(&query_info)?; + let processor = ElasticSingleQueryProcessor::new( query_string.clone(), qg.repetition_delay, opts.data_ingestion_interval, - config.index.clone(), + index.clone(), opts.streaming_engine, config.sketch_parameters.clone(), cleanup_policy, @@ -84,7 +134,13 @@ pub fn generate_elastic_plan( } let streaming_yaml = build_elastic_streaming_yaml(config, &dedup_map, &id_map)?; - let inference_yaml = build_elastic_inference_yaml(config, cleanup_policy, &query_keys_map, &id_map)?; + let inference_yaml = build_elastic_inference_yaml( + config, + cleanup_policy, + &query_keys_map, + &id_map, + &index_schema_builders, + )?; Ok(GeneratorOutput { punted_queries: Vec::new(), @@ -119,6 +175,7 @@ fn build_elastic_inference_yaml( cleanup_policy: CleanupPolicy, query_keys_map: &IndexMap)>>, id_map: &HashMap, + index_schema_builders: &IndexMap, ) -> Result { let mut cleanup_map = serde_yaml::Mapping::new(); cleanup_map.insert( @@ -135,6 +192,118 @@ fn build_elastic_inference_yaml( YamlValue::String(KEY_QUERIES.to_string()), YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), ); + root.insert( + YamlValue::String("indices".to_string()), + YamlValue::Sequence( + index_schema_builders + .iter() + .map(|(index_name, builder)| build_elastic_index_yaml(index_name, builder)) + .collect(), + ), + ); Ok(YamlValue::Mapping(root)) } + +fn build_elastic_index_yaml(index_name: &str, builder: &ElasticIndexSchemaBuilder) -> YamlValue { + let mut map = serde_yaml::Mapping::new(); + map.insert( + YamlValue::String("name".to_string()), + YamlValue::String(index_name.to_string()), + ); + map.insert( + YamlValue::String("time_field".to_string()), + YamlValue::String( + builder + .time_field + .clone() + .unwrap_or_else(|| "@timestamp".to_string()), + ), + ); + map.insert( + YamlValue::String("metric_columns".to_string()), + YamlValue::Sequence( + builder + .metric_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + map.insert( + YamlValue::String("metadata_columns".to_string()), + YamlValue::Sequence( + builder + .metadata_columns + .iter() + .cloned() + .map(YamlValue::String) + .collect(), + ), + ); + + YamlValue::Mapping(map) +} + +fn resolve_elastic_index( + config: &ElasticDSLControllerConfig, + query_group: &crate::config::input::ElasticDSLQueryGroup, +) -> Result { + query_group + .index + .clone() + .or_else(|| config.index.clone()) + .ok_or_else(|| { + ControllerError::PlannerError( + "each Elasticsearch query group must specify an index (or inherit one from the controller config)" + .to_string(), + ) + }) +} + +fn collect_elastic_metadata_fields( + query_info: &elastic_dsl_utilities::ast_parsing::ElasticDSLQueryInfo, +) -> IndexSet { + let mut fields = IndexSet::new(); + + for predicate in &query_info.predicates { + match predicate { + Predicate::Term { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + Predicate::Range { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + } + + if let Some(group_by_buckets) = &query_info.group_by_buckets { + match group_by_buckets { + GroupBySpec::Fields(group_fields) => { + for field in group_fields { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + GroupBySpec::Filters(predicates) => { + for predicate in predicates { + match predicate { + Predicate::Term { field, .. } | Predicate::Range { field, .. } => { + if field != &query_info.time_field { + fields.insert(field.clone()); + } + } + } + } + } + } + } + + fields +} diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs new file mode 100644 index 00000000..2dea8a2f --- /dev/null +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -0,0 +1,30 @@ +use asap_planner::{ + ElasticController, ElasticRuntimeOptions, StreamingEngine, +}; +use asap_types::{QueryLanguage, SchemaConfig}; +use std::path::Path; + + +#[test] +fn elastic_querydsl_emits_index_schema() { + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + let c = ElasticController::from_file(Path::new("tests/elastic_example.yaml"), opts).unwrap(); + let out = c.generate().unwrap(); + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.is_empty()); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} \ No newline at end of file diff --git a/asap-planner-rs/tests/elastic_example.yaml b/asap-planner-rs/tests/elastic_example.yaml new file mode 100644 index 00000000..9e431d90 --- /dev/null +++ b/asap-planner-rs/tests/elastic_example.yaml @@ -0,0 +1,34 @@ +query_groups: + - id: 1 + index: metrics + queries: + - | + { + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index e84f2a5f..26ff945c 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -218,7 +218,7 @@ impl SimpleEngine { warn!("SQL query requested but config has PromQL schema"); return None; } - &SchemaConfig::ElasticQueryDSL => todo!(), + &SchemaConfig::ElasticQueryDSL(_) => todo!(), SchemaConfig::ElasticSQL(sql_schema) => sql_schema.clone(), }; diff --git a/asap-query-engine/src/tests/test_utilities/config_builders.rs b/asap-query-engine/src/tests/test_utilities/config_builders.rs index ec691272..4bd68ade 100644 --- a/asap-query-engine/src/tests/test_utilities/config_builders.rs +++ b/asap-query-engine/src/tests/test_utilities/config_builders.rs @@ -306,7 +306,7 @@ mod tests { assert!(promql_schema.get_labels("cpu_usage").is_some()); } SchemaConfig::SQL(_) => panic!("Expected PromQL schema"), - SchemaConfig::ElasticQueryDSL => panic!("Expected PromQL schema"), + SchemaConfig::ElasticQueryDSL(_) => panic!("Expected PromQL schema"), SchemaConfig::ElasticSQL(_) => panic!("Expected PromQL schema"), } From 26e66d96e781b77a7cb96d9dc4c741c7bdf46bb3 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:19:59 -0400 Subject: [PATCH 3/5] Add more integration tests (ES DSL planner). --- .../tests/elastic_dsl_integration.rs | 391 +++++++++++++++++- 1 file changed, 388 insertions(+), 3 deletions(-) diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs index 2dea8a2f..a46c8d08 100644 --- a/asap-planner-rs/tests/elastic_dsl_integration.rs +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -1,8 +1,81 @@ -use asap_planner::{ - ElasticController, ElasticRuntimeOptions, StreamingEngine, -}; +use asap_planner::{ElasticController, ElasticRuntimeOptions, PlannerOutput, StreamingEngine}; use asap_types::{QueryLanguage, SchemaConfig}; +use std::io::Write; use std::path::Path; +use tempfile::NamedTempFile; + +fn indent_block(text: &str, indent: usize) -> String { + let padding = " ".repeat(indent); + text.trim() + .lines() + .map(|line| format!("{}{}", padding, line)) + .collect::>() + .join("\n") +} + +fn elastic_yaml(index: &str, query: &str, t_repeat: u64) -> String { + format!( + r#" +query_groups: + - id: 1 + index: {index} + queries: + - | +{query} + repetition_delay: {t_repeat} + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#, + index = index, + query = indent_block(query, 8), + t_repeat = t_repeat, + ) +} + +fn elastic_output(index: &str, query: &str, t_repeat: u64) -> PlannerOutput { + let yaml = elastic_yaml(index, query, t_repeat); + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + ElasticController::from_file(Path::new(file.path()), opts) + .unwrap() + .generate() + .unwrap() +} + +fn assert_index_schema( + out: &PlannerOutput, + index: &str, + metric_column: &str, + metadata_columns: &[&str], +) { + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field(index), Some(&"@timestamp".to_string())); + + let metric_columns = schema.get_metric_columns(index).unwrap(); + assert!(metric_columns.contains(metric_column)); + + let actual_metadata = schema.get_metadata_columns(index).unwrap(); + for column in metadata_columns { + assert!(actual_metadata.contains(*column)); + } + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +} #[test] @@ -27,4 +100,316 @@ fn elastic_querydsl_emits_index_schema() { } other => panic!("expected elastic querydsl schema, got {:?}", other), } +} + +#[test] +fn elastic_sum_produces_basic_plan_and_schema() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "sum_cpu": { + "sum": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("CountMinSketch"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("CountMinSketch"), Some("cpu_usage".to_string())); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_avg_produces_three_configs() { + let query = r#" +{ + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("MultipleSum"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("MultipleSum"), Some("cpu_usage".to_string())); + + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); +} + +#[test] +fn elastic_min_produces_exact_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "min_cpu": { + "min": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleMinMax")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("MultipleMinMax"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("MultipleMinMax"), Some("cpu_usage".to_string())); + assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); +} + +#[test] +fn elastic_percentiles_produce_kll_plan() { + let query = r#" +{ + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "latency_percentiles": { + "percentiles": { + "field": "latency_ms", + "percents": [50.0, 95.0] + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } +} +"#; + let out = elastic_output("metrics", query, 300); + + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.aggregation_table_name("DatasketchesKLL"), Some("metrics".to_string())); + assert_eq!(out.aggregation_value_column("DatasketchesKLL"), Some("latency_ms".to_string())); + assert_index_schema(&out, "metrics", "latency_ms", &["service"]); +} + +#[test] +fn elastic_multi_index_schema_inference() { + let yaml = r#" +query_groups: + - id: 1 + index: metrics + queries: + - | + { + "aggs": { + "by_datacenter": { + "terms": { + "field": "datacenter.keyword" + }, + "aggs": { + "avg_cpu": { + "avg": { + "field": "cpu_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 + - id: 2 + index: other_metrics + queries: + - | + { + "aggs": { + "by_service": { + "terms": { + "field": "service.keyword" + }, + "aggs": { + "avg_mem": { + "avg": { + "field": "memory_usage" + } + } + } + } + }, + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + } + } + repetition_delay: 300 + controller_options: + accuracy_sla: 0.95 + latency_sla: 1.0 +aggregate_cleanup: + policy: read_based +"#; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); + + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; + + let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); + let out = c.generate().unwrap(); + + assert_eq!(out.streaming_aggregation_count(), 4); + assert_eq!(out.inference_query_count(), 2); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); + + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.contains("datacenter")); + + assert_eq!( + schema.get_time_field("other_metrics"), + Some(&"@timestamp".to_string()) + ); + let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); + assert!(other_metric_columns.contains("memory_usage")); + let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); + assert!(other_metadata_columns.contains("service")); + } + other => panic!("expected elastic querydsl schema, got {:?}", other), + } } \ No newline at end of file From 3bca6c3c1988caa96f27f0fa06ab4830a12ed0d6 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:48:19 -0400 Subject: [PATCH 4/5] Refactor/rearrange Elastic DSL changes to match new crate layout. --- Cargo.lock | 4 +- asap-planner-rs/src/elastic_dsl/controller.rs | 46 +++++++++++++++++++ .../generator.rs} | 6 +-- asap-planner-rs/src/elastic_dsl/mod.rs | 4 ++ asap-planner-rs/src/lib.rs | 3 ++ ...elastic_single_query.rs => elastic_dsl.rs} | 6 ++- asap-planner-rs/src/planner/mod.rs | 1 + 7 files changed, 63 insertions(+), 7 deletions(-) create mode 100644 asap-planner-rs/src/elastic_dsl/controller.rs rename asap-planner-rs/src/{output/elastic_generator.rs => elastic_dsl/generator.rs} (98%) create mode 100644 asap-planner-rs/src/elastic_dsl/mod.rs rename asap-planner-rs/src/planner/{elastic_single_query.rs => elastic_dsl.rs} (96%) diff --git a/Cargo.lock b/Cargo.lock index b79abe1f..8e95d34a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,9 +392,9 @@ dependencies = [ "anyhow", "asap_types", "chrono", - "clap 4.6.0", + "clap 4.6.1", "elastic_dsl_utilities", - "indexmap 2.13.1", + "indexmap 2.14.0", "pretty_assertions", "promql-parser", "promql_utilities", diff --git a/asap-planner-rs/src/elastic_dsl/controller.rs b/asap-planner-rs/src/elastic_dsl/controller.rs new file mode 100644 index 00000000..ae1bab3f --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/controller.rs @@ -0,0 +1,46 @@ + +use std::path::Path; + +use crate::config::input::ElasticDSLControllerConfig; +use crate::error::ControllerError; +use crate::planner_output::PlannerOutput; +use crate::elastic_dsl::generator::{ElasticRuntimeOptions, generate_elastic_plan}; + +pub struct ElasticController { + config: ElasticDSLControllerConfig, + options: ElasticRuntimeOptions, +} + +impl ElasticController { + pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result { + let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = generate_elastic_plan(&self.config, &self.options)?; + Ok(PlannerOutput::from_output(output)) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(output.streaming_yaml())?; + let inference_str = serde_yaml::to_string(output.inference_yaml())?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} \ No newline at end of file diff --git a/asap-planner-rs/src/output/elastic_generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs similarity index 98% rename from asap-planner-rs/src/output/elastic_generator.rs rename to asap-planner-rs/src/elastic_dsl/generator.rs index 85788eca..62e05a78 100644 --- a/asap-planner-rs/src/output/elastic_generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -6,13 +6,13 @@ use std::collections::HashMap; use crate::config::input::ElasticDSLControllerConfig; use crate::error::ControllerError; -use crate::output::generator::{ +use crate::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; -use crate::planner::single_query::IntermediateAggConfig; -use crate::planner::elastic_single_query::ElasticSingleQueryProcessor; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; use crate::StreamingEngine; #[derive(Default)] diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs new file mode 100644 index 00000000..626dbd05 --- /dev/null +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -0,0 +1,4 @@ +pub mod controller; +pub mod generator; +pub use controller::ElasticController; +pub use generator::ElasticRuntimeOptions; \ No newline at end of file diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index ad02317e..56faff3a 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -7,6 +7,7 @@ pub mod prometheus_client; pub mod promql; pub mod query_log; pub mod sql; +pub mod elastic_dsl; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; @@ -19,6 +20,8 @@ pub use prometheus_client::build_schema_from_prometheus; pub use promql::Controller; pub use sql::SQLController; pub use sql::SQLRuntimeOptions; +pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticRuntimeOptions; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { diff --git a/asap-planner-rs/src/planner/elastic_single_query.rs b/asap-planner-rs/src/planner/elastic_dsl.rs similarity index 96% rename from asap-planner-rs/src/planner/elastic_single_query.rs rename to asap-planner-rs/src/planner/elastic_dsl.rs index db258909..ff763dd8 100644 --- a/asap-planner-rs/src/planner/elastic_single_query.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -7,8 +7,10 @@ use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, use crate::config::input::SketchParameterOverrides; use crate::error::ControllerError; -use crate::planner::logics::{build_sketch_parameters, get_sql_cleanup_param, IntermediateWindowConfig}; -use crate::planner::single_query::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; +use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::cleanup::get_sql_cleanup_param; use crate::StreamingEngine; pub struct ElasticSingleQueryProcessor { diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index ce9b172c..2e6fbb19 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -6,5 +6,6 @@ pub mod promql; pub mod sketch; pub mod sql; pub mod window; +pub mod elastic_dsl; pub use agg_config::*; pub use promql::*; From 018fc81cc2c8e77ab78431c01ffb1ba3d8fb8f58 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Tue, 12 May 2026 16:50:57 -0400 Subject: [PATCH 5/5] Cargo fmt and clippy. --- .../rs/asap_types/src/inference_config.rs | 10 +- .../rs/elastic_dsl_utilities/src/lib.rs | 7 +- asap-planner-rs/src/elastic_dsl/controller.rs | 5 +- asap-planner-rs/src/elastic_dsl/generator.rs | 2 +- asap-planner-rs/src/elastic_dsl/mod.rs | 2 +- asap-planner-rs/src/lib.rs | 8 +- asap-planner-rs/src/main.rs | 11 +- asap-planner-rs/src/planner/elastic_dsl.rs | 13 +- asap-planner-rs/src/planner/mod.rs | 2 +- .../tests/elastic_dsl_integration.rs | 188 ++++++++++-------- 10 files changed, 146 insertions(+), 102 deletions(-) diff --git a/asap-common/dependencies/rs/asap_types/src/inference_config.rs b/asap-common/dependencies/rs/asap_types/src/inference_config.rs index ff8da7de..6bd9a530 100644 --- a/asap-common/dependencies/rs/asap_types/src/inference_config.rs +++ b/asap-common/dependencies/rs/asap_types/src/inference_config.rs @@ -8,9 +8,9 @@ use crate::aggregation_reference::AggregationReference; use crate::enums::{CleanupPolicy, QueryLanguage}; use crate::promql_schema::PromQLSchema; use crate::query_config::QueryConfig; +use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; use promql_utilities::data_model::KeyByLabelNames; use sql_utilities::sqlhelper::{SQLSchema, Table}; -use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema}; /// Schema configuration that can be either PromQL or SQL format #[derive(Debug, Clone)] @@ -182,7 +182,9 @@ impl InferenceConfig { let metric_columns: HashSet = index_data .get("metric_columns") .and_then(|v| v.as_sequence()) - .ok_or_else(|| anyhow::anyhow!("Missing metric_columns field in elastic index {name}"))? + .ok_or_else(|| { + anyhow::anyhow!("Missing metric_columns field in elastic index {name}") + })? .iter() .filter_map(|v| v.as_str()) .map(|s| s.to_string()) @@ -191,7 +193,9 @@ impl InferenceConfig { let metadata_columns: HashSet = index_data .get("metadata_columns") .and_then(|v| v.as_sequence()) - .ok_or_else(|| anyhow::anyhow!("Missing metadata_columns field in elastic index {name}"))? + .ok_or_else(|| { + anyhow::anyhow!("Missing metadata_columns field in elastic index {name}") + })? .iter() .filter_map(|v| v.as_str()) .map(|s| s.to_string()) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 695610f7..b8f2649f 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -8,7 +8,6 @@ pub use helpers::*; use std::collections::{HashMap, HashSet}; - #[derive(Debug, Clone)] pub struct ElasticIndexSchema { pub time_field: String, @@ -58,7 +57,9 @@ impl ElasticMappingSchema { } pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet> { - self.config.get(index).map(|schema| &schema.metadata_columns) + self.config + .get(index) + .map(|schema| &schema.metadata_columns) } pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool { @@ -72,4 +73,4 @@ impl ElasticMappingSchema { .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c))) .unwrap_or(false) } -} \ No newline at end of file +} diff --git a/asap-planner-rs/src/elastic_dsl/controller.rs b/asap-planner-rs/src/elastic_dsl/controller.rs index ae1bab3f..22df6872 100644 --- a/asap-planner-rs/src/elastic_dsl/controller.rs +++ b/asap-planner-rs/src/elastic_dsl/controller.rs @@ -1,10 +1,9 @@ - use std::path::Path; use crate::config::input::ElasticDSLControllerConfig; +use crate::elastic_dsl::generator::{generate_elastic_plan, ElasticRuntimeOptions}; use crate::error::ControllerError; use crate::planner_output::PlannerOutput; -use crate::elastic_dsl::generator::{ElasticRuntimeOptions, generate_elastic_plan}; pub struct ElasticController { config: ElasticDSLControllerConfig, @@ -43,4 +42,4 @@ impl ElasticController { std::fs::write(dir.join("inference_config.yaml"), inference_str)?; Ok(output) } -} \ No newline at end of file +} diff --git a/asap-planner-rs/src/elastic_dsl/generator.rs b/asap-planner-rs/src/elastic_dsl/generator.rs index 62e05a78..7cbe7c07 100644 --- a/asap-planner-rs/src/elastic_dsl/generator.rs +++ b/asap-planner-rs/src/elastic_dsl/generator.rs @@ -10,10 +10,10 @@ use crate::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_NAME, KEY_QUERIES, }; -use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; use crate::planner::agg_config::IntermediateAggConfig; use crate::planner::elastic_dsl::ElasticSingleQueryProcessor; use crate::StreamingEngine; +use elastic_dsl_utilities::ast_parsing::{extract_query_info, GroupBySpec, Predicate}; #[derive(Default)] struct ElasticIndexSchemaBuilder { diff --git a/asap-planner-rs/src/elastic_dsl/mod.rs b/asap-planner-rs/src/elastic_dsl/mod.rs index 626dbd05..78350ee4 100644 --- a/asap-planner-rs/src/elastic_dsl/mod.rs +++ b/asap-planner-rs/src/elastic_dsl/mod.rs @@ -1,4 +1,4 @@ pub mod controller; pub mod generator; pub use controller::ElasticController; -pub use generator::ElasticRuntimeOptions; \ No newline at end of file +pub use generator::ElasticRuntimeOptions; diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 56faff3a..243b1821 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod elastic_dsl; pub mod error; pub mod generator; pub mod planner; @@ -7,12 +8,13 @@ pub mod prometheus_client; pub mod promql; pub mod query_log; pub mod sql; -pub mod elastic_dsl; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; -pub use config::input::SQLControllerConfig; pub use config::input::ElasticDSLControllerConfig; +pub use config::input::SQLControllerConfig; +pub use elastic_dsl::ElasticController; +pub use elastic_dsl::ElasticRuntimeOptions; pub use error::ControllerError; pub use generator::{GeneratorOutput, PuntedQuery}; pub use planner_output::PlannerOutput; @@ -20,8 +22,6 @@ pub use prometheus_client::build_schema_from_prometheus; pub use promql::Controller; pub use sql::SQLController; pub use sql::SQLRuntimeOptions; -pub use elastic_dsl::ElasticController; -pub use elastic_dsl::ElasticRuntimeOptions; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 3b88abd5..2035dada 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,4 +1,7 @@ -use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, ElasticController, ElasticRuntimeOptions, StreamingEngine}; +use asap_planner::{ + Controller, ElasticController, ElasticRuntimeOptions, RuntimeOptions, SQLController, + SQLRuntimeOptions, StreamingEngine, +}; use asap_types::enums::QueryLanguage; use clap::Parser; use std::path::PathBuf; @@ -129,9 +132,9 @@ fn main() -> anyhow::Result<()> { let interval = args.data_ingestion_interval.ok_or_else(|| { anyhow::anyhow!("--data-ingestion-interval is required for Elasticsearch DSL mode") })?; - let config_path = args - .input_config - .ok_or_else(|| anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode"))?; + let config_path = args.input_config.ok_or_else(|| { + anyhow::anyhow!("--input_config is required for Elasticsearch DSL mode") + })?; let opts = ElasticRuntimeOptions { streaming_engine: engine, data_ingestion_interval: interval, diff --git a/asap-planner-rs/src/planner/elastic_dsl.rs b/asap-planner-rs/src/planner/elastic_dsl.rs index ff763dd8..317f8567 100644 --- a/asap-planner-rs/src/planner/elastic_dsl.rs +++ b/asap-planner-rs/src/planner/elastic_dsl.rs @@ -7,10 +7,10 @@ use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, use crate::config::input::SketchParameterOverrides; use crate::error::ControllerError; -use crate::planner::sketch::build_sketch_parameters; -use crate::planner::window::IntermediateWindowConfig; use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; use crate::planner::cleanup::get_sql_cleanup_param; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; use crate::StreamingEngine; pub struct ElasticSingleQueryProcessor { @@ -126,7 +126,10 @@ fn get_elastic_statistics( match agg_type { ElasticAggregationType::Avg => { // AVG requires SUM and COUNT - Ok((QueryTreatmentType::Exact, vec![Statistic::Sum, Statistic::Count])) + Ok(( + QueryTreatmentType::Exact, + vec![Statistic::Sum, Statistic::Count], + )) } ElasticAggregationType::Sum => Ok((QueryTreatmentType::Approximate, vec![Statistic::Sum])), ElasticAggregationType::Min => Ok((QueryTreatmentType::Exact, vec![Statistic::Min])), @@ -145,7 +148,9 @@ fn get_elastic_statistics( } /// Extract field names from group by specification -fn get_group_by_fields(bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec) -> Vec { +fn get_group_by_fields( + bucket_spec: &elastic_dsl_utilities::ast_parsing::GroupBySpec, +) -> Vec { use elastic_dsl_utilities::ast_parsing::GroupBySpec; match bucket_spec { GroupBySpec::Fields(fields) => fields.clone(), diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index 2e6fbb19..41a38081 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -1,11 +1,11 @@ pub mod agg_config; pub mod cleanup; +pub mod elastic_dsl; pub mod labels; pub mod patterns; pub mod promql; pub mod sketch; pub mod sql; pub mod window; -pub mod elastic_dsl; pub use agg_config::*; pub use promql::*; diff --git a/asap-planner-rs/tests/elastic_dsl_integration.rs b/asap-planner-rs/tests/elastic_dsl_integration.rs index a46c8d08..5e02813f 100644 --- a/asap-planner-rs/tests/elastic_dsl_integration.rs +++ b/asap-planner-rs/tests/elastic_dsl_integration.rs @@ -63,7 +63,10 @@ fn assert_index_schema( match inference_config.schema { SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field(index), Some(&"@timestamp".to_string())); + assert_eq!( + schema.get_time_field(index), + Some(&"@timestamp".to_string()) + ); let metric_columns = schema.get_metric_columns(index).unwrap(); assert!(metric_columns.contains(metric_column)); @@ -77,7 +80,6 @@ fn assert_index_schema( } } - #[test] fn elastic_querydsl_emits_index_schema() { let opts = ElasticRuntimeOptions { @@ -92,7 +94,10 @@ fn elastic_querydsl_emits_index_schema() { match inference_config.schema { SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); let metric_columns = schema.get_metric_columns("metrics").unwrap(); assert!(metric_columns.contains("cpu_usage")); let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); @@ -104,7 +109,7 @@ fn elastic_querydsl_emits_index_schema() { #[test] fn elastic_sum_produces_basic_plan_and_schema() { - let query = r#" + let query = r#" { "aggs": { "by_datacenter": { @@ -136,21 +141,27 @@ fn elastic_sum_produces_basic_plan_and_schema() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 2); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("CountMinSketch")); - assert!(out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("CountMinSketch"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("CountMinSketch"), Some("cpu_usage".to_string())); - assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("CountMinSketch")); + assert!(out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("CountMinSketch"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("CountMinSketch"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); } #[test] fn elastic_avg_produces_three_configs() { - let query = r#" + let query = r#" { "aggs": { "by_datacenter": { @@ -182,22 +193,28 @@ fn elastic_avg_produces_three_configs() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 2); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("MultipleSum")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("MultipleSum"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("MultipleSum"), Some("cpu_usage".to_string())); + assert_eq!(out.streaming_aggregation_count(), 2); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleSum"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleSum"), + Some("cpu_usage".to_string()) + ); - assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); + assert_index_schema(&out, "metrics", "cpu_usage", &["datacenter"]); } #[test] fn elastic_min_produces_exact_plan() { - let query = r#" + let query = r#" { "aggs": { "by_service": { @@ -229,21 +246,27 @@ fn elastic_min_produces_exact_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 1); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("MultipleMinMax")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("MultipleMinMax"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("MultipleMinMax"), Some("cpu_usage".to_string())); - assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("MultipleMinMax")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("MultipleMinMax"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("MultipleMinMax"), + Some("cpu_usage".to_string()) + ); + assert_index_schema(&out, "metrics", "cpu_usage", &["service"]); } #[test] fn elastic_percentiles_produce_kll_plan() { - let query = r#" + let query = r#" { "aggs": { "by_service": { @@ -276,21 +299,27 @@ fn elastic_percentiles_produce_kll_plan() { } } "#; - let out = elastic_output("metrics", query, 300); + let out = elastic_output("metrics", query, 300); - assert_eq!(out.streaming_aggregation_count(), 1); - assert_eq!(out.inference_query_count(), 1); - assert!(out.has_aggregation_type("DatasketchesKLL")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); - assert_eq!(out.aggregation_table_name("DatasketchesKLL"), Some("metrics".to_string())); - assert_eq!(out.aggregation_value_column("DatasketchesKLL"), Some("latency_ms".to_string())); - assert_index_schema(&out, "metrics", "latency_ms", &["service"]); + assert_eq!(out.streaming_aggregation_count(), 1); + assert_eq!(out.inference_query_count(), 1); + assert!(out.has_aggregation_type("DatasketchesKLL")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!( + out.aggregation_table_name("DatasketchesKLL"), + Some("metrics".to_string()) + ); + assert_eq!( + out.aggregation_value_column("DatasketchesKLL"), + Some("latency_ms".to_string()) + ); + assert_index_schema(&out, "metrics", "latency_ms", &["service"]); } #[test] fn elastic_multi_index_schema_inference() { - let yaml = r#" + let yaml = r#" query_groups: - id: 1 index: metrics @@ -372,44 +401,47 @@ aggregate_cleanup: policy: read_based "#; - let mut file = NamedTempFile::new().unwrap(); - file.write_all(yaml.as_bytes()).unwrap(); + let mut file = NamedTempFile::new().unwrap(); + file.write_all(yaml.as_bytes()).unwrap(); - let opts = ElasticRuntimeOptions { - streaming_engine: StreamingEngine::Arroyo, - data_ingestion_interval: 15, - }; + let opts = ElasticRuntimeOptions { + streaming_engine: StreamingEngine::Arroyo, + data_ingestion_interval: 15, + }; - let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); - let out = c.generate().unwrap(); + let c = ElasticController::from_file(Path::new(file.path()), opts).unwrap(); + let out = c.generate().unwrap(); - assert_eq!(out.streaming_aggregation_count(), 4); - assert_eq!(out.inference_query_count(), 2); - assert!(out.has_aggregation_type("MultipleSum")); - assert!(!out.has_aggregation_type("DeltaSetAggregator")); - assert!(out.all_tumbling_window_sizes_eq(300)); + assert_eq!(out.streaming_aggregation_count(), 4); + assert_eq!(out.inference_query_count(), 2); + assert!(out.has_aggregation_type("MultipleSum")); + assert!(!out.has_aggregation_type("DeltaSetAggregator")); + assert!(out.all_tumbling_window_sizes_eq(300)); - let inference_config = out - .to_inference_config(QueryLanguage::elastic_querydsl) - .unwrap(); + let inference_config = out + .to_inference_config(QueryLanguage::elastic_querydsl) + .unwrap(); - match inference_config.schema { - SchemaConfig::ElasticQueryDSL(schema) => { - assert_eq!(schema.get_time_field("metrics"), Some(&"@timestamp".to_string())); - let metric_columns = schema.get_metric_columns("metrics").unwrap(); - assert!(metric_columns.contains("cpu_usage")); - let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); - assert!(metadata_columns.contains("datacenter")); + match inference_config.schema { + SchemaConfig::ElasticQueryDSL(schema) => { + assert_eq!( + schema.get_time_field("metrics"), + Some(&"@timestamp".to_string()) + ); + let metric_columns = schema.get_metric_columns("metrics").unwrap(); + assert!(metric_columns.contains("cpu_usage")); + let metadata_columns = schema.get_metadata_columns("metrics").unwrap(); + assert!(metadata_columns.contains("datacenter")); - assert_eq!( - schema.get_time_field("other_metrics"), - Some(&"@timestamp".to_string()) - ); - let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); - assert!(other_metric_columns.contains("memory_usage")); - let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); - assert!(other_metadata_columns.contains("service")); - } - other => panic!("expected elastic querydsl schema, got {:?}", other), + assert_eq!( + schema.get_time_field("other_metrics"), + Some(&"@timestamp".to_string()) + ); + let other_metric_columns = schema.get_metric_columns("other_metrics").unwrap(); + assert!(other_metric_columns.contains("memory_usage")); + let other_metadata_columns = schema.get_metadata_columns("other_metrics").unwrap(); + assert!(other_metadata_columns.contains("service")); } -} \ No newline at end of file + other => panic!("expected elastic querydsl schema, got {:?}", other), + } +}