Skip to content

Commit fa4d117

Browse files
Merge remote-tracking branch 'origin/main' into 250-asap-planner-rs-auto-infer-metric-labels-from-prometheus
2 parents 084f1e2 + 6dac082 commit fa4d117

10 files changed

Lines changed: 325 additions & 128 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

asap-common/dependencies/rs/sketch_db_common/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod inference_config;
66
pub mod promql_schema;
77
pub mod query_config;
88
pub mod query_requirements;
9+
pub mod streaming_config;
910
pub mod traits;
1011
pub mod utils;
1112

@@ -17,3 +18,4 @@ pub use inference_config::*;
1718
pub use promql_schema::*;
1819
pub use query_config::*;
1920
pub use query_requirements::*;
21+
pub use streaming_config::*;
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use anyhow::Result;
2+
use core::panic;
3+
use serde::{Deserialize, Serialize};
4+
use serde_yaml::Value;
5+
use std::collections::HashMap;
6+
use std::fs::File;
7+
use std::io::BufReader;
8+
use std::ops::Index;
9+
10+
use crate::aggregation_config::{AggregationConfig, AggregationIdInfo};
11+
use crate::capability_matching::find_compatible_aggregation as common_find_compatible;
12+
use crate::enums::QueryLanguage;
13+
use crate::inference_config::{InferenceConfig, SchemaConfig};
14+
use crate::query_requirements::QueryRequirements;
15+
16+
#[derive(Debug, Clone, Serialize, Deserialize)]
17+
pub struct StreamingConfig {
18+
pub aggregation_configs: HashMap<u64, AggregationConfig>,
19+
}
20+
21+
impl StreamingConfig {
22+
pub fn new(aggregation_configs: HashMap<u64, AggregationConfig>) -> Self {
23+
Self {
24+
aggregation_configs,
25+
}
26+
}
27+
28+
pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> {
29+
self.aggregation_configs.get(&aggregation_id)
30+
}
31+
32+
pub fn get_all_aggregation_configs(&self) -> &HashMap<u64, AggregationConfig> {
33+
&self.aggregation_configs
34+
}
35+
36+
pub fn contains(&self, aggregation_id: u64) -> bool {
37+
self.aggregation_configs.contains_key(&aggregation_id)
38+
}
39+
40+
pub fn from_yaml_file(yaml_file: &str) -> Result<Self> {
41+
let file = File::open(yaml_file)?;
42+
let reader = BufReader::new(file);
43+
let data: Value = serde_yaml::from_reader(reader)?;
44+
45+
Self::from_yaml_data(&data, None)
46+
}
47+
48+
pub fn from_yaml_data(
49+
data: &Value,
50+
inference_config: Option<&InferenceConfig>,
51+
) -> Result<Self> {
52+
let mut retention_map: HashMap<u64, u64> = HashMap::new();
53+
let mut read_count_threshold_map: HashMap<u64, u64> = HashMap::new();
54+
55+
if let Some(inference_config) = inference_config {
56+
for query_config in &inference_config.query_configs {
57+
for aggregation in &query_config.aggregations {
58+
let aggregation_id = aggregation.aggregation_id;
59+
if let Some(num_aggregates) = aggregation.num_aggregates_to_retain {
60+
// OLD: Keep last value only (for backwards compatibility)
61+
retention_map.insert(aggregation_id, num_aggregates);
62+
63+
// NEW: Sum up num_aggregates_to_retain across all queries
64+
*read_count_threshold_map.entry(aggregation_id).or_insert(0) +=
65+
num_aggregates;
66+
}
67+
}
68+
}
69+
}
70+
71+
// Derive query_language from inference_config schema
72+
let query_language = inference_config
73+
.map(|ic| match &ic.schema {
74+
SchemaConfig::PromQL(_) => QueryLanguage::promql,
75+
SchemaConfig::SQL(_) => QueryLanguage::sql,
76+
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
77+
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
78+
})
79+
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config
80+
81+
let mut aggregation_configs: HashMap<u64, AggregationConfig> = HashMap::new();
82+
83+
if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) {
84+
for aggregation_data in aggregations {
85+
if let Some(aggregation_id) = aggregation_data.get("aggregationId") {
86+
let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap();
87+
let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64);
88+
let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64);
89+
let config = AggregationConfig::from_yaml_data(
90+
aggregation_data,
91+
num_aggregates_to_retain.copied(),
92+
read_count_threshold.copied(),
93+
query_language,
94+
)?;
95+
aggregation_configs.insert(aggregation_id_u64, config);
96+
}
97+
}
98+
}
99+
100+
Ok(Self::new(aggregation_configs))
101+
}
102+
}
103+
104+
impl StreamingConfig {
105+
/// Find a compatible aggregation for the given requirements using capability-based matching.
106+
/// Delegates to `sketch_db_common::find_compatible_aggregation`.
107+
pub fn find_compatible_aggregation(
108+
&self,
109+
requirements: &QueryRequirements,
110+
) -> Option<AggregationIdInfo> {
111+
common_find_compatible(&self.aggregation_configs, requirements)
112+
}
113+
}
114+
115+
impl Index<u64> for StreamingConfig {
116+
type Output = AggregationConfig;
117+
118+
fn index(&self, aggregation_id: u64) -> &Self::Output {
119+
&self.aggregation_configs[&aggregation_id]
120+
}
121+
}
122+
123+
impl Default for StreamingConfig {
124+
fn default() -> Self {
125+
Self::new(HashMap::new())
126+
}
127+
}

