Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions asap-common/dependencies/rs/asap_types/src/aggregation_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@ use serde_json::Value;
use serde_yaml;
use std::collections::HashMap;

use crate::enums::QueryLanguage;
use crate::enums::{QueryLanguage, WindowType};
use crate::traits::SerializableToSink;
use crate::utils::normalize_spatial_filter;
use promql_utilities::data_model::KeyByLabelNames;
use promql_utilities::query_logics::enums::AggregationType;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationConfig {
pub aggregation_id: u64,
pub aggregation_type: String,
pub aggregation_type: AggregationType,
pub aggregation_sub_type: String,
pub parameters: HashMap<String, Value>,
pub grouping_labels: KeyByLabelNames,
pub aggregated_labels: KeyByLabelNames,
pub rollup_labels: KeyByLabelNames,
pub original_yaml: String,

pub window_size: u64, // Window size in seconds (e.g., 900s for 15m)
pub slide_interval: u64, // Slide/hop interval in seconds (e.g., 30s)
pub window_type: String, // "tumbling" or "sliding"
pub window_size: u64, // Window size in seconds (e.g., 900s for 15m)
pub slide_interval: u64, // Slide/hop interval in seconds (e.g., 30s)
pub window_type: WindowType, // Tumbling or Sliding

pub spatial_filter: String,
pub spatial_filter_normalized: String,
Expand All @@ -41,8 +42,8 @@ pub struct AggregationConfig {
pub struct AggregationIdInfo {
pub aggregation_id_for_key: u64,
pub aggregation_id_for_value: u64,
pub aggregation_type_for_key: String,
pub aggregation_type_for_value: String,
pub aggregation_type_for_key: AggregationType,
pub aggregation_type_for_value: AggregationType,
}

// TODO: need to implement deserialization methods
Expand All @@ -51,7 +52,7 @@ impl AggregationConfig {
#[allow(clippy::too_many_arguments)]
pub fn new(
aggregation_id: u64,
aggregation_type: String,
aggregation_type: AggregationType,
aggregation_sub_type: String,
parameters: HashMap<String, Value>,
grouping_labels: KeyByLabelNames,
Expand All @@ -60,7 +61,7 @@ impl AggregationConfig {
original_yaml: String,
window_size: u64,
slide_interval: u64,
window_type: String,
window_type: WindowType,
spatial_filter: String,
metric: String,
num_aggregates_to_retain: Option<u64>,
Expand Down Expand Up @@ -116,10 +117,11 @@ impl AggregationConfig {
.as_u64()
.ok_or("Missing aggregationId")?;

let aggregation_type = data["aggregationType"]
let aggregation_type: AggregationType = data["aggregationType"]
.as_str()
.ok_or("Missing aggregationType")?
.to_string();
.parse()
.map_err(|e: String| e)?;

let aggregation_sub_type = data["aggregationSubType"]
.as_str()
Expand Down Expand Up @@ -148,7 +150,8 @@ impl AggregationConfig {
.get("windowType")
.and_then(|v| v.as_str())
.unwrap_or("tumbling")
.to_string();
.parse::<WindowType>()
.unwrap_or_default();

let slide_interval = data
.get("slideInterval")
Expand Down Expand Up @@ -240,10 +243,11 @@ impl AggregationConfig {
.collect(),
);

let aggregation_type = aggregation_data["aggregationType"]
let aggregation_type: AggregationType = aggregation_data["aggregationType"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing aggregationType"))?
.to_string();
.parse()
.map_err(|e: String| anyhow::anyhow!(e))?;

let aggregation_sub_type = aggregation_data["aggregationSubType"]
.as_str()
Expand All @@ -270,7 +274,8 @@ impl AggregationConfig {
.get("windowType")
.and_then(|v| v.as_str())
.unwrap_or("tumbling")
.to_string();
.parse::<WindowType>()
.unwrap_or_default();

let slide_interval = aggregation_data
.get("slideInterval")
Expand Down Expand Up @@ -348,7 +353,7 @@ impl SerializableToSink for AggregationConfig {
"originalYaml": self.original_yaml,
"windowSize": self.window_size,
"slideInterval": self.slide_interval,
"windowType": self.window_type,
"windowType": self.window_type.to_string(),
"spatialFilter": self.spatial_filter,
"metric": self.metric,
});
Expand Down
67 changes: 32 additions & 35 deletions asap-common/dependencies/rs/asap_types/src/capability_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,35 @@ use promql_utilities::query_logics::enums::Statistic;
use tracing::{debug, warn};

use crate::aggregation_config::{AggregationConfig, AggregationIdInfo};
use crate::enums::WindowType;
use crate::query_requirements::QueryRequirements;
use crate::utils::normalize_spatial_filter;
use promql_utilities::query_logics::enums::AggregationType;

// ---------------------------------------------------------------------------
// Pure compatibility helpers
// ---------------------------------------------------------------------------

/// Returns the aggregation_type strings that can serve this statistic.
pub fn compatible_agg_types(stat: Statistic) -> &'static [&'static str] {
/// Returns the aggregation types that can serve this statistic.
pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] {
match stat {
Statistic::Sum => &["Sum", "MultipleSumAccumulator"],
Statistic::Sum => &[AggregationType::Sum, AggregationType::MultipleSum],
Statistic::Count => &[
"CountMinSketch",
"CountMinSketchWithHeap",
"CountMinSketchWithHeapAccumulator",
AggregationType::CountMinSketch,
AggregationType::CountMinSketchWithHeap,
],
Statistic::Min => &["MinMax", "MultipleMinMaxAccumulator"],
Statistic::Max => &["MinMax", "MultipleMinMaxAccumulator"],
Statistic::Quantile => &["DatasketchesKLL", "HydraKLL"],
Statistic::Rate | Statistic::Increase => &["Increase", "MultipleIncreaseAccumulator"],
Statistic::Cardinality => &["SetAggregator", "DeltaSetAggregator"],
Statistic::Topk => &[
"CountMinSketchWithHeap",
"CountMinSketchWithHeapAccumulator",
Statistic::Min | Statistic::Max => {
&[AggregationType::MinMax, AggregationType::MultipleMinMax]
}
Statistic::Quantile => &[AggregationType::DatasketchesKLL, AggregationType::HydraKLL],
Statistic::Rate | Statistic::Increase => {
&[AggregationType::Increase, AggregationType::MultipleIncrease]
}
Statistic::Cardinality => &[
AggregationType::SetAggregator,
AggregationType::DeltaSetAggregator,
],
Statistic::Topk => &[AggregationType::CountMinSketchWithHeap],
}
}

Expand All @@ -46,20 +50,13 @@ pub fn required_sub_type(stat: Statistic) -> Option<&'static str> {

/// Whether this value aggregation type requires a paired key aggregation
/// (`SetAggregator` or `DeltaSetAggregator`).
pub fn is_multi_population_value_type(agg_type: &str) -> bool {
matches!(
agg_type,
"MultipleSumAccumulator"
| "MultipleMinMaxAccumulator"
| "MultipleIncreaseAccumulator"
| "CountMinSketchWithHeap"
| "CountMinSketchWithHeapAccumulator"
)
pub fn is_multi_population_value_type(agg_type: AggregationType) -> bool {
agg_type.is_multi_population_value_type()
}

/// Whether this type is a key aggregation (tracks which label-value combinations exist).
fn is_key_agg_type(agg_type: &str) -> bool {
matches!(agg_type, "SetAggregator" | "DeltaSetAggregator")
fn is_key_agg_type(agg_type: AggregationType) -> bool {
agg_type.is_key_agg_type()
}

/// Window compatibility: can `config` serve a query needing `data_range_ms`?
Expand All @@ -76,9 +73,9 @@ pub fn window_compatible(config: &AggregationConfig, data_range_ms: Option<u64>)
if window_ms == 0 || range == 0 {
return false;
}
match config.window_type.as_str() {
"sliding" => range == window_ms,
_ => range % window_ms == 0, // tumbling (or unknown — treat as tumbling)
match config.window_type {
WindowType::Sliding => range == window_ms,
WindowType::Tumbling => range % window_ms == 0,
}
}

Expand Down Expand Up @@ -152,7 +149,7 @@ pub fn find_compatible_aggregation(
.values()
.filter(|c| {
let ok = c.metric == requirements.metric
&& types.contains(&c.aggregation_type.as_str())
&& types.contains(&c.aggregation_type)
&& sub_type.is_none_or(|st| c.aggregation_sub_type == st)
&& window_compatible(c, requirements.data_range_ms)
&& labels_compatible(&c.grouping_labels, &requirements.grouping_labels)
Expand Down Expand Up @@ -218,11 +215,11 @@ pub fn find_compatible_aggregation(
}

// If value type is multi-population, find the paired key aggregation.
let key_agg: &AggregationConfig = if is_multi_population_value_type(&value_agg.aggregation_type)
let key_agg: &AggregationConfig = if is_multi_population_value_type(value_agg.aggregation_type)
{
let ka = configs
.values()
.find(|c| c.metric == requirements.metric && is_key_agg_type(&c.aggregation_type));
.find(|c| c.metric == requirements.metric && is_key_agg_type(c.aggregation_type));
if ka.is_none() {
warn!(
metric = %requirements.metric,
Expand All @@ -246,9 +243,9 @@ pub fn find_compatible_aggregation(

Some(AggregationIdInfo {
aggregation_id_for_value: value_agg.aggregation_id,
aggregation_type_for_value: value_agg.aggregation_type.clone(),
aggregation_type_for_value: value_agg.aggregation_type,
aggregation_id_for_key: key_agg.aggregation_id,
aggregation_type_for_key: key_agg.aggregation_type.clone(),
aggregation_type_for_key: key_agg.aggregation_type,
})
}

Expand Down Expand Up @@ -279,7 +276,7 @@ mod tests {
let spatial_filter_normalized = normalize_spatial_filter(spatial_filter);
AggregationConfig {
aggregation_id: id,
aggregation_type: agg_type.to_string(),
aggregation_type: agg_type.parse::<AggregationType>().expect("valid agg type"),
aggregation_sub_type: sub_type.to_string(),
parameters: HashMap::new(),
grouping_labels,
Expand All @@ -288,7 +285,7 @@ mod tests {
original_yaml: String::new(),
window_size: window_size_s,
slide_interval: window_size_s,
window_type: window_type.to_string(),
window_type: window_type.parse::<WindowType>().unwrap_or_default(),
spatial_filter: spatial_filter.to_string(),
spatial_filter_normalized,
metric: metric.to_string(),
Expand Down
61 changes: 61 additions & 0 deletions asap-common/dependencies/rs/asap_types/src/enums.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use std::fmt;
use std::str::FromStr;

// Re-export AggregationType from promql_utilities (defined there to avoid circular deps).
pub use promql_utilities::query_logics::enums::AggregationType;

#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq)]
#[allow(non_camel_case_types)]
pub enum QueryLanguage {
Expand All @@ -23,3 +29,58 @@ pub enum CleanupPolicy {
/// Never clean up aggregates
NoCleanup,
}

impl fmt::Display for CleanupPolicy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CleanupPolicy::CircularBuffer => write!(f, "circular_buffer"),
CleanupPolicy::ReadBased => write!(f, "read_based"),
CleanupPolicy::NoCleanup => write!(f, "no_cleanup"),
}
}
}

impl FromStr for CleanupPolicy {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"circular_buffer" => Ok(CleanupPolicy::CircularBuffer),
"read_based" => Ok(CleanupPolicy::ReadBased),
"no_cleanup" => Ok(CleanupPolicy::NoCleanup),
_ => Err(format!("Unknown cleanup policy: '{s}'")),
}
}
}

/// Window type for streaming aggregations.
#[derive(
Clone, Debug, Copy, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum WindowType {
#[default]
Tumbling,
Sliding,
}

impl fmt::Display for WindowType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WindowType::Tumbling => write!(f, "tumbling"),
WindowType::Sliding => write!(f, "sliding"),
}
}
}

impl FromStr for WindowType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"tumbling" => Ok(WindowType::Tumbling),
"sliding" => Ok(WindowType::Sliding),
_ => Err(format!("Unknown window type: '{s}'")),
}
}
}
11 changes: 4 additions & 7 deletions asap-common/dependencies/rs/asap_types/src/inference_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,12 @@ impl InferenceConfig {
)
})?;

match name {
"circular_buffer" => Ok(CleanupPolicy::CircularBuffer),
"read_based" => Ok(CleanupPolicy::ReadBased),
"no_cleanup" => Ok(CleanupPolicy::NoCleanup),
_ => Err(anyhow::anyhow!(
name.parse::<CleanupPolicy>().map_err(|_| {
anyhow::anyhow!(
"Invalid cleanup policy: '{}'. Valid options: circular_buffer, read_based, no_cleanup",
name
)),
}
)
})
}

fn parse_query_configs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,11 @@ impl PromQLPattern {
debug!("Collecting aggregation token as: {}", collect_as);
let modifier = match &agg.modifier {
Some(LabelModifier::Include(labels)) => Some(AggregationModifier {
modifier_type: "by".to_string(),
modifier_type: AggregationModifierType::By,
labels: labels.labels.clone(),
}),
Some(LabelModifier::Exclude(labels)) => Some(AggregationModifier {
modifier_type: "without".to_string(),
modifier_type: AggregationModifierType::Without,
labels: labels.labels.clone(),
}),
None => None,
Expand Down Expand Up @@ -844,16 +844,24 @@ impl Default for PromQLMatchResult {
}
}

/// Whether a PromQL aggregation modifier is `by` or `without`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum AggregationModifierType {
By,
Without,
}

/// Represents aggregation modifiers like "by" or "without"
#[derive(Debug, Clone, Serialize)]
pub struct AggregationModifier {
pub modifier_type: String, // "by" or "without"
pub modifier_type: AggregationModifierType,
pub labels: Vec<String>,
}

impl AggregationModifier {
/// Create a new AggregationModifier
pub fn new(modifier_type: String, labels: Vec<String>) -> Self {
pub fn new(modifier_type: AggregationModifierType, labels: Vec<String>) -> Self {
Self {
modifier_type,
labels,
Expand Down
Loading
Loading