diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 4683cd0e..43464627 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -1,3 +1,4 @@ +use asap_types::enums::CleanupPolicy; use asap_types::PromQLSchema; use promql_utilities::data_model::KeyByLabelNames; use serde::Deserialize; @@ -58,7 +59,7 @@ pub struct MetricDefinition { #[derive(Debug, Clone, Deserialize)] pub struct AggregateCleanupConfig { - pub policy: Option<String>, + pub policy: Option<CleanupPolicy>, } #[derive(Debug, Clone, Deserialize, Default)] diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index dbdeb407..5aa3196b 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -18,6 +18,11 @@ pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; pub use error::ControllerError; pub use output::generator::{GeneratorOutput, PuntedQuery}; +use output::generator::{ + KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, KEY_NUM_AGG_TO_RETAIN, + KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, KEY_VALUE_COLUMN, + KEY_WINDOW_SIZE, +}; pub use output::sql_generator::SQLRuntimeOptions; pub use prometheus_client::build_schema_from_prometheus; @@ -61,20 +66,28 @@ impl PlannerOutput { self.query_count } - pub fn has_aggregation_type(&self, t: &str) -> bool { + fn streaming_aggs_slice(&self) -> Option<&[YamlValue]> { if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - return aggs.iter().any(|agg| { - if let YamlValue::Mapping(m) = agg { - if let Some(YamlValue::String(agg_type)) = m.get("aggregationType") { - return agg_type == t; - } - } - false - }); + if let Some(YamlValue::Sequence(aggs)) = root.get(KEY_AGGREGATIONS) { + return Some(aggs.as_slice()); } } - false + None + } + + fn find_aggregation_by_type(&self, agg_type: &str) -> Option<&serde_yaml::Mapping> { + 
self.streaming_aggs_slice()?.iter().find_map(|agg| { + if let YamlValue::Mapping(m) = agg { + if matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) { + return Some(m); + } + } + None + }) + } + + pub fn has_aggregation_type(&self, t: &str) -> bool { + self.find_aggregation_by_type(t).is_some() } pub fn all_tumbling_window_sizes_eq(&self, s: u64) -> bool { @@ -86,11 +99,11 @@ impl PlannerOutput { } fn check_tumbling_window_sizes(&self, predicate: impl Fn(u64) -> bool) -> bool { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - return aggs.iter().all(|agg| { + self.streaming_aggs_slice() + .map(|aggs| { + aggs.iter().all(|agg| { if let YamlValue::Mapping(m) = agg { - if let Some(val) = m.get("windowSize") { + if let Some(val) = m.get(KEY_WINDOW_SIZE) { let size = match val { YamlValue::Number(n) => n.as_u64().unwrap_or(0), _ => 0, @@ -99,59 +112,61 @@ impl PlannerOutput { } } false - }); - } - } - false + }) + }) + .unwrap_or(false) } /// Returns the sorted labels for the first aggregation matching `agg_type`, /// for the given `label_kind` ("rollup", "grouping", or "aggregated"). 
pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec<String> { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - for agg in aggs { - if let YamlValue::Mapping(m) = agg { - if let Some(YamlValue::String(t)) = m.get("aggregationType") { - if t == agg_type { - if let Some(YamlValue::Mapping(labels)) = m.get("labels") { - if let Some(YamlValue::Sequence(seq)) = labels.get(label_kind) { - let mut result: Vec<String> = seq - .iter() - .filter_map(|v| { - if let YamlValue::String(s) = v { - Some(s.clone()) - } else { - None - } - }) - .collect(); - result.sort(); - return result; - } - } - } - } - } + let Some(seq) = self + .find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_LABELS)) + .and_then(|v| { + if let YamlValue::Mapping(lm) = v { + Some(lm) + } else { + None } - } - } - vec![] + }) + .and_then(|lm| lm.get(label_kind)) + .and_then(|v| { + if let YamlValue::Sequence(seq) = v { + Some(seq) + } else { + None + } + }) + else { + return vec![]; + }; + let mut result: Vec<String> = seq + .iter() + .filter_map(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None + } + }) + .collect(); + result.sort(); + result } /// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain) /// for the first aggregation entry of the given query string. 
pub fn inference_cleanup_param(&self, query: &str) -> Option<u64> { if let YamlValue::Mapping(root) = &self.inference_yaml { - if let Some(YamlValue::Sequence(queries)) = root.get("queries") { + if let Some(YamlValue::Sequence(queries)) = root.get(KEY_QUERIES) { for q in queries { if let YamlValue::Mapping(qm) = q { - if let Some(YamlValue::String(qs)) = qm.get("query") { + if let Some(YamlValue::String(qs)) = qm.get(KEY_QUERY) { if qs == query { - if let Some(YamlValue::Sequence(aggs)) = qm.get("aggregations") { + if let Some(YamlValue::Sequence(aggs)) = qm.get(KEY_AGGREGATIONS) { if let Some(YamlValue::Mapping(agg)) = aggs.first() { - for key in - ["read_count_threshold", "num_aggregates_to_retain"] + for key in [KEY_READ_COUNT_THRESHOLD, KEY_NUM_AGG_TO_RETAIN] { if let Some(YamlValue::Number(n)) = agg.get(key) { return n.as_u64(); @@ -193,72 +208,44 @@ impl PlannerOutput { /// Returns the table_name field of the first aggregation matching agg_type. pub fn aggregation_table_name(&self, agg_type: &str) -> Option<String> { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - for agg in aggs { - if let YamlValue::Mapping(m) = agg { - if let Some(YamlValue::String(t)) = m.get("aggregationType") { - if t == agg_type { - if let Some(YamlValue::String(name)) = m.get("table_name") { - return Some(name.clone()); - } - } - } - } + self.find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_TABLE_NAME)) + .and_then(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None } - } - } - None + }) } /// Returns the value_column field of the first aggregation matching agg_type. 
pub fn aggregation_value_column(&self, agg_type: &str) -> Option<String> { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - for agg in aggs { - if let YamlValue::Mapping(m) = agg { - if let Some(YamlValue::String(t)) = m.get("aggregationType") { - if t == agg_type { - if let Some(YamlValue::String(col)) = m.get("value_column") { - return Some(col.clone()); - } - } - } - } + self.find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_VALUE_COLUMN)) + .and_then(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None } - } - } - None + }) } /// Returns true if any aggregation has the matching type AND sub_type. pub fn has_aggregation_type_and_sub_type(&self, agg_type: &str, sub_type: &str) -> bool { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") { - return aggs.iter().any(|agg| { + self.streaming_aggs_slice() + .map(|aggs| { + aggs.iter().any(|agg| { if let YamlValue::Mapping(m) = agg { - let type_matches = m.get("aggregationType").and_then(|v| { - if let YamlValue::String(s) = v { - Some(s.as_str()) - } else { - None - } - }) == Some(agg_type); - let sub_matches = m.get("aggregationSubType").and_then(|v| { - if let YamlValue::String(s) = v { - Some(s.as_str()) - } else { - None - } - }) == Some(sub_type); - type_matches && sub_matches + matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) + && matches!(m.get(KEY_AGG_SUB_TYPE), Some(YamlValue::String(s)) if s == sub_type) } else { false } - }); - } - } - false + }) + }) + .unwrap_or(false) } } diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs index 6da86ab0..ce902400 100644 --- a/asap-planner-rs/src/output/generator.rs +++ b/asap-planner-rs/src/output/generator.rs @@ -12,6 +12,36 @@ use crate::error::ControllerError; use crate::planner::single_query::{BinaryArm, 
IntermediateAggConfig, SingleQueryProcessor}; use crate::RuntimeOptions; +// YAML key constants — shared with sql_generator.rs and lib.rs via pub(crate) +pub(crate) const KEY_AGG_ID: &str = "aggregationId"; +pub(crate) const KEY_AGG_SUB_TYPE: &str = "aggregationSubType"; +pub(crate) const KEY_AGG_TYPE: &str = "aggregationType"; +pub(crate) const KEY_AGGREGATION_ID: &str = "aggregation_id"; +pub(crate) const KEY_AGGREGATIONS: &str = "aggregations"; +pub(crate) const KEY_CLEANUP_POLICY: &str = "cleanup_policy"; +pub(crate) const KEY_LABELS: &str = "labels"; +pub(crate) const KEY_LABELS_AGGREGATED: &str = "aggregated"; +pub(crate) const KEY_LABELS_GROUPING: &str = "grouping"; +pub(crate) const KEY_LABELS_ROLLUP: &str = "rollup"; +pub(crate) const KEY_METADATA_COLUMNS: &str = "metadata_columns"; +pub(crate) const KEY_METRIC: &str = "metric"; +pub(crate) const KEY_METRICS: &str = "metrics"; +pub(crate) const KEY_NAME: &str = "name"; +pub(crate) const KEY_NUM_AGG_TO_RETAIN: &str = "num_aggregates_to_retain"; +pub(crate) const KEY_PARAMETERS: &str = "parameters"; +pub(crate) const KEY_QUERIES: &str = "queries"; +pub(crate) const KEY_QUERY: &str = "query"; +pub(crate) const KEY_READ_COUNT_THRESHOLD: &str = "read_count_threshold"; +pub(crate) const KEY_SLIDE_INTERVAL: &str = "slideInterval"; +pub(crate) const KEY_SPATIAL_FILTER: &str = "spatialFilter"; +pub(crate) const KEY_TABLE_NAME: &str = "table_name"; +pub(crate) const KEY_TABLES: &str = "tables"; +pub(crate) const KEY_TIME_COLUMN: &str = "time_column"; +pub(crate) const KEY_VALUE_COLUMN: &str = "value_column"; +pub(crate) const KEY_VALUE_COLUMNS: &str = "value_columns"; +pub(crate) const KEY_WINDOW_SIZE: &str = "windowSize"; +pub(crate) const KEY_WINDOW_TYPE: &str = "windowType"; + /// `(query_string, Vec<(identifying_key, cleanup_param)>)` pairs produced by binary leaf decomposition. 
type LeafEntries = Vec<(String, Vec<(String, Option<u64>)>)>; @@ -24,14 +54,11 @@ pub fn generate_plan( let metric_schema = schema.clone(); // Determine cleanup policy - let cleanup_policy_str = controller_config + let cleanup_policy = controller_config .aggregate_cleanup .as_ref() - .and_then(|c| c.policy.as_deref()) - .unwrap_or("read_based"); - let cleanup_policy = cleanup_policy_str.parse::<CleanupPolicy>().map_err(|_| { - ControllerError::PlannerError(format!("Unknown cleanup policy: {}", cleanup_policy_str)) - })?; + .and_then(|c| c.policy) + .unwrap_or(CleanupPolicy::ReadBased); // Validate no duplicate queries let mut seen_queries = std::collections::HashSet::new(); @@ -116,13 +143,8 @@ pub fn generate_plan( let streaming_yaml = build_streaming_yaml(&dedup_map, &id_map, &metric_schema)?; // Build inference_config YAML - let inference_yaml = build_inference_yaml( - cleanup_policy, - cleanup_policy_str, - &query_keys_map, - &id_map, - &metric_schema, - )?; + let inference_yaml = + build_inference_yaml(cleanup_policy, &query_keys_map, &id_map, &metric_schema)?; Ok(GeneratorOutput { punted_queries, @@ -202,72 +224,72 @@ pub fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue { pub fn build_aggregation_entry(id: u32, cfg: &IntermediateAggConfig) -> YamlValue { let mut map = serde_yaml::Mapping::new(); map.insert( - YamlValue::String("aggregationId".to_string()), + YamlValue::String(KEY_AGG_ID.to_string()), YamlValue::Number(id.into()), ); map.insert( - YamlValue::String("aggregationSubType".to_string()), + YamlValue::String(KEY_AGG_SUB_TYPE.to_string()), YamlValue::String(cfg.aggregation_sub_type.clone()), ); map.insert( - YamlValue::String("aggregationType".to_string()), + YamlValue::String(KEY_AGG_TYPE.to_string()), YamlValue::String(cfg.aggregation_type.to_string()), ); let mut labels_map = serde_yaml::Mapping::new(); labels_map.insert( - YamlValue::String("aggregated".to_string()), + YamlValue::String(KEY_LABELS_AGGREGATED.to_string()), 
key_by_labels_to_yaml(&cfg.aggregated_labels), ); labels_map.insert( - YamlValue::String("grouping".to_string()), + YamlValue::String(KEY_LABELS_GROUPING.to_string()), key_by_labels_to_yaml(&cfg.grouping_labels), ); labels_map.insert( - YamlValue::String("rollup".to_string()), + YamlValue::String(KEY_LABELS_ROLLUP.to_string()), key_by_labels_to_yaml(&cfg.rollup_labels), ); map.insert( - YamlValue::String("labels".to_string()), + YamlValue::String(KEY_LABELS.to_string()), YamlValue::Mapping(labels_map), ); map.insert( - YamlValue::String("metric".to_string()), + YamlValue::String(KEY_METRIC.to_string()), YamlValue::String(cfg.metric.clone()), ); map.insert( - YamlValue::String("parameters".to_string()), + YamlValue::String(KEY_PARAMETERS.to_string()), params_to_yaml(&cfg.parameters), ); map.insert( - YamlValue::String("slideInterval".to_string()), + YamlValue::String(KEY_SLIDE_INTERVAL.to_string()), YamlValue::Number(cfg.slide_interval.into()), ); map.insert( - YamlValue::String("spatialFilter".to_string()), + YamlValue::String(KEY_SPATIAL_FILTER.to_string()), YamlValue::String(cfg.spatial_filter.clone()), ); map.insert( - YamlValue::String("table_name".to_string()), + YamlValue::String(KEY_TABLE_NAME.to_string()), match &cfg.table_name { Some(t) => YamlValue::String(t.clone()), None => YamlValue::Null, }, ); map.insert( - YamlValue::String("value_column".to_string()), + YamlValue::String(KEY_VALUE_COLUMN.to_string()), match &cfg.value_column { Some(v) => YamlValue::String(v.clone()), None => YamlValue::Null, }, ); map.insert( - YamlValue::String("windowSize".to_string()), + YamlValue::String(KEY_WINDOW_SIZE.to_string()), YamlValue::Number(cfg.window_size.into()), ); map.insert( - YamlValue::String("windowType".to_string()), + YamlValue::String(KEY_WINDOW_TYPE.to_string()), YamlValue::String(cfg.window_type.to_string()), ); @@ -288,20 +310,20 @@ pub fn build_queries_yaml( let agg_id = id_map[key]; let mut agg_map = serde_yaml::Mapping::new(); agg_map.insert( - 
YamlValue::String("aggregation_id".to_string()), + YamlValue::String(KEY_AGGREGATION_ID.to_string()), YamlValue::Number(agg_id.into()), ); if let Some(param) = cleanup_param { match cleanup_policy { CleanupPolicy::CircularBuffer => { agg_map.insert( - YamlValue::String("num_aggregates_to_retain".to_string()), + YamlValue::String(KEY_NUM_AGG_TO_RETAIN.to_string()), YamlValue::Number((*param).into()), ); } CleanupPolicy::ReadBased => { agg_map.insert( - YamlValue::String("read_count_threshold".to_string()), + YamlValue::String(KEY_READ_COUNT_THRESHOLD.to_string()), YamlValue::Number((*param).into()), ); } @@ -314,11 +336,11 @@ pub fn build_queries_yaml( let mut q_map = serde_yaml::Mapping::new(); q_map.insert( - YamlValue::String("aggregations".to_string()), + YamlValue::String(KEY_AGGREGATIONS.to_string()), YamlValue::Sequence(aggregations), ); q_map.insert( - YamlValue::String("query".to_string()), + YamlValue::String(KEY_QUERY.to_string()), YamlValue::String(query_str.clone()), ); YamlValue::Mapping(q_map) @@ -375,11 +397,11 @@ fn build_streaming_yaml( let mut root = serde_yaml::Mapping::new(); root.insert( - YamlValue::String("aggregations".to_string()), + YamlValue::String(KEY_AGGREGATIONS.to_string()), YamlValue::Sequence(aggregations), ); root.insert( - YamlValue::String("metrics".to_string()), + YamlValue::String(KEY_METRICS.to_string()), YamlValue::Mapping(metrics_map), ); @@ -388,15 +410,14 @@ fn build_inference_yaml( cleanup_policy: CleanupPolicy, - cleanup_policy_str: &str, query_keys_map: &IndexMap<String, Vec<(String, Option<u64>)>>, id_map: &HashMap<String, u32>, metric_schema: &asap_types::PromQLSchema, ) -> Result<YamlValue, ControllerError> { let mut cleanup_map = serde_yaml::Mapping::new(); cleanup_map.insert( - YamlValue::String("name".to_string()), - YamlValue::String(cleanup_policy_str.to_string()), + YamlValue::String(KEY_NAME.to_string()), + YamlValue::String(cleanup_policy.to_string()), ); let queries = build_queries_yaml(cleanup_policy, query_keys_map, id_map); @@ -412,15 +433,15 @@ fn 
build_inference_yaml( let mut root = serde_yaml::Mapping::new(); root.insert( - YamlValue::String("cleanup_policy".to_string()), + YamlValue::String(KEY_CLEANUP_POLICY.to_string()), YamlValue::Mapping(cleanup_map), ); root.insert( - YamlValue::String("metrics".to_string()), + YamlValue::String(KEY_METRICS.to_string()), YamlValue::Mapping(metrics_map), ); root.insert( - YamlValue::String("queries".to_string()), + YamlValue::String(KEY_QUERIES.to_string()), YamlValue::Sequence(queries), ); diff --git a/asap-planner-rs/src/output/sql_generator.rs b/asap-planner-rs/src/output/sql_generator.rs index 73851f03..23bbda73 100644 --- a/asap-planner-rs/src/output/sql_generator.rs +++ b/asap-planner-rs/src/output/sql_generator.rs @@ -6,7 +6,11 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::config::input::SQLControllerConfig; use crate::error::ControllerError; -use crate::output::generator::{build_aggregation_entry, build_queries_yaml, GeneratorOutput}; +use crate::output::generator::{ + build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, + KEY_CLEANUP_POLICY, KEY_METADATA_COLUMNS, KEY_NAME, KEY_QUERIES, KEY_TABLES, KEY_TIME_COLUMN, + KEY_VALUE_COLUMNS, +}; use crate::planner::single_query::IntermediateAggConfig; use crate::planner::sql_single_query::SQLSingleQueryProcessor; use crate::StreamingEngine; @@ -28,14 +32,11 @@ pub fn generate_sql_plan( .as_secs_f64() }); - let cleanup_policy_str = config + let cleanup_policy = config .aggregate_cleanup .as_ref() - .and_then(|c| c.policy.as_deref()) - .unwrap_or("read_based"); - let cleanup_policy = cleanup_policy_str.parse::<CleanupPolicy>().map_err(|_| { - ControllerError::PlannerError(format!("Unknown cleanup policy: {}", cleanup_policy_str)) - })?; + .and_then(|c| c.policy) + .unwrap_or(CleanupPolicy::ReadBased); // Validate T % data_ingestion_interval == 0 for qg in &config.query_groups { @@ -94,13 +95,8 @@ pub fn generate_sql_plan( } let streaming_yaml = build_sql_streaming_yaml(config, &dedup_map, 
&id_map)?; - let inference_yaml = build_sql_inference_yaml( - config, - cleanup_policy, - cleanup_policy_str, - &query_keys_map, - &id_map, - )?; + let inference_yaml = + build_sql_inference_yaml(config, cleanup_policy, &query_keys_map, &id_map)?; Ok(GeneratorOutput { punted_queries: Vec::new(), @@ -118,15 +114,15 @@ fn build_tables_yaml(config: &SQLControllerConfig) -> Vec<YamlValue> { .map(|t| { let mut map = serde_yaml::Mapping::new(); map.insert( - YamlValue::String("name".to_string()), + YamlValue::String(KEY_NAME.to_string()), YamlValue::String(t.name.clone()), ); map.insert( - YamlValue::String("time_column".to_string()), + YamlValue::String(KEY_TIME_COLUMN.to_string()), YamlValue::String(t.time_column.clone()), ); map.insert( - YamlValue::String("value_columns".to_string()), + YamlValue::String(KEY_VALUE_COLUMNS.to_string()), YamlValue::Sequence( t.value_columns .iter() @@ -135,7 +131,7 @@ fn build_tables_yaml(config: &SQLControllerConfig) -> Vec<YamlValue> { ), ); map.insert( - YamlValue::String("metadata_columns".to_string()), + YamlValue::String(KEY_METADATA_COLUMNS.to_string()), YamlValue::Sequence( t.metadata_columns .iter() @@ -160,11 +156,11 @@ fn build_sql_streaming_yaml( let mut root = serde_yaml::Mapping::new(); root.insert( - YamlValue::String("aggregations".to_string()), + YamlValue::String(KEY_AGGREGATIONS.to_string()), YamlValue::Sequence(aggregations), ); root.insert( - YamlValue::String("tables".to_string()), + YamlValue::String(KEY_TABLES.to_string()), YamlValue::Sequence(build_tables_yaml(config)), ); @@ -174,27 +170,26 @@ fn build_sql_inference_yaml( config: &SQLControllerConfig, cleanup_policy: CleanupPolicy, - cleanup_policy_str: &str, query_keys_map: &IndexMap<String, Vec<(String, Option<u64>)>>, id_map: &HashMap<String, u32>, ) -> Result<YamlValue, ControllerError> { let mut cleanup_map = serde_yaml::Mapping::new(); cleanup_map.insert( - YamlValue::String("name".to_string()), - YamlValue::String(cleanup_policy_str.to_string()), + YamlValue::String(KEY_NAME.to_string()), + 
YamlValue::String(cleanup_policy.to_string()), ); let mut root = serde_yaml::Mapping::new(); root.insert( - YamlValue::String("cleanup_policy".to_string()), + YamlValue::String(KEY_CLEANUP_POLICY.to_string()), YamlValue::Mapping(cleanup_map), ); root.insert( - YamlValue::String("queries".to_string()), + YamlValue::String(KEY_QUERIES.to_string()), YamlValue::Sequence(build_queries_yaml(cleanup_policy, query_keys_map, id_map)), ); root.insert( - YamlValue::String("tables".to_string()), + YamlValue::String(KEY_TABLES.to_string()), YamlValue::Sequence(build_tables_yaml(config)), ); diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs index 22babb86..dad93532 100644 --- a/asap-planner-rs/src/query_log/converter.rs +++ b/asap-planner-rs/src/query_log/converter.rs @@ -1,3 +1,5 @@ +use asap_types::enums::CleanupPolicy; + use crate::config::input::{AggregateCleanupConfig, ControllerConfig, QueryGroup}; use super::frequency::{InstantQueryInfo, RangeQueryInfo}; @@ -37,7 +39,7 @@ pub fn to_controller_config( query_groups, sketch_parameters: None, aggregate_cleanup: Some(AggregateCleanupConfig { - policy: Some("read_based".to_string()), + policy: Some(CleanupPolicy::ReadBased), }), metrics: None, } diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index 3bae948e..5426ac76 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -339,10 +339,11 @@ query_groups: aggregate_cleanup: policy: "not_a_real_policy" "#; - let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); + // Invalid policy is now caught at deserialization time (YamlParse) rather than at + // generate() time (PlannerError), since the field is typed as Option<CleanupPolicy>. 
assert!(matches!( - c.generate(), - Err(ControllerError::PlannerError(_)) + Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()), + Err(ControllerError::YamlParse(_)) )); } diff --git a/asap-planner-rs/tests/sql_integration.rs b/asap-planner-rs/tests/sql_integration.rs index 485a3079..2f48f1b4 100644 --- a/asap-planner-rs/tests/sql_integration.rs +++ b/asap-planner-rs/tests/sql_integration.rs @@ -746,10 +746,12 @@ aggregate_cleanup: policy: not_a_real_policy "# ); - let result = SQLController::from_yaml(&yaml, sql_opts()) - .unwrap() - .generate(); - assert!(matches!(result, Err(ControllerError::PlannerError(_)))); + // Invalid policy is now caught at deserialization time (YamlParse) rather than at + // generate() time (PlannerError), since the field is typed as Option<CleanupPolicy>. + assert!(matches!( + SQLController::from_yaml(&yaml, sql_opts()), + Err(ControllerError::YamlParse(_)) + )); } /// T that is not a multiple of data_ingestion_interval is invalid: sketch windows