asap-planner-rs/src/lib.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ pub mod query_log;
77

88
use promql_utilities::data_model::KeyByLabelNames;
99
use serde_yaml::Value as YamlValue;
10+
use sketch_db_common::enums::QueryLanguage;
11+
use sketch_db_common::inference_config::InferenceConfig;
12+
use sketch_db_common::streaming_config::StreamingConfig;
1013
use std::path::Path;
1114
use tracing::debug;
1215

@@ -22,6 +25,7 @@ pub use sketch_db_common::PromQLSchema;
2225
pub enum StreamingEngine {
2326
Arroyo,
2427
Flink,
28+
Precompute,
2529
}
2630

2731
#[derive(Debug, Clone)]
@@ -172,6 +176,21 @@ impl PlannerOutput {
172176
Ok(serde_yaml::to_string(&self.inference_yaml)?)
173177
}
174178

179+
pub fn to_streaming_config(
180+
&self,
181+
query_language: QueryLanguage,
182+
) -> Result<StreamingConfig, anyhow::Error> {
183+
let inference_config = self.to_inference_config(query_language)?;
184+
StreamingConfig::from_yaml_data(&self.streaming_yaml, Some(&inference_config))
185+
}
186+
187+
pub fn to_inference_config(
188+
&self,
189+
query_language: QueryLanguage,
190+
) -> Result<InferenceConfig, anyhow::Error> {
191+
InferenceConfig::from_yaml_data(&self.inference_yaml, query_language)
192+
}
193+
175194
/// Returns the table_name field of the first aggregation matching agg_type.
176195
pub fn aggregation_table_name(&self, agg_type: &str) -> Option<String> {
177196
if let YamlValue::Mapping(root) = &self.streaming_yaml {
@@ -249,6 +268,10 @@ pub struct SQLController {
249268
}
250269

251270
impl SQLController {
271+
pub fn new(config: SQLControllerConfig, options: SQLRuntimeOptions) -> Self {
272+
Self { config, options }
273+
}
274+
252275
pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result<Self, ControllerError> {
253276
let yaml_str = std::fs::read_to_string(path)?;
254277
Self::from_yaml(&yaml_str, opts)
@@ -285,6 +308,14 @@ impl SQLController {
285308
}
286309

287310
impl Controller {
311+
pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self {
312+
Self {
313+
config,
314+
schema,
315+
options,
316+
}
317+
}
318+
288319
/// Build a `Controller` from a config file, fetching metric labels from Prometheus.
289320
///
290321
/// `prometheus_url` is queried via `GET /api/v1/series?match[]=<metric>` for each metric

asap-planner-rs/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct Args {
5151
enum EngineArg {
5252
Arroyo,
5353
Flink,
54+
Precompute,
5455
}
5556

5657
fn main() -> anyhow::Result<()> {
@@ -67,6 +68,7 @@ fn main() -> anyhow::Result<()> {
6768
let engine = match args.streaming_engine {
6869
EngineArg::Arroyo => StreamingEngine::Arroyo,
6970
EngineArg::Flink => StreamingEngine::Flink,
71+
EngineArg::Precompute => StreamingEngine::Precompute,
7072
};
7173

7274
match args.query_language {

asap-query-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ promql_utilities.workspace = true
1010
sql_utilities.workspace = true
1111
sketch_db_common.workspace = true
1212
datafusion_summary_library.workspace = true
13+
asap_planner.workspace = true
1314

1415
# Shared external (workspace)
1516
serde.workspace = true

asap-query-engine/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ RUN cargo build --release && rm -rf src/
4141

4242
# Copy source code
4343
COPY asap-query-engine/src ./src
44+
COPY asap-planner-rs/src /code/asap-planner-rs/src
4445

4546
# Build the actual application
46-
RUN touch src/main.rs && cargo build --release
47+
RUN touch src/main.rs && touch /code/asap-planner-rs/src/lib.rs && cargo build --release
4748

4849
# Runtime stage with Ubuntu 24.04 (has newer glibc/libstdc++)
4950
FROM ubuntu:24.04
Lines changed: 1 addition & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -1,127 +1 @@
1-
use anyhow::Result;
2-
use core::panic;
3-
use serde::{Deserialize, Serialize};
4-
use serde_yaml::Value;
5-
use std::collections::HashMap;
6-
use std::fs::File;
7-
use std::io::BufReader;
8-
use std::ops::Index;
9-
10-
use crate::data_model::aggregation_config::{AggregationConfig, AggregationIdInfo};
11-
use crate::data_model::enums::QueryLanguage;
12-
use crate::data_model::inference_config::{InferenceConfig, SchemaConfig};
13-
use sketch_db_common::capability_matching::find_compatible_aggregation as common_find_compatible;
14-
use sketch_db_common::query_requirements::QueryRequirements;
15-
16-
#[derive(Debug, Clone, Serialize, Deserialize)]
17-
pub struct StreamingConfig {
18-
pub aggregation_configs: HashMap<u64, AggregationConfig>,
19-
}
20-
21-
impl StreamingConfig {
22-
pub fn new(aggregation_configs: HashMap<u64, AggregationConfig>) -> Self {
23-
Self {
24-
aggregation_configs,
25-
}
26-
}
27-
28-
pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> {
29-
self.aggregation_configs.get(&aggregation_id)
30-
}
31-
32-
pub fn get_all_aggregation_configs(&self) -> &HashMap<u64, AggregationConfig> {
33-
&self.aggregation_configs
34-
}
35-
36-
pub fn contains(&self, aggregation_id: u64) -> bool {
37-
self.aggregation_configs.contains_key(&aggregation_id)
38-
}
39-
40-
pub fn from_yaml_file(yaml_file: &str) -> Result<Self> {
41-
let file = File::open(yaml_file)?;
42-
let reader = BufReader::new(file);
43-
let data: Value = serde_yaml::from_reader(reader)?;
44-
45-
Self::from_yaml_data(&data, None)
46-
}
47-
48-
pub fn from_yaml_data(
49-
data: &Value,
50-
inference_config: Option<&InferenceConfig>,
51-
) -> Result<Self> {
52-
let mut retention_map: HashMap<u64, u64> = HashMap::new();
53-
let mut read_count_threshold_map: HashMap<u64, u64> = HashMap::new();
54-
55-
if let Some(inference_config) = inference_config {
56-
for query_config in &inference_config.query_configs {
57-
for aggregation in &query_config.aggregations {
58-
let aggregation_id = aggregation.aggregation_id;
59-
if let Some(num_aggregates) = aggregation.num_aggregates_to_retain {
60-
// OLD: Keep last value only (for backwards compatibility)
61-
retention_map.insert(aggregation_id, num_aggregates);
62-
63-
// NEW: Sum up num_aggregates_to_retain across all queries
64-
*read_count_threshold_map.entry(aggregation_id).or_insert(0) +=
65-
num_aggregates;
66-
}
67-
}
68-
}
69-
}
70-
71-
// Derive query_language from inference_config schema
72-
let query_language = inference_config
73-
.map(|ic| match &ic.schema {
74-
SchemaConfig::PromQL(_) => QueryLanguage::promql,
75-
SchemaConfig::SQL(_) => QueryLanguage::sql,
76-
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
77-
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
78-
})
79-
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config
80-
81-
let mut aggregation_configs: HashMap<u64, AggregationConfig> = HashMap::new();
82-
83-
if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) {
84-
for aggregation_data in aggregations {
85-
if let Some(aggregation_id) = aggregation_data.get("aggregationId") {
86-
let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap();
87-
let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64);
88-
let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64);
89-
let config = AggregationConfig::from_yaml_data(
90-
aggregation_data,
91-
num_aggregates_to_retain.copied(),
92-
read_count_threshold.copied(),
93-
query_language,
94-
)?;
95-
aggregation_configs.insert(aggregation_id_u64, config);
96-
}
97-
}
98-
}
99-
100-
Ok(Self::new(aggregation_configs))
101-
}
102-
}
103-
104-
impl StreamingConfig {
105-
/// Find a compatible aggregation for the given requirements using capability-based matching.
106-
/// Delegates to `sketch_db_common::find_compatible_aggregation`.
107-
pub fn find_compatible_aggregation(
108-
&self,
109-
requirements: &QueryRequirements,
110-
) -> Option<AggregationIdInfo> {
111-
common_find_compatible(&self.aggregation_configs, requirements)
112-
}
113-
}
114-
115-
impl Index<u64> for StreamingConfig {
116-
type Output = AggregationConfig;
117-
118-
fn index(&self, aggregation_id: u64) -> &Self::Output {
119-
&self.aggregation_configs[&aggregation_id]
120-
}
121-
}
122-
123-
impl Default for StreamingConfig {
124-
fn default() -> Self {
125-
Self::new(HashMap::new())
126-
}
127-
}
1+
pub use sketch_db_common::streaming_config::*;

asap-query-engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ fn init_sketch_backend_for_tests() {
1414
pub mod data_model;
1515
pub mod drivers;
1616
pub mod engines;
17+
pub mod planner_client;
1718
pub mod precompute_engine;
1819
pub mod precompute_operators;
1920
pub mod stores;

0 commit comments

Comments
 (0)