From 283dcb1c19d1a48c2e349b9510af8d1893eb619d Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 23 Apr 2026 16:45:19 -0400 Subject: [PATCH 1/7] Initial updated parsing utilities w/ new AST module (Elastic DSL). --- .../src/ast_parsing/extract_info.rs | 319 ++++++++++++++++++ .../src/ast_parsing/mod.rs | 2 + .../src/ast_parsing/query_info.rs | 67 ++++ .../rs/elastic_dsl_utilities/src/datemath.rs | 206 +++++++++++ .../rs/elastic_dsl_utilities/src/helpers.rs | 15 + .../rs/elastic_dsl_utilities/src/lib.rs | 3 + 6 files changed, 612 insertions(+) create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs create mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs new file mode 100644 index 00000000..221e727f --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -0,0 +1,319 @@ +use opensearch_dsl::{self as dsl}; +use serde_json; +use crate::ast_parsing::query_info::{ElasticDSLQuery, Predicate, GroupBySpec, AggregationType, FieldName, TermValue}; +use crate::helpers::{strip_keyword_suffix}; + + +pub fn parse_query_to_ast(query: &str) -> Option { + let search_request = serde_json::from_str(query).ok()?; + search_request +} + +pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { + // Placeholder for walking the AST and extracting relevant information + // This would involve traversing the AST nodes and applying logic to determine query patterns, labels, statistics, etc. + let predicates = match ast.query.clone()? { + dsl::Query::Bool(bool_query) => { + // Extract information from the bool query + walk_bool_query_and_extract_info(&bool_query) + } + _ => { + // Handle other query types + Vec::new() // Return an empty vector of predicates for unsupported query types + } + }; + let (target_field, aggregation_type, group_by_spec) = walk_aggregations_and_extract_info(&ast.aggs)?; + Some(ElasticDSLQuery::new(target_field, predicates, group_by_spec, aggregation_type)) +} + +pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { + // Placeholder for walking the filter context of the AST and extracting relevant information + // This would involve traversing the filter nodes and applying logic to determine label filters, time ranges, etc. + let dsl::QueryCollection(filters) = bool_query.filter.clone(); + let mut predicates = Vec::new(); + for query in filters { + match query { + dsl::Query::Term(term_query) => { + // Extract information from the term query + let field = strip_keyword_suffix(&term_query.field).to_owned(); + let Some(value) = term_query.value else { + continue; // Skip if term query does not have a value + }; + let Some(term_value) = map_term_to_json_value(&value) else { + continue; // Skip if term query value cannot be mapped to a JSON value + }; + // Process the term query information as needed + predicates.push(Predicate::Term { field, value: term_value }); + } + + dsl::Query::Range(range_query) => { + // Extract information from the range query + let field = strip_keyword_suffix(&range_query.field).to_owned(); + let gte = range_query.gte.clone(); + let lte = range_query.lte.clone(); + // Process the range query information as needed + let gte_value = gte.as_ref().and_then(|gte_term| map_term_to_json_value(gte_term)); + let lte_value = lte.as_ref().and_then(|lte_term| map_term_to_json_value(lte_term)); + predicates.push(Predicate::Range { field, gte: gte_value, lte: lte_value }); + } + _ => { + // Handle other query types + todo!() + } + } + } + predicates +} + +pub fn walk_aggregations_and_extract_info(aggregations: &dsl::Aggregations) -> Option<(FieldName, AggregationType, Option)> { + // Traverse the aggregations in the AST and extracting relevant information. Extract the first valid aggregation type found, along with any associated group by specifications. + for (_, agg) in aggregations { + match agg { + dsl::Aggregation::MultiTerms(terms_agg) => { + // Extract information from the terms aggregation + let field_names: Vec = terms_agg.multi_terms.terms.iter().filter_map(|multi_term| multi_term.field.clone()).collect(); + let field_names: Vec = field_names.iter().map(|s| strip_keyword_suffix(s).to_owned()).collect(); + if field_names.is_empty() { + return None; // Return None if no valid field names are found in the multi-terms aggregation. + } + let group_by_spec = Some(GroupBySpec::Fields(field_names)); + let (target_field, aggregation_type) = find_aggregation_info(&terms_agg.aggs.clone())?; + return Some((target_field, aggregation_type, group_by_spec)); + }, + dsl::Aggregation::Terms(terms_agg) => { + // Extract information from the terms aggregation + if let Some(field) = terms_agg.terms.field.clone() { + let field = strip_keyword_suffix(&field).to_owned(); + // Process the terms aggregation information as needed + let group_by_spec = Some(GroupBySpec::Fields(vec![field])); + let (target_field, aggregation_type) = find_aggregation_info(&terms_agg.aggs.clone())?; + return Some((target_field, aggregation_type, group_by_spec)); + } + } + other => { + // Handle other aggregation types + let (target_field, aggregation_type) = extract_aggregation_info(&other)?; + return Some((target_field, aggregation_type, None)); + } + } + } + None // Return None if no relevant aggregation information is found +} + +fn find_aggregation_info(aggregations: &dsl::Aggregations) -> Option<(FieldName, AggregationType)> { + // Placeholder for extracting specific information from an aggregation node + for (_, agg) in aggregations { + let (field, aggregation_type) = extract_aggregation_info(&agg)?; + return Some((field, aggregation_type)); + } + None // Return None if no relevant aggregation information is found +} + +pub fn extract_aggregation_info(agg: &dsl::Aggregation) -> Option<(FieldName, AggregationType)> { + // Extracts the specific aggregation type and target field from the given aggregation node, if it matches supported types (avg, sum, min, max, percentiles). + match agg { + dsl::Aggregation::Avg(avg_agg) => { + let field = strip_keyword_suffix(&avg_agg.avg.field).to_owned(); + let aggregation_type = AggregationType::Avg; + Some((field, aggregation_type)) + }, + dsl::Aggregation::Sum(sum_agg) => { + let field = strip_keyword_suffix(&sum_agg.sum.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Sum; + Some((field, aggregation_type)) + }, + dsl::Aggregation::Min(min_agg) => { + let field = strip_keyword_suffix(&min_agg.min.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Min; + Some((field, aggregation_type)) + }, + dsl::Aggregation::Max(max_agg) => { + let field = strip_keyword_suffix(&max_agg.max.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Max; + Some((field, aggregation_type)) + }, + dsl::Aggregation::Percentiles(percentiles_agg) => { + let field = percentiles_agg.percentiles.field.clone(); + let percents = percentiles_agg.percentiles.percents.clone().unwrap_or_default(); + let aggregation_type = AggregationType::Percentiles(percents); + Some((field, aggregation_type)) + }, + _ => None, // Return None for unsupported aggregation types + } +} + +fn map_term_to_json_value(term: &dsl::Term) -> Option { + // Placeholder for extracting field and value from a term query + match term { + dsl::Term::String(value) => { + let value_str = value.to_string(); // Convert the term value to a string representation + Some(TermValue::String(value_str)) + }, + dsl::Term::Float32(value) => { + Some(TermValue::Float(value.clone() as f64)) + }, + dsl::Term::Float64(value) => { + Some(TermValue::Float(value.clone())) + }, + dsl::Term::PositiveNumber(value) => Some(TermValue::UnsignedInt(value.clone())), + dsl::Term::NegativeNumber(value) => Some(TermValue::Int(value.clone())), + dsl::Term::Boolean(value) => Some(TermValue::Boolean(value.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_query_to_ast_parses_valid_search_request() { + let query = r#" + { + "query": { + "bool": { + "filter": [ + { "term": { "service.keyword": { "value": "frontend" } } } + ] + } + }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + } + "#; + + let ast = parse_query_to_ast(query); + assert!(ast.is_some()); + } + + #[test] + fn parse_query_to_ast_returns_none_for_invalid_json() { + let query = r#"{ "query": { "bool": { "filter": [ } }"#; + assert!(parse_query_to_ast(query).is_none()); + } + + #[test] + fn walk_bool_query_and_extract_info_extracts_term_and_range_predicates() { + let bool_query = dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::term("is_canary", true)) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")); + + let predicates = walk_bool_query_and_extract_info(&bool_query); + assert_eq!(predicates.len(), 3); + assert_eq!( + predicates[0], + Predicate::Term { + field: "service".to_string(), + value: TermValue::String("frontend".to_string()), + } + ); + assert_eq!( + predicates[1], + Predicate::Term { + field: "is_canary".to_string(), + value: TermValue::Boolean(true), + } + ); + assert_eq!( + predicates[2], + Predicate::Range { + field: "@timestamp".to_string(), + gte: Some(TermValue::String("now-30s".to_string())), + lte: Some(TermValue::String("now".to_string())), + } + ); + } + + #[test] + fn walk_aggregations_and_extract_info_extracts_terms_group_by_and_metric() { + let query = r#" + { + "aggs": { + "by_service": { + "terms": { "field": "service.keyword" }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + } + } + } + "#; + let ast = parse_query_to_ast(query).expect("query should parse"); + + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!(agg_type, AggregationType::Avg); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } + + #[test] + fn walk_aggregations_and_extract_info_extracts_multi_terms_and_percentiles() { + let query = r#" + { + "aggs": { + "by_labels": { + "multi_terms": { + "terms": [ + { "field": "service.keyword" }, + { "field": "env.keyword" } + ] + }, + "aggs": { + "latency_percentiles": { + "percentiles": { + "field": "latency_ms", + "percents": [50.0, 95.0] + } + } + } + } + } + } + "#; + let ast = parse_query_to_ast(query).expect("query should parse"); + + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!( + agg_type, + AggregationType::Percentiles(vec![50.0, 95.0]) + ); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec![ + "service".to_string(), + "env".to_string() + ])) + ); + } + + #[test] + fn walk_ast_and_extract_info_builds_elastic_dsl_query() { + let ast = dsl::Search::new() + .query( + dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")), + ) + .aggregate( + "by_service", + dsl::Aggregation::terms("service.keyword") + .aggregate("max_latency", dsl::Aggregation::max("latency_ms")), + ); + let info = walk_ast_and_extract_info(&ast).expect("info should parse"); + + assert_eq!(info.target_field, "latency_ms"); + assert_eq!(info.aggregation, AggregationType::Max); + assert_eq!(info.predicates.len(), 2); + assert_eq!( + info.group_by_buckets, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } +} \ No newline at end of file diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs new file mode 100644 index 00000000..77c57c6b --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs @@ -0,0 +1,2 @@ +pub mod extract_info; +pub mod query_info; \ No newline at end of file diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs new file mode 100644 index 00000000..8eb60f69 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -0,0 +1,67 @@ + +use serde::{Deserialize, Serialize}; + + +pub type FieldName = String; + +pub struct ElasticDSLQuery { + // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. + pub target_field: FieldName, // List of metrics being queried + pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) + pub group_by_buckets: Option, // Grouping specification if the query includes a group by clause + pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) +} + +impl ElasticDSLQuery { + // Additional methods for processing or analyzing the query can be added here + + pub fn new( + target_field: FieldName, + predicates: Vec, + group_by_buckets: Option, + aggregation: AggregationType, + ) -> Self { + Self { + target_field, + predicates, + group_by_buckets, + aggregation, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AggregationType { + Avg, + Sum, + Min, + Max, + Percentiles(Vec), // List of percentiles being computed +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Predicate { + Term { field: FieldName, value: TermValue }, + Range { field: FieldName, gte: Option, lte: Option }, + // Other predicate types can be added here (e.g. exists, wildcard, etc.) +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TermValue { + String(String), + Float(f64), + Int(i64), + UnsignedInt(u64), + Boolean(bool), + // Other term value types can be added here (e.g. boolean, date, etc.) +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum GroupBySpec { + Fields(Vec), + Filters(Vec), // Grouping by filters (e.g. group by whether a field matches a certain value) +} \ No newline at end of file diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs new file mode 100644 index 00000000..075526f9 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs @@ -0,0 +1,206 @@ +use serde::{Deserialize, Serialize}; +use crate::ast_parsing::query_info::{FieldName, TermValue, Predicate}; + + +/// Time range bounds resolved into epoch milliseconds. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ResolvedTimeRange { + pub field: FieldName, + pub gte_ms: Option, + pub lte_ms: Option, +} + +/// An optional time range applied to a timestamp field. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TimeRange { + pub field: FieldName, + pub gte: Option, + pub lte: Option, +} + +impl TimeRange { + /// Parse a date-math expression into epoch milliseconds using the provided + /// `now_ms` as reference for `now`-relative expressions. + /// + /// Supported forms: + /// - `now` + /// - `now-30s`, `now+5m`, `now-1h`, `now-2d`, `now-1w`, `now-500ms` + /// - RFC3339 timestamps (e.g. `2026-03-22T12:34:56Z`) + /// - Plain integer timestamps (returned as-is) + pub fn parse_date_math(expr: &str, now_ms: i64) -> Option { + if expr == "now" { + return Some(now_ms); + } + + if let Some(delta) = Self::parse_now_delta_ms(expr) { + return now_ms.checked_add(delta); + } + + if let Ok(v) = expr.parse::() { + return Some(v); + } + + chrono::DateTime::parse_from_rfc3339(expr) + .ok() + .map(|dt| dt.timestamp_millis()) + } + + /// Resolve `gte`/`lte` date-math strings into numeric epoch-millisecond + /// values relative to `now_ms`. + pub fn resolve_epoch_millis(&self, now_ms: i64) -> Option { + let gte_ms = match &self.gte { + Some(v) => Some(Self::parse_date_math(v, now_ms)?), + None => None, + }; + let lte_ms = match &self.lte { + Some(v) => Some(Self::parse_date_math(v, now_ms)?), + None => None, + }; + + Some(ResolvedTimeRange { + field: self.field.clone(), + gte_ms, + lte_ms, + }) + } + + fn parse_now_delta_ms(expr: &str) -> Option { + let rest = expr.strip_prefix("now")?; + if rest.is_empty() { + return Some(0); + } + + let sign_char = rest.chars().next()?; + let sign = match sign_char { + '+' => 1_i64, + '-' => -1_i64, + _ => return None, + }; + + let offset = &rest[1..]; + if offset.is_empty() { + return None; + } + + let digit_count = offset.chars().take_while(|c| c.is_ascii_digit()).count(); + if digit_count == 0 || digit_count == offset.len() { + return None; + } + + let qty = offset[..digit_count].parse::().ok()?; + let unit = &offset[digit_count..]; + let unit_ms = match unit { + "ms" => 1_i64, + "s" => 1_000_i64, + "m" => 60_000_i64, + "h" => 3_600_000_i64, + "d" => 86_400_000_i64, + "w" => 604_800_000_i64, + _ => return None, + }; + + qty.checked_mul(unit_ms)?.checked_mul(sign) + } +} + +pub fn range_query_to_time_range(predicate: &Predicate, now_ms: i64) -> Option { + match predicate { + Predicate::Range { field, gte, lte } => { + let tr = TimeRange { + field: field.clone(), + gte: gte.as_ref().and_then(|v| match v { + TermValue::String(s) => Some(s.clone()), + _ => None, + }), + lte: lte.as_ref().and_then(|v| match v { + TermValue::String(s) => Some(s.clone()), + _ => None, + }), + }; + let resolved =tr.resolve_epoch_millis(now_ms)?; + Some(resolved) + }, + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_date_math_supports_now_relative_and_numeric() { + let now_ms = 1_700_000_000_000_i64; + assert_eq!(TimeRange::parse_date_math("now", now_ms), Some(now_ms)); + assert_eq!( + TimeRange::parse_date_math("now-30s", now_ms), + Some(now_ms - 30_000) + ); + assert_eq!( + TimeRange::parse_date_math("now+5m", now_ms), + Some(now_ms + 300_000) + ); + assert_eq!( + TimeRange::parse_date_math("1700000000123", now_ms), + Some(1_700_000_000_123) + ); + } + + #[test] + fn parse_date_math_supports_rfc3339() { + let now_ms = 0; + let value = TimeRange::parse_date_math("2026-03-22T12:34:56Z", now_ms) + .expect("RFC3339 timestamp should parse"); + assert_eq!(value, 1_774_182_896_000); + } + + #[test] + fn parse_date_math_rejects_invalid_expressions() { + let now_ms = 1_700_000_000_000_i64; + assert_eq!(TimeRange::parse_date_math("now+", now_ms), None); + assert_eq!(TimeRange::parse_date_math("now-10", now_ms), None); + assert_eq!(TimeRange::parse_date_math("yesterday", now_ms), None); + } + + #[test] + fn resolve_epoch_millis_resolves_both_bounds() { + let range = TimeRange { + field: "@timestamp".to_string(), + gte: Some("now-1m".to_string()), + lte: Some("now".to_string()), + }; + let now_ms = 2_000_000_i64; + + let resolved = range + .resolve_epoch_millis(now_ms) + .expect("range should resolve"); + assert_eq!(resolved.field, "@timestamp"); + assert_eq!(resolved.gte_ms, Some(1_940_000)); + assert_eq!(resolved.lte_ms, Some(2_000_000)); + } + + #[test] + fn range_query_to_time_range_converts_range_predicate() { + let predicate = Predicate::Range { + field: "@timestamp".to_string(), + gte: Some(TermValue::String("now-30s".to_string())), + lte: Some(TermValue::String("now".to_string())), + }; + let now_ms = 1_000_000_i64; + + let resolved = range_query_to_time_range(&predicate, now_ms) + .expect("range predicate should convert"); + assert_eq!(resolved.field, "@timestamp"); + assert_eq!(resolved.gte_ms, Some(970_000)); + assert_eq!(resolved.lte_ms, Some(1_000_000)); + } + + #[test] + fn range_query_to_time_range_returns_none_for_non_range_predicates() { + let predicate = Predicate::Term { + field: "service".to_string(), + value: TermValue::String("frontend".to_string()), + }; + assert!(range_query_to_time_range(&predicate, 100).is_none()); + } +} \ No newline at end of file diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs new file mode 100644 index 00000000..cdd5aeac --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs @@ -0,0 +1,15 @@ +/// Strip the `.keyword` suffix from a field name, if present. +pub fn strip_keyword_suffix(field: &str) -> &str { + field.strip_suffix(".keyword").unwrap_or(field) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn strip_keyword_suffix_removes_only_trailing_keyword() { + assert_eq!(strip_keyword_suffix("service.keyword"), "service"); + assert_eq!(strip_keyword_suffix("env"), "env"); + } +} \ No newline at end of file diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index c03929fb..aff9be4c 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -1,6 +1,9 @@ pub mod parsing; pub mod pattern; pub mod types; +pub mod ast_parsing; +pub mod datemath; +pub mod helpers; pub use parsing::*; pub use pattern::{classify, parse_and_classify}; From 2accb8cc61a30ed90b1061cf3ef4d824989eb0bc Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 23 Apr 2026 16:45:49 -0400 Subject: [PATCH 2/7] Add modded opensearch-dsl crate Cargo.toml. --- Cargo.lock | 11 +++++++++++ .../dependencies/rs/elastic_dsl_utilities/Cargo.toml | 1 + 2 files changed, 12 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 78b6cfd2..6344e25e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,6 +1679,7 @@ name = "elastic_dsl_utilities" version = "0.3.0" dependencies = [ "chrono", + "opensearch-dsl", "serde", "serde_json", ] @@ -3028,6 +3029,16 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "opensearch-dsl" +version = "0.3.1" +dependencies = [ + "chrono", + "num-traits", + "serde", + "serde_json", +] + [[package]] name = "openssl" version = "0.10.76" diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml index 3726cabb..92e1278f 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml @@ -7,3 +7,4 @@ version.workspace = true serde.workspace = true serde_json.workspace = true chrono.workspace = true +opensearch-dsl = { version = "0.3.1", path = "../../../../../opensearch-client-rs/opensearch-dsl" } From 86f968accc1bb79624a562e891d4bde632016031 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 23 Apr 2026 16:47:53 -0400 Subject: [PATCH 3/7] Cargo format. --- .../src/ast_parsing/extract_info.rs | 285 ++++++++++-------- .../src/ast_parsing/mod.rs | 2 +- .../src/ast_parsing/query_info.rs | 19 +- .../rs/elastic_dsl_utilities/src/datemath.rs | 13 +- .../rs/elastic_dsl_utilities/src/helpers.rs | 2 +- .../rs/elastic_dsl_utilities/src/lib.rs | 6 +- 6 files changed, 179 insertions(+), 148 deletions(-) diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index 221e727f..e140af97 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -1,8 +1,9 @@ +use crate::ast_parsing::query_info::{ + AggregationType, ElasticDSLQuery, FieldName, GroupBySpec, Predicate, TermValue, +}; +use crate::helpers::strip_keyword_suffix; use opensearch_dsl::{self as dsl}; use serde_json; -use crate::ast_parsing::query_info::{ElasticDSLQuery, Predicate, GroupBySpec, AggregationType, FieldName, TermValue}; -use crate::helpers::{strip_keyword_suffix}; - pub fn parse_query_to_ast(query: &str) -> Option { let search_request = serde_json::from_str(query).ok()?; @@ -22,8 +23,14 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { Vec::new() // Return an empty vector of predicates for unsupported query types } }; - let (target_field, aggregation_type, group_by_spec) = walk_aggregations_and_extract_info(&ast.aggs)?; - Some(ElasticDSLQuery::new(target_field, predicates, group_by_spec, aggregation_type)) + let (target_field, aggregation_type, group_by_spec) = + walk_aggregations_and_extract_info(&ast.aggs)?; + Some(ElasticDSLQuery::new( + target_field, + predicates, + group_by_spec, + aggregation_type, + )) } pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { @@ -43,7 +50,10 @@ pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { @@ -52,41 +62,61 @@ pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { // Handle other query types - todo!() + continue; // Skip unsupported query types } } } predicates } -pub fn walk_aggregations_and_extract_info(aggregations: &dsl::Aggregations) -> Option<(FieldName, AggregationType, Option)> { +pub fn walk_aggregations_and_extract_info( + aggregations: &dsl::Aggregations, +) -> Option<(FieldName, AggregationType, Option)> { // Traverse the aggregations in the AST and extracting relevant information. Extract the first valid aggregation type found, along with any associated group by specifications. for (_, agg) in aggregations { match agg { dsl::Aggregation::MultiTerms(terms_agg) => { // Extract information from the terms aggregation - let field_names: Vec = terms_agg.multi_terms.terms.iter().filter_map(|multi_term| multi_term.field.clone()).collect(); - let field_names: Vec = field_names.iter().map(|s| strip_keyword_suffix(s).to_owned()).collect(); + let field_names: Vec = terms_agg + .multi_terms + .terms + .iter() + .filter_map(|multi_term| multi_term.field.clone()) + .collect(); + let field_names: Vec = field_names + .iter() + .map(|s| strip_keyword_suffix(s).to_owned()) + .collect(); if field_names.is_empty() { return None; // Return None if no valid field names are found in the multi-terms aggregation. } let group_by_spec = Some(GroupBySpec::Fields(field_names)); - let (target_field, aggregation_type) = find_aggregation_info(&terms_agg.aggs.clone())?; + let (target_field, aggregation_type) = + find_aggregation_info(&terms_agg.aggs.clone())?; return Some((target_field, aggregation_type, group_by_spec)); - }, + } dsl::Aggregation::Terms(terms_agg) => { // Extract information from the terms aggregation - if let Some(field) = terms_agg.terms.field.clone() { + if let Some(field) = terms_agg.terms.field.clone() { let field = strip_keyword_suffix(&field).to_owned(); // Process the terms aggregation information as needed let group_by_spec = Some(GroupBySpec::Fields(vec![field])); - let (target_field, aggregation_type) = find_aggregation_info(&terms_agg.aggs.clone())?; + let (target_field, aggregation_type) = + find_aggregation_info(&terms_agg.aggs.clone())?; return Some((target_field, aggregation_type, group_by_spec)); } } @@ -116,28 +146,32 @@ pub fn extract_aggregation_info(agg: &dsl::Aggregation) -> Option<(FieldName, Ag let field = strip_keyword_suffix(&avg_agg.avg.field).to_owned(); let aggregation_type = AggregationType::Avg; Some((field, aggregation_type)) - }, + } dsl::Aggregation::Sum(sum_agg) => { let field = strip_keyword_suffix(&sum_agg.sum.field.clone()?).to_owned(); let aggregation_type = AggregationType::Sum; Some((field, aggregation_type)) - }, + } dsl::Aggregation::Min(min_agg) => { let field = strip_keyword_suffix(&min_agg.min.field.clone()?).to_owned(); let aggregation_type = AggregationType::Min; Some((field, aggregation_type)) - }, + } dsl::Aggregation::Max(max_agg) => { let field = strip_keyword_suffix(&max_agg.max.field.clone()?).to_owned(); let aggregation_type = AggregationType::Max; Some((field, aggregation_type)) - }, + } dsl::Aggregation::Percentiles(percentiles_agg) => { let field = percentiles_agg.percentiles.field.clone(); - let percents = percentiles_agg.percentiles.percents.clone().unwrap_or_default(); + let percents = percentiles_agg + .percentiles + .percents + .clone() + .unwrap_or_default(); let aggregation_type = AggregationType::Percentiles(percents); Some((field, aggregation_type)) - }, + } _ => None, // Return None for unsupported aggregation types } } @@ -148,26 +182,22 @@ fn map_term_to_json_value(term: &dsl::Term) -> Option { dsl::Term::String(value) => { let value_str = value.to_string(); // Convert the term value to a string representation Some(TermValue::String(value_str)) - }, - dsl::Term::Float32(value) => { - Some(TermValue::Float(value.clone() as f64)) - }, - dsl::Term::Float64(value) => { - Some(TermValue::Float(value.clone())) - }, + } + dsl::Term::Float32(value) => Some(TermValue::Float(value.clone() as f64)), + dsl::Term::Float64(value) => Some(TermValue::Float(value.clone())), dsl::Term::PositiveNumber(value) => Some(TermValue::UnsignedInt(value.clone())), dsl::Term::NegativeNumber(value) => Some(TermValue::Int(value.clone())), - dsl::Term::Boolean(value) => Some(TermValue::Boolean(value.clone())) + dsl::Term::Boolean(value) => Some(TermValue::Boolean(value.clone())), } } #[cfg(test)] mod tests { - use super::*; + use super::*; - #[test] - fn parse_query_to_ast_parses_valid_search_request() { - let query = r#" + #[test] + fn parse_query_to_ast_parses_valid_search_request() { + let query = r#" { "query": { "bool": { @@ -182,52 +212,52 @@ mod tests { } "#; - let ast = parse_query_to_ast(query); - assert!(ast.is_some()); - } + let ast = parse_query_to_ast(query); + assert!(ast.is_some()); + } - #[test] - fn parse_query_to_ast_returns_none_for_invalid_json() { - let query = r#"{ "query": { "bool": { "filter": [ } }"#; - assert!(parse_query_to_ast(query).is_none()); - } + #[test] + fn parse_query_to_ast_returns_none_for_invalid_json() { + let query = r#"{ "query": { "bool": { "filter": [ } }"#; + assert!(parse_query_to_ast(query).is_none()); + } - #[test] - fn walk_bool_query_and_extract_info_extracts_term_and_range_predicates() { - let bool_query = dsl::Query::bool() - .filter(dsl::Query::term("service.keyword", "frontend")) - .filter(dsl::Query::term("is_canary", true)) - .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")); + #[test] + fn walk_bool_query_and_extract_info_extracts_term_and_range_predicates() { + let bool_query = dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::term("is_canary", true)) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")); - let predicates = walk_bool_query_and_extract_info(&bool_query); - assert_eq!(predicates.len(), 3); - assert_eq!( - predicates[0], - Predicate::Term { - field: "service".to_string(), - value: TermValue::String("frontend".to_string()), - } - ); - assert_eq!( - predicates[1], - Predicate::Term { - field: "is_canary".to_string(), - value: TermValue::Boolean(true), - } - ); - assert_eq!( - predicates[2], - Predicate::Range { - field: "@timestamp".to_string(), - gte: Some(TermValue::String("now-30s".to_string())), - lte: Some(TermValue::String("now".to_string())), - } - ); - } + let predicates = walk_bool_query_and_extract_info(&bool_query); + assert_eq!(predicates.len(), 3); + assert_eq!( + predicates[0], + Predicate::Term { + field: "service".to_string(), + value: TermValue::String("frontend".to_string()), + } + ); + assert_eq!( + predicates[1], + Predicate::Term { + field: "is_canary".to_string(), + value: TermValue::Boolean(true), + } + ); + assert_eq!( + predicates[2], + Predicate::Range { + field: "@timestamp".to_string(), + gte: Some(TermValue::String("now-30s".to_string())), + lte: Some(TermValue::String("now".to_string())), + } + ); + } - #[test] - fn walk_aggregations_and_extract_info_extracts_terms_group_by_and_metric() { - let query = r#" + #[test] + fn walk_aggregations_and_extract_info_extracts_terms_group_by_and_metric() { + let query = r#" { "aggs": { "by_service": { @@ -239,21 +269,21 @@ mod tests { } } "#; - let ast = parse_query_to_ast(query).expect("query should parse"); + let ast = parse_query_to_ast(query).expect("query should parse"); - let (target_field, agg_type, group_by) = - walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); - assert_eq!(target_field, "latency_ms"); - assert_eq!(agg_type, AggregationType::Avg); - assert_eq!( - group_by, - Some(GroupBySpec::Fields(vec!["service".to_string()])) - ); - } + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!(agg_type, AggregationType::Avg); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } - #[test] - fn walk_aggregations_and_extract_info_extracts_multi_terms_and_percentiles() { - let query = r#" + #[test] + fn walk_aggregations_and_extract_info_extracts_multi_terms_and_percentiles() { + let query = r#" { "aggs": { "by_labels": { @@ -275,45 +305,42 @@ mod tests { } } "#; - let ast = parse_query_to_ast(query).expect("query should parse"); + let ast = parse_query_to_ast(query).expect("query should parse"); - let (target_field, agg_type, group_by) = - walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); - assert_eq!(target_field, "latency_ms"); - assert_eq!( - agg_type, - AggregationType::Percentiles(vec![50.0, 95.0]) - ); - assert_eq!( - group_by, - Some(GroupBySpec::Fields(vec![ - "service".to_string(), - "env".to_string() - ])) - ); - } + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!(agg_type, AggregationType::Percentiles(vec![50.0, 95.0])); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec![ + "service".to_string(), + "env".to_string() + ])) + ); + } - #[test] - fn walk_ast_and_extract_info_builds_elastic_dsl_query() { - let ast = dsl::Search::new() - .query( - dsl::Query::bool() - .filter(dsl::Query::term("service.keyword", "frontend")) - .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")), - ) - .aggregate( - "by_service", - dsl::Aggregation::terms("service.keyword") - .aggregate("max_latency", dsl::Aggregation::max("latency_ms")), - ); - let info = walk_ast_and_extract_info(&ast).expect("info should parse"); + #[test] + fn walk_ast_and_extract_info_builds_elastic_dsl_query() { + let ast = dsl::Search::new() + .query( + dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")), + ) + .aggregate( + "by_service", + dsl::Aggregation::terms("service.keyword") + .aggregate("max_latency", dsl::Aggregation::max("latency_ms")), + ); + let info = walk_ast_and_extract_info(&ast).expect("info should parse"); - assert_eq!(info.target_field, "latency_ms"); - assert_eq!(info.aggregation, AggregationType::Max); - assert_eq!(info.predicates.len(), 2); - assert_eq!( - info.group_by_buckets, - Some(GroupBySpec::Fields(vec!["service".to_string()])) - ); - } -} \ No newline at end of file + assert_eq!(info.target_field, "latency_ms"); + assert_eq!(info.aggregation, AggregationType::Max); + assert_eq!(info.predicates.len(), 2); + assert_eq!( + info.group_by_buckets, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs index 77c57c6b..ad07eed3 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs @@ -1,2 +1,2 @@ pub mod extract_info; -pub mod query_info; \ No newline at end of file +pub mod query_info; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs index 8eb60f69..63b5cb51 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -1,12 +1,10 @@ - use serde::{Deserialize, Serialize}; - pub type FieldName = String; pub struct ElasticDSLQuery { // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. - pub target_field: FieldName, // List of metrics being queried + pub target_field: FieldName, // List of metrics being queried pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) pub group_by_buckets: Option, // Grouping specification if the query includes a group by clause pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) @@ -14,7 +12,7 @@ pub struct ElasticDSLQuery { impl ElasticDSLQuery { // Additional methods for processing or analyzing the query can be added here - + pub fn new( target_field: FieldName, predicates: Vec, @@ -43,8 +41,15 @@ pub enum AggregationType { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Predicate { - Term { field: FieldName, value: TermValue }, - Range { field: FieldName, gte: Option, lte: Option }, + Term { + field: FieldName, + value: TermValue, + }, + Range { + field: FieldName, + gte: Option, + lte: Option, + }, // Other predicate types can be added here (e.g. exists, wildcard, etc.) } @@ -64,4 +69,4 @@ pub enum TermValue { pub enum GroupBySpec { Fields(Vec), Filters(Vec), // Grouping by filters (e.g. group by whether a field matches a certain value) -} \ No newline at end of file +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs index 075526f9..64f2cf87 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs @@ -1,6 +1,5 @@ +use crate::ast_parsing::query_info::{FieldName, Predicate, TermValue}; use serde::{Deserialize, Serialize}; -use crate::ast_parsing::query_info::{FieldName, TermValue, Predicate}; - /// Time range bounds resolved into epoch milliseconds. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -117,9 +116,9 @@ pub fn range_query_to_time_range(predicate: &Predicate, now_ms: i64) -> Option None, }), }; - let resolved =tr.resolve_epoch_millis(now_ms)?; + let resolved = tr.resolve_epoch_millis(now_ms)?; Some(resolved) - }, + } _ => None, } } @@ -188,8 +187,8 @@ mod tests { }; let now_ms = 1_000_000_i64; - let resolved = range_query_to_time_range(&predicate, now_ms) - .expect("range predicate should convert"); + let resolved = + range_query_to_time_range(&predicate, now_ms).expect("range predicate should convert"); assert_eq!(resolved.field, "@timestamp"); assert_eq!(resolved.gte_ms, Some(970_000)); assert_eq!(resolved.lte_ms, Some(1_000_000)); @@ -203,4 +202,4 @@ mod tests { }; assert!(range_query_to_time_range(&predicate, 100).is_none()); } -} \ No newline at end of file +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs index cdd5aeac..c2cf7a2c 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs @@ -12,4 +12,4 @@ mod tests { assert_eq!(strip_keyword_suffix("service.keyword"), "service"); assert_eq!(strip_keyword_suffix("env"), "env"); } -} \ No newline at end of file +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index aff9be4c..1e504b4e 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -1,9 +1,9 @@ -pub mod parsing; -pub mod pattern; -pub mod types; pub mod ast_parsing; pub mod datemath; pub mod helpers; +pub mod parsing; +pub mod pattern; +pub mod types; pub use parsing::*; pub use pattern::{classify, parse_and_classify}; From 8059342494ea15403e290feec047a6cdd6ed1140 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Thu, 23 Apr 2026 18:31:15 -0400 Subject: [PATCH 4/7] Export updated DSL utilities and remove old code. Rename ElasticDSLQuery -> ElasticDSLQueryInfo to reflect its role as query metadata carrier. --- .../src/ast_parsing/extract_info.rs | 20 +- .../src/ast_parsing/mod.rs | 3 + .../src/ast_parsing/query_info.rs | 5 +- .../rs/elastic_dsl_utilities/src/lib.rs | 9 +- .../rs/elastic_dsl_utilities/src/parsing.rs | 439 ------------------ .../rs/elastic_dsl_utilities/src/pattern.rs | 403 ---------------- .../rs/elastic_dsl_utilities/src/types.rs | 286 ------------ 7 files changed, 22 insertions(+), 1143 deletions(-) delete mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs delete mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/pattern.rs delete mode 100644 asap-common/dependencies/rs/elastic_dsl_utilities/src/types.rs diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs index e140af97..59835d35 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -1,17 +1,23 @@ use crate::ast_parsing::query_info::{ - AggregationType, ElasticDSLQuery, FieldName, GroupBySpec, Predicate, TermValue, + AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue, }; use crate::helpers::strip_keyword_suffix; use opensearch_dsl::{self as dsl}; use serde_json; +pub fn extract_query_info(query: &str) -> Option { + // Main entry point for extracting relevant information from the parsed query pattern. This function would contain the core logic for traversing the AST and applying rules to determine the target field, predicates, group by specifications, and aggregation type. + let search_request = parse_query_to_ast(query)?; + walk_ast_and_extract_info(&search_request) + } + pub fn parse_query_to_ast(query: &str) -> Option { let search_request = serde_json::from_str(query).ok()?; search_request } -pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { - // Placeholder for walking the AST and extracting relevant information +pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { + // Traverses the AST and extracts relevant information for answering sketchable aggregations within ASAPQuery. // This would involve traversing the AST nodes and applying logic to determine query patterns, labels, statistics, etc. let predicates = match ast.query.clone()? { dsl::Query::Bool(bool_query) => { @@ -25,7 +31,7 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { }; let (target_field, aggregation_type, group_by_spec) = walk_aggregations_and_extract_info(&ast.aggs)?; - Some(ElasticDSLQuery::new( + Some(ElasticDSLQueryInfo::new( target_field, predicates, group_by_spec, @@ -33,7 +39,7 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option { )) } -pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { +fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec { // Placeholder for walking the filter context of the AST and extracting relevant information // This would involve traversing the filter nodes and applying logic to determine label filters, time ranges, etc. let dsl::QueryCollection(filters) = bool_query.filter.clone(); @@ -83,7 +89,7 @@ pub fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec Option<(FieldName, AggregationType, Option)> { // Traverse the aggregations in the AST and extracting relevant information. Extract the first valid aggregation type found, along with any associated group by specifications. @@ -139,7 +145,7 @@ fn find_aggregation_info(aggregations: &dsl::Aggregations) -> Option<(FieldName, None // Return None if no relevant aggregation information is found } -pub fn extract_aggregation_info(agg: &dsl::Aggregation) -> Option<(FieldName, AggregationType)> { +fn extract_aggregation_info(agg: &dsl::Aggregation) -> Option<(FieldName, AggregationType)> { // Extracts the specific aggregation type and target field from the given aggregation node, if it matches supported types (avg, sum, min, max, percentiles). match agg { dsl::Aggregation::Avg(avg_agg) => { diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs index ad07eed3..a50b887d 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs @@ -1,2 +1,5 @@ pub mod extract_info; pub mod query_info; + +pub use extract_info::*; +pub use query_info::*; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs index 63b5cb51..ce82d705 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -2,7 +2,8 @@ use serde::{Deserialize, Serialize}; pub type FieldName = String; -pub struct ElasticDSLQuery { +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ElasticDSLQueryInfo { // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. pub target_field: FieldName, // List of metrics being queried pub predicates: Vec, // Predicates applied to the query (e.g. filters in bool.filter) @@ -10,7 +11,7 @@ pub struct ElasticDSLQuery { pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) } -impl ElasticDSLQuery { +impl ElasticDSLQueryInfo { // Additional methods for processing or analyzing the query can be added here pub fn new( diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index 1e504b4e..4a86ba8b 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -1,10 +1,7 @@ pub mod ast_parsing; pub mod datemath; pub mod helpers; -pub mod parsing; -pub mod pattern; -pub mod types; -pub use parsing::*; -pub use pattern::{classify, parse_and_classify}; -pub use types::*; +pub use ast_parsing::*; +pub use datemath::*; +pub use helpers::*; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs deleted file mode 100644 index 3963c1f7..00000000 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs +++ /dev/null @@ -1,439 +0,0 @@ -use serde_json::Value; - -use crate::types::{GroupBySpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; - -// --------------------------------------------------------------------------- -// Metric aggregation helpers -// --------------------------------------------------------------------------- - -/// Try to extract a list of metric aggregations from the top-level `"aggs"` -/// object of a query. Returns `None` if *any* aggregation entry is not one of -/// the recognised metric types (avg / min / max / sum / percentiles). -pub fn extract_metric_aggs(aggs: &Value) -> Option> { - let obj = aggs.as_object()?; - if obj.is_empty() { - return None; - } - - let mut result = Vec::with_capacity(obj.len()); - for (result_name, agg_body) in obj { - // Each aggregation body is an object that should contain exactly one - // recognised metric aggregation key. - let body_obj = agg_body.as_object()?; - let mut found = None; - for (key, inner) in body_obj { - if let Some(agg_type) = MetricAggType::from_json_str(key) { - let field = inner.get("field")?.as_str()?.to_owned(); - let kwargs_map = inner - .as_object()? - .iter() - .filter(|(k, _)| *k != "field") - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let kwargs = serde_json::Value::Object(kwargs_map); - found = Some(MetricAggregation { - result_name: result_name.clone(), - agg_type, - field, - params: if kwargs.as_object().is_some_and(|o| o.is_empty()) { - None - } else { - Some(kwargs) - }, - }); - break; - } - } - result.push(found?); - } - Some(result) -} - -// --------------------------------------------------------------------------- -// Time range helpers -// --------------------------------------------------------------------------- - -/// Try to extract a `TimeRange` from a bare `{"range": {"": {...}}}` -/// query value. Accepts either string or numeric values for gte/lte. -pub fn extract_time_range(query: &Value) -> Option { - let range_obj = query.get("range")?.as_object()?; - // There should be exactly one field entry in the range object. - if range_obj.len() != 1 { - return None; - } - let (field, bounds) = range_obj.iter().next()?; - let gte = bounds.get("gte").and_then(value_to_string); - let lte = bounds.get("lte").and_then(value_to_string); - Some(TimeRange { - field: field.clone(), - gte, - lte, - }) -} - -fn value_to_string(v: &Value) -> Option { - match v { - Value::String(s) => Some(s.clone()), - Value::Number(n) => Some(n.to_string()), - _ => None, - } -} - -// --------------------------------------------------------------------------- -// Term / label-filter helpers -// --------------------------------------------------------------------------- - -/// Strip the `.keyword` suffix from a field name, if present. -fn strip_keyword_suffix(field: &str) -> &str { - field.strip_suffix(".keyword").unwrap_or(field) -} - -/// Try to extract a `LabelFilter` from a single `"term"` query object. -/// -/// Handles both the opensearch-dsl long form: -/// ```json -/// { "term": { "field": { "value": "val" } } } -/// ``` -/// and the ES shorthand: -/// ```json -/// { "term": { "field": "val" } } -/// ``` -pub fn extract_label_filter_from_term(term_query: &Value) -> Option { - let term_obj = term_query.get("term")?.as_object()?; - if term_obj.len() != 1 { - return None; - } - let (raw_field, field_value) = term_obj.iter().next()?; - let field = strip_keyword_suffix(raw_field).to_owned(); - let value = if let Some(s) = field_value.as_str() { - // Shorthand: "field": "value" - s.to_owned() - } else if let Some(inner) = field_value.as_object() { - // Long form: "field": { "value": "..." } - inner.get("value")?.as_str()?.to_owned() - } else { - return None; - }; - Some(LabelFilter { field, value }) -} - -// --------------------------------------------------------------------------- -// Bool filter helpers -// --------------------------------------------------------------------------- - -/// Try to extract a list of label filters (and optionally a time range) from a -/// `{"bool": {"filter": [...]}}` query structure. -/// -/// The `filter` array must contain at least a term query, and may also contain -/// a range query. Additional (unrecognised) entries in the array cause this -/// function to return `None`. -pub fn extract_label_filters(query: &Value) -> Option<(Vec, Option)> { - let filter_clauses = query.get("bool")?.get("filter")?; - - // The filter value may be an array (multiple clauses) or a single object. - let clauses: Vec<&Value> = if let Some(arr) = filter_clauses.as_array() { - arr.iter().collect() - } else if filter_clauses.is_object() { - vec![filter_clauses] - } else { - return None; - }; - - let mut label_filters: Vec = Vec::new(); - let mut time_range: Option = None; - - for clause in clauses { - if clause.get("term").is_some() { - label_filters.push(extract_label_filter_from_term(clause)?); - } else if clause.get("range").is_some() { - if time_range.is_some() { - return None; - } - time_range = Some(extract_time_range(clause)?); - } else { - // Unknown clause type in the filter. - return None; - } - } - - Some((label_filters, time_range)) -} - -// --------------------------------------------------------------------------- -// Query predicate helpers -// --------------------------------------------------------------------------- - -/// Extract optional predicates from top-level query: -/// - `{"range": ...}` -> `(label_filters=[], time_range=Some(...))` -/// - `{"bool": {"filter": ...}}` -> label filters + optional time range -/// - `None`/`null` query is represented by caller as `(vec![], None)`. -pub fn extract_predicates_from_query( - query: &Value, -) -> Option<(Vec, Option)> { - if query.is_null() { - return Some((Vec::new(), None)); - } - - if let Some(time_range) = extract_time_range(query) { - return Some((Vec::new(), Some(time_range))); - } - - if query.get("bool").is_some() { - return extract_label_filters(query); - } - - None -} - -// --------------------------------------------------------------------------- -// Group-by helpers -// --------------------------------------------------------------------------- - -/// Try to extract a grouped aggregation from top-level `"aggs"` object. -/// -/// Expected shape: -/// ```json -/// { -/// "aggs": { -/// "": { -/// "terms": { "field": "