Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions asap-common/dependencies/rs/asap_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true

[dependencies]
promql_utilities.workspace = true
elastic_dsl_utilities.workspace = true
tracing.workspace = true
sql_utilities.workspace = true
serde.workspace = true
Expand Down
63 changes: 60 additions & 3 deletions asap-common/dependencies/rs/asap_types/src/inference_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::aggregation_reference::AggregationReference;
use crate::enums::{CleanupPolicy, QueryLanguage};
use crate::promql_schema::PromQLSchema;
use crate::query_config::QueryConfig;
use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema};
use promql_utilities::data_model::KeyByLabelNames;
use sql_utilities::sqlhelper::{SQLSchema, Table};

Expand All @@ -16,7 +17,7 @@ use sql_utilities::sqlhelper::{SQLSchema, Table};
pub enum SchemaConfig {
PromQL(PromQLSchema),
SQL(SQLSchema),
ElasticQueryDSL,
ElasticQueryDSL(ElasticMappingSchema),
ElasticSQL(SQLSchema),
}

Expand All @@ -32,7 +33,9 @@ impl InferenceConfig {
let schema = match query_language {
QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()),
QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())),
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_querydsl => {
SchemaConfig::ElasticQueryDSL(ElasticMappingSchema::new(Vec::new()))
}
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())),
};
Self {
Expand Down Expand Up @@ -60,7 +63,10 @@ impl InferenceConfig {
let sql_schema = Self::parse_sql_schema(data)?;
SchemaConfig::SQL(sql_schema)
}
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_querydsl => {
let elastic_schema = Self::parse_elastic_querydsl_schema(data)?;
SchemaConfig::ElasticQueryDSL(elastic_schema)
}
QueryLanguage::elastic_sql => {
let sql_schema = Self::parse_sql_schema(data)?;
SchemaConfig::SQL(sql_schema)
Expand Down Expand Up @@ -153,6 +159,57 @@ impl InferenceConfig {
Ok(SQLSchema::new(tables))
}

/// Parse an Elasticsearch mapping schema from YAML data (`indices:` key at top level).
///
/// Returns an empty schema when `indices` is absent or not a sequence.
/// Errors when an index entry is missing `name`, `time_field`,
/// `metric_columns`, or `metadata_columns`.
fn parse_elastic_querydsl_schema(data: &Value) -> Result<ElasticMappingSchema> {
    /// Collect the string entries of the YAML sequence at `key` for index `name`.
    /// Non-string entries are silently skipped (same leniency as before);
    /// a missing or non-sequence value is an error.
    fn parse_string_set(index_data: &Value, key: &str, name: &str) -> Result<HashSet<String>> {
        Ok(index_data
            .get(key)
            .and_then(|v| v.as_sequence())
            .ok_or_else(|| anyhow::anyhow!("Missing {key} field in elastic index {name}"))?
            .iter()
            .filter_map(|v| v.as_str())
            .map(|s| s.to_string())
            .collect())
    }

    let Some(indices_data) = data.get("indices").and_then(|v| v.as_sequence()) else {
        return Ok(ElasticMappingSchema::new(Vec::new()));
    };

    let mut indices = Vec::new();
    for index_data in indices_data {
        let name = index_data
            .get("name")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing name field in elastic index"))?
            .to_string();

        let time_field = index_data
            .get("time_field")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing time_field field in elastic index {name}"))?
            .to_string();

        // Both column lists follow the same shape; share one parser.
        let metric_columns = parse_string_set(index_data, "metric_columns", &name)?;
        let metadata_columns = parse_string_set(index_data, "metadata_columns", &name)?;

        indices.push((
            name,
            ElasticIndexSchema::new(time_field, metric_columns, metadata_columns),
        ));
    }

    Ok(ElasticMappingSchema::new(indices))
}

/// Parse cleanup policy from YAML data. Errors if not specified.
fn parse_cleanup_policy(data: &Value) -> Result<CleanupPolicy> {
let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl StreamingConfig {
.map(|ic| match &ic.schema {
SchemaConfig::PromQL(_) => QueryLanguage::promql,
SchemaConfig::SQL(_) => QueryLanguage::sql,
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticQueryDSL(_) => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
})
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::ast_parsing::query_info::{
AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue,
};
use crate::datemath::TimeRange;
use crate::helpers::strip_keyword_suffix;
use elasticsearch_dsl_ast::{self as dsl};
use serde_json;
Expand Down Expand Up @@ -36,8 +37,10 @@ pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option<ElasticDSLQueryInf
};
let (target_field, aggregation_type, group_by_spec) =
walk_aggregations_and_extract_info(&ast.aggs)?;
let time_field = infer_time_field(&predicates);
Some(ElasticDSLQueryInfo::new(
target_field,
time_field,
predicates,
group_by_spec,
aggregation_type,
Expand Down Expand Up @@ -205,6 +208,32 @@ fn map_term_to_json_value(term: &dsl::Term) -> Option<TermValue> {
}
}

/// Infer the time field referenced by a query's range filters.
///
/// Preference order (unchanged from the two-pass version):
/// 1. the first `Range` predicate whose field name contains "time"
///    (this also covers "@timestamp"/"timestamp") or whose gte/lte bound
///    parses as Elasticsearch date math,
/// 2. otherwise the first `Range` predicate of any kind,
/// 3. otherwise the conventional default "@timestamp".
fn infer_time_field(predicates: &[Predicate]) -> FieldName {
    // Remember the first range field as a fallback while scanning once.
    let mut fallback: Option<&FieldName> = None;

    for predicate in predicates {
        if let Predicate::Range { field, gte, lte } = predicate {
            if fallback.is_none() {
                fallback = Some(field);
            }

            // A bound like "now-1h" strongly suggests a time field even when
            // the field name itself is not time-like.
            let bound_is_time_like = gte.iter().chain(lte.iter()).any(|term| {
                matches!(term, TermValue::String(value) if TimeRange::parse_date_math(value.as_str(), 0).is_some())
            });

            // `contains("time")` subsumes the previous `== "@timestamp"` and
            // `contains("timestamp")` checks, so a single test suffices.
            if field.contains("time") || bound_is_time_like {
                return field.clone();
            }
        }
    }

    fallback.cloned().unwrap_or_else(|| "@timestamp".to_string())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -350,6 +379,7 @@ mod tests {
let info = walk_ast_and_extract_info(&ast).expect("info should parse");

assert_eq!(info.target_field, "latency_ms");
assert_eq!(info.time_field, "@timestamp");
assert_eq!(info.aggregation, AggregationType::Max);
assert_eq!(info.predicates.len(), 2);
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub type FieldName = String;
pub struct ElasticDSLQueryInfo {
// A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure.
pub target_field: FieldName, // List of metrics being queried
pub time_field: FieldName, // Time field used by the query's range filter
pub predicates: Vec<Predicate>, // Predicates applied to the query (e.g. filters in bool.filter)
pub group_by_buckets: Option<GroupBySpec>, // Grouping specification if the query includes a group by clause
pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles)
Expand All @@ -16,12 +17,14 @@ impl ElasticDSLQueryInfo {

pub fn new(
target_field: FieldName,
time_field: FieldName,
predicates: Vec<Predicate>,
group_by_buckets: Option<GroupBySpec>,
aggregation: AggregationType,
) -> Self {
Self {
target_field,
time_field,
predicates,
group_by_buckets,
aggregation,
Expand Down
69 changes: 69 additions & 0 deletions asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,72 @@ pub mod helpers;
pub use ast_parsing::*;
pub use datemath::*;
pub use helpers::*;

use std::collections::{HashMap, HashSet};

/// Field-level schema for a single Elasticsearch index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ElasticIndexSchema {
    /// Name of the field holding the document timestamp.
    pub time_field: String,
    /// Fields that hold numeric metric values.
    pub metric_columns: HashSet<String>,
    /// Fields that hold dimension/metadata values.
    pub metadata_columns: HashSet<String>,
}

impl ElasticIndexSchema {
    /// Create a schema from a time field plus metric and metadata field sets.
    pub fn new(
        time_field: String,
        metric_columns: HashSet<String>,
        metadata_columns: HashSet<String>,
    ) -> Self {
        Self {
            time_field,
            metric_columns,
            metadata_columns,
        }
    }
}

/// Mapping from index name to its [`ElasticIndexSchema`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ElasticMappingSchema {
    pub config: HashMap<String, ElasticIndexSchema>,
}

impl ElasticMappingSchema {
    /// Build a mapping schema from `(index_name, schema)` pairs; a later
    /// duplicate name overwrites an earlier one, per `HashMap::insert`.
    pub fn new(indexes: Vec<(String, ElasticIndexSchema)>) -> Self {
        Self {
            config: indexes.into_iter().collect(),
        }
    }

    /// Builder-style insertion of one index schema.
    pub fn add_index(mut self, index: String, schema: ElasticIndexSchema) -> Self {
        self.config.insert(index, schema);
        self
    }

    /// Time field configured for `index`, if the index is known.
    pub fn get_time_field(&self, index: &str) -> Option<&String> {
        self.config.get(index).map(|schema| &schema.time_field)
    }

    /// Metric fields configured for `index`, if the index is known.
    pub fn get_metric_columns(&self, index: &str) -> Option<&HashSet<String>> {
        self.config.get(index).map(|schema| &schema.metric_columns)
    }

    /// Metadata fields configured for `index`, if the index is known.
    pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet<String>> {
        self.config
            .get(index)
            .map(|schema| &schema.metadata_columns)
    }

    /// `true` iff `index` is known and lists `metric_column` as a metric field.
    pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool {
        self.get_metric_columns(index)
            .map(|columns| columns.contains(metric_column))
            .unwrap_or(false)
    }

    /// `true` iff `index` is known and every entry of `columns` is a
    /// configured metadata field (vacuously true for an empty `columns`).
    pub fn are_valid_metadata_columns(&self, index: &str, columns: &HashSet<String>) -> bool {
        self.get_metadata_columns(index)
            .map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c)))
            .unwrap_or(false)
    }
}
1 change: 1 addition & 0 deletions asap-planner-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "src/main.rs"
asap_types.workspace = true
promql_utilities.workspace = true
sql_utilities.workspace = true
elastic_dsl_utilities.workspace = true
sqlparser = "0.59.0"
serde.workspace = true
serde_json.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,20 @@ pub struct TableDefinition {
pub value_columns: Vec<String>,
pub metadata_columns: Vec<String>,
}

/// Top-level deserialized YAML configuration for the Elastic DSL controller.
#[derive(Debug, Clone, Deserialize)]
pub struct ElasticDSLControllerConfig {
/// Query groups the controller will generate plans for.
pub query_groups: Vec<ElasticDSLQueryGroup>,
/// Optional index name. NOTE(review): presumably a default used when a
/// group omits its own `index` — confirm against the plan generator.
pub index: Option<String>,
/// Optional overrides for sketch parameters.
pub sketch_parameters: Option<SketchParameterOverrides>,
/// Optional cleanup configuration for generated aggregates.
pub aggregate_cleanup: Option<AggregateCleanupConfig>,
}

/// One group of queries sharing scheduling and controller options.
#[derive(Debug, Clone, Deserialize)]
pub struct ElasticDSLQueryGroup {
/// Optional numeric identifier for the group.
pub id: Option<u32>,
/// Raw query strings belonging to this group.
pub queries: Vec<String>,
/// Delay between repeated executions; units not shown here — TODO confirm
/// (seconds vs. milliseconds) against the consumer of this field.
pub repetition_delay: u64,
/// Optional per-group index. NOTE(review): looks like an override of the
/// top-level `index` — confirm precedence in the generator.
pub index: Option<String>,
/// Options forwarded to the controller.
pub controller_options: ControllerOptions,
}
45 changes: 45 additions & 0 deletions asap-planner-rs/src/elastic_dsl/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::path::Path;

use crate::config::input::ElasticDSLControllerConfig;
use crate::elastic_dsl::generator::{generate_elastic_plan, ElasticRuntimeOptions};
use crate::error::ControllerError;
use crate::planner_output::PlannerOutput;

/// Drives Elastic DSL plan generation: owns a parsed configuration plus
/// runtime options and turns them into a `PlannerOutput`.
pub struct ElasticController {
    config: ElasticDSLControllerConfig,
    options: ElasticRuntimeOptions,
}

impl ElasticController {
    /// Wrap an already-parsed configuration and runtime options.
    pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self {
        Self { config, options }
    }

    /// Read the YAML file at `path` and build a controller from its contents.
    pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result<Self, ControllerError> {
        let contents = std::fs::read_to_string(path)?;
        Self::from_yaml(&contents, opts)
    }

    /// Parse a YAML string into a configuration and build a controller.
    pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result<Self, ControllerError> {
        let parsed: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?;
        Ok(Self::new(parsed, opts))
    }

    /// Run plan generation and wrap the result in a `PlannerOutput`.
    pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
        let raw = generate_elastic_plan(&self.config, &self.options)?;
        Ok(PlannerOutput::from_output(raw))
    }

    /// Generate a plan and persist its streaming and inference YAML documents
    /// under `dir` (created if missing). Returns the generated output.
    pub fn generate_to_dir(&self, dir: &Path) -> Result<PlannerOutput, ControllerError> {
        let output = self.generate()?;
        std::fs::create_dir_all(dir)?;

        // Serialize both documents before touching the filesystem, then write
        // them under fixed file names: streaming config first, inference next.
        let documents = [
            (
                "streaming_config.yaml",
                serde_yaml::to_string(output.streaming_yaml())?,
            ),
            (
                "inference_config.yaml",
                serde_yaml::to_string(output.inference_yaml())?,
            ),
        ];
        for (file_name, body) in documents {
            std::fs::write(dir.join(file_name), body)?;
        }

        Ok(output)
    }
}
Loading
Loading