Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use asap_types::enums::CleanupPolicy;
use asap_types::PromQLSchema;
use promql_utilities::data_model::KeyByLabelNames;
use serde::Deserialize;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub struct MetricDefinition {

#[derive(Debug, Clone, Deserialize)]
pub struct AggregateCleanupConfig {
pub policy: Option<String>,
pub policy: Option<CleanupPolicy>,
}

#[derive(Debug, Clone, Deserialize, Default)]
Expand Down
195 changes: 91 additions & 104 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ pub use config::input::ControllerConfig;
pub use config::input::SQLControllerConfig;
pub use error::ControllerError;
pub use output::generator::{GeneratorOutput, PuntedQuery};
use output::generator::{
KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, KEY_NUM_AGG_TO_RETAIN,
KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, KEY_VALUE_COLUMN,
KEY_WINDOW_SIZE,
};
pub use output::sql_generator::SQLRuntimeOptions;
pub use prometheus_client::build_schema_from_prometheus;

Expand Down Expand Up @@ -61,20 +66,28 @@ impl PlannerOutput {
self.query_count
}

pub fn has_aggregation_type(&self, t: &str) -> bool {
fn streaming_aggs_slice(&self) -> Option<&[YamlValue]> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
return aggs.iter().any(|agg| {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(agg_type)) = m.get("aggregationType") {
return agg_type == t;
}
}
false
});
if let Some(YamlValue::Sequence(aggs)) = root.get(KEY_AGGREGATIONS) {
return Some(aggs.as_slice());
}
}
false
None
}

fn find_aggregation_by_type(&self, agg_type: &str) -> Option<&serde_yaml::Mapping> {
self.streaming_aggs_slice()?.iter().find_map(|agg| {
if let YamlValue::Mapping(m) = agg {
if matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) {
return Some(m);
}
}
None
})
}

pub fn has_aggregation_type(&self, t: &str) -> bool {
self.find_aggregation_by_type(t).is_some()
}

pub fn all_tumbling_window_sizes_eq(&self, s: u64) -> bool {
Expand All @@ -86,11 +99,11 @@ impl PlannerOutput {
}

fn check_tumbling_window_sizes(&self, predicate: impl Fn(u64) -> bool) -> bool {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
return aggs.iter().all(|agg| {
self.streaming_aggs_slice()
.map(|aggs| {
aggs.iter().all(|agg| {
if let YamlValue::Mapping(m) = agg {
if let Some(val) = m.get("windowSize") {
if let Some(val) = m.get(KEY_WINDOW_SIZE) {
let size = match val {
YamlValue::Number(n) => n.as_u64().unwrap_or(0),
_ => 0,
Expand All @@ -99,59 +112,61 @@ impl PlannerOutput {
}
}
false
});
}
}
false
})
})
.unwrap_or(false)
}

/// Returns the sorted labels for the first aggregation matching `agg_type`,
/// for the given `label_kind` ("rollup", "grouping", or "aggregated").
pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
for agg in aggs {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
if t == agg_type {
if let Some(YamlValue::Mapping(labels)) = m.get("labels") {
if let Some(YamlValue::Sequence(seq)) = labels.get(label_kind) {
let mut result: Vec<String> = seq
.iter()
.filter_map(|v| {
if let YamlValue::String(s) = v {
Some(s.clone())
} else {
None
}
})
.collect();
result.sort();
return result;
}
}
}
}
}
let Some(seq) = self
.find_aggregation_by_type(agg_type)
.and_then(|m| m.get(KEY_LABELS))
.and_then(|v| {
if let YamlValue::Mapping(lm) = v {
Some(lm)
} else {
None
}
}
}
vec![]
})
.and_then(|lm| lm.get(label_kind))
.and_then(|v| {
if let YamlValue::Sequence(seq) = v {
Some(seq)
} else {
None
}
})
else {
return vec![];
};
let mut result: Vec<String> = seq
.iter()
.filter_map(|v| {
if let YamlValue::String(s) = v {
Some(s.clone())
} else {
None
}
})
.collect();
result.sort();
result
}

/// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain)
/// for the first aggregation entry of the given query string.
pub fn inference_cleanup_param(&self, query: &str) -> Option<u64> {
if let YamlValue::Mapping(root) = &self.inference_yaml {
if let Some(YamlValue::Sequence(queries)) = root.get("queries") {
if let Some(YamlValue::Sequence(queries)) = root.get(KEY_QUERIES) {
for q in queries {
if let YamlValue::Mapping(qm) = q {
if let Some(YamlValue::String(qs)) = qm.get("query") {
if let Some(YamlValue::String(qs)) = qm.get(KEY_QUERY) {
if qs == query {
if let Some(YamlValue::Sequence(aggs)) = qm.get("aggregations") {
if let Some(YamlValue::Sequence(aggs)) = qm.get(KEY_AGGREGATIONS) {
if let Some(YamlValue::Mapping(agg)) = aggs.first() {
for key in
["read_count_threshold", "num_aggregates_to_retain"]
for key in [KEY_READ_COUNT_THRESHOLD, KEY_NUM_AGG_TO_RETAIN]
{
if let Some(YamlValue::Number(n)) = agg.get(key) {
return n.as_u64();
Expand Down Expand Up @@ -193,72 +208,44 @@ impl PlannerOutput {

/// Returns the table_name field of the first aggregation matching agg_type.
pub fn aggregation_table_name(&self, agg_type: &str) -> Option<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
for agg in aggs {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
if t == agg_type {
if let Some(YamlValue::String(name)) = m.get("table_name") {
return Some(name.clone());
}
}
}
}
self.find_aggregation_by_type(agg_type)
.and_then(|m| m.get(KEY_TABLE_NAME))
.and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.clone())
} else {
None
}
}
}
None
})
}

/// Returns the value_column field of the first aggregation matching agg_type.
pub fn aggregation_value_column(&self, agg_type: &str) -> Option<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
for agg in aggs {
if let YamlValue::Mapping(m) = agg {
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
if t == agg_type {
if let Some(YamlValue::String(col)) = m.get("value_column") {
return Some(col.clone());
}
}
}
}
self.find_aggregation_by_type(agg_type)
.and_then(|m| m.get(KEY_VALUE_COLUMN))
.and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.clone())
} else {
None
}
}
}
None
})
}

/// Returns true if any aggregation has the matching type AND sub_type.
pub fn has_aggregation_type_and_sub_type(&self, agg_type: &str, sub_type: &str) -> bool {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
return aggs.iter().any(|agg| {
self.streaming_aggs_slice()
.map(|aggs| {
aggs.iter().any(|agg| {
if let YamlValue::Mapping(m) = agg {
let type_matches = m.get("aggregationType").and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.as_str())
} else {
None
}
}) == Some(agg_type);
let sub_matches = m.get("aggregationSubType").and_then(|v| {
if let YamlValue::String(s) = v {
Some(s.as_str())
} else {
None
}
}) == Some(sub_type);
type_matches && sub_matches
matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type)
&& matches!(m.get(KEY_AGG_SUB_TYPE), Some(YamlValue::String(s)) if s == sub_type)
} else {
false
}
});
}
}
false
})
})
.unwrap_or(false)
}
}

Expand Down
Loading
Loading