Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions asap-common/dependencies/rs/sketch_db_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod inference_config;
pub mod promql_schema;
pub mod query_config;
pub mod query_requirements;
pub mod streaming_config;
pub mod traits;
pub mod utils;

Expand All @@ -17,3 +18,4 @@ pub use inference_config::*;
pub use promql_schema::*;
pub use query_config::*;
pub use query_requirements::*;
pub use streaming_config::*;
127 changes: 127 additions & 0 deletions asap-common/dependencies/rs/sketch_db_common/src/streaming_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use anyhow::Result;
use core::panic;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::ops::Index;

use crate::aggregation_config::{AggregationConfig, AggregationIdInfo};
use crate::capability_matching::find_compatible_aggregation as common_find_compatible;
use crate::enums::QueryLanguage;
use crate::inference_config::{InferenceConfig, SchemaConfig};
use crate::query_requirements::QueryRequirements;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
pub aggregation_configs: HashMap<u64, AggregationConfig>,
}

impl StreamingConfig {
pub fn new(aggregation_configs: HashMap<u64, AggregationConfig>) -> Self {
Self {
aggregation_configs,
}
}

pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> {
self.aggregation_configs.get(&aggregation_id)
}

pub fn get_all_aggregation_configs(&self) -> &HashMap<u64, AggregationConfig> {
&self.aggregation_configs
}

pub fn contains(&self, aggregation_id: u64) -> bool {
self.aggregation_configs.contains_key(&aggregation_id)
}

pub fn from_yaml_file(yaml_file: &str) -> Result<Self> {
let file = File::open(yaml_file)?;
let reader = BufReader::new(file);
let data: Value = serde_yaml::from_reader(reader)?;

Self::from_yaml_data(&data, None)
}

pub fn from_yaml_data(
data: &Value,
inference_config: Option<&InferenceConfig>,
) -> Result<Self> {
let mut retention_map: HashMap<u64, u64> = HashMap::new();
let mut read_count_threshold_map: HashMap<u64, u64> = HashMap::new();

if let Some(inference_config) = inference_config {
for query_config in &inference_config.query_configs {
for aggregation in &query_config.aggregations {
let aggregation_id = aggregation.aggregation_id;
if let Some(num_aggregates) = aggregation.num_aggregates_to_retain {
// OLD: Keep last value only (for backwards compatibility)
retention_map.insert(aggregation_id, num_aggregates);

// NEW: Sum up num_aggregates_to_retain across all queries
*read_count_threshold_map.entry(aggregation_id).or_insert(0) +=
num_aggregates;
}
}
}
}

// Derive query_language from inference_config schema
let query_language = inference_config
.map(|ic| match &ic.schema {
SchemaConfig::PromQL(_) => QueryLanguage::promql,
SchemaConfig::SQL(_) => QueryLanguage::sql,
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
})
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config

let mut aggregation_configs: HashMap<u64, AggregationConfig> = HashMap::new();

if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) {
for aggregation_data in aggregations {
if let Some(aggregation_id) = aggregation_data.get("aggregationId") {
let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap();
let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64);
let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64);
let config = AggregationConfig::from_yaml_data(
aggregation_data,
num_aggregates_to_retain.copied(),
read_count_threshold.copied(),
query_language,
)?;
aggregation_configs.insert(aggregation_id_u64, config);
}
}
}

Ok(Self::new(aggregation_configs))
}
}

impl StreamingConfig {
/// Find a compatible aggregation for the given requirements using capability-based matching.
/// Delegates to `sketch_db_common::find_compatible_aggregation`.
pub fn find_compatible_aggregation(
&self,
requirements: &QueryRequirements,
) -> Option<AggregationIdInfo> {
common_find_compatible(&self.aggregation_configs, requirements)
}
}

impl Index<u64> for StreamingConfig {
type Output = AggregationConfig;

fn index(&self, aggregation_id: u64) -> &Self::Output {
&self.aggregation_configs[&aggregation_id]
}
}

impl Default for StreamingConfig {
fn default() -> Self {
Self::new(HashMap::new())
}
}
27 changes: 27 additions & 0 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub mod planner;
pub mod query_log;

use serde_yaml::Value as YamlValue;
use sketch_db_common::enums::QueryLanguage;
use sketch_db_common::inference_config::InferenceConfig;
use sketch_db_common::streaming_config::StreamingConfig;
use std::path::Path;

pub use config::input::ControllerConfig;
Expand All @@ -17,6 +20,7 @@ pub use output::sql_generator::SQLRuntimeOptions;
pub enum StreamingEngine {
Arroyo,
Flink,
Precompute,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -166,6 +170,21 @@ impl PlannerOutput {
Ok(serde_yaml::to_string(&self.inference_yaml)?)
}

pub fn to_streaming_config(
&self,
query_language: QueryLanguage,
) -> Result<StreamingConfig, anyhow::Error> {
let inference_config = self.to_inference_config(query_language)?;
StreamingConfig::from_yaml_data(&self.streaming_yaml, Some(&inference_config))
}

pub fn to_inference_config(
&self,
query_language: QueryLanguage,
) -> Result<InferenceConfig, anyhow::Error> {
InferenceConfig::from_yaml_data(&self.inference_yaml, query_language)
}

/// Returns the table_name field of the first aggregation matching agg_type.
pub fn aggregation_table_name(&self, agg_type: &str) -> Option<String> {
if let YamlValue::Mapping(root) = &self.streaming_yaml {
Expand Down Expand Up @@ -243,6 +262,10 @@ pub struct SQLController {
}

impl SQLController {
pub fn new(config: SQLControllerConfig, options: SQLRuntimeOptions) -> Self {
Self { config, options }
}

pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
Expand Down Expand Up @@ -279,6 +302,10 @@ impl SQLController {
}

impl Controller {
pub fn new(config: ControllerConfig, options: RuntimeOptions) -> Self {
Self { config, options }
}

pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
Expand Down
2 changes: 2 additions & 0 deletions asap-planner-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct Args {
enum EngineArg {
Arroyo,
Flink,
Precompute,
}

fn main() -> anyhow::Result<()> {
Expand All @@ -66,6 +67,7 @@ fn main() -> anyhow::Result<()> {
let engine = match args.streaming_engine {
EngineArg::Arroyo => StreamingEngine::Arroyo,
EngineArg::Flink => StreamingEngine::Flink,
EngineArg::Precompute => StreamingEngine::Precompute,
};

match args.query_language {
Expand Down
1 change: 1 addition & 0 deletions asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ promql_utilities.workspace = true
sql_utilities.workspace = true
sketch_db_common.workspace = true
datafusion_summary_library.workspace = true
asap_planner.workspace = true

# Shared external (workspace)
serde.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion asap-query-engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ RUN cargo build --release && rm -rf src/

# Copy source code
COPY asap-query-engine/src ./src
COPY asap-planner-rs/src /code/asap-planner-rs/src

# Build the actual application
RUN touch src/main.rs && cargo build --release
RUN touch src/main.rs && touch /code/asap-planner-rs/src/lib.rs && cargo build --release

# Runtime stage with Ubuntu 24.04 (has newer glibc/libstdc++)
FROM ubuntu:24.04
Expand Down
128 changes: 1 addition & 127 deletions asap-query-engine/src/data_model/streaming_config.rs
Original file line number Diff line number Diff line change
@@ -1,127 +1 @@
use anyhow::Result;
use core::panic;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::ops::Index;

use crate::data_model::aggregation_config::{AggregationConfig, AggregationIdInfo};
use crate::data_model::enums::QueryLanguage;
use crate::data_model::inference_config::{InferenceConfig, SchemaConfig};
use sketch_db_common::capability_matching::find_compatible_aggregation as common_find_compatible;
use sketch_db_common::query_requirements::QueryRequirements;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
pub aggregation_configs: HashMap<u64, AggregationConfig>,
}

impl StreamingConfig {
pub fn new(aggregation_configs: HashMap<u64, AggregationConfig>) -> Self {
Self {
aggregation_configs,
}
}

pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> {
self.aggregation_configs.get(&aggregation_id)
}

pub fn get_all_aggregation_configs(&self) -> &HashMap<u64, AggregationConfig> {
&self.aggregation_configs
}

pub fn contains(&self, aggregation_id: u64) -> bool {
self.aggregation_configs.contains_key(&aggregation_id)
}

pub fn from_yaml_file(yaml_file: &str) -> Result<Self> {
let file = File::open(yaml_file)?;
let reader = BufReader::new(file);
let data: Value = serde_yaml::from_reader(reader)?;

Self::from_yaml_data(&data, None)
}

pub fn from_yaml_data(
data: &Value,
inference_config: Option<&InferenceConfig>,
) -> Result<Self> {
let mut retention_map: HashMap<u64, u64> = HashMap::new();
let mut read_count_threshold_map: HashMap<u64, u64> = HashMap::new();

if let Some(inference_config) = inference_config {
for query_config in &inference_config.query_configs {
for aggregation in &query_config.aggregations {
let aggregation_id = aggregation.aggregation_id;
if let Some(num_aggregates) = aggregation.num_aggregates_to_retain {
// OLD: Keep last value only (for backwards compatibility)
retention_map.insert(aggregation_id, num_aggregates);

// NEW: Sum up num_aggregates_to_retain across all queries
*read_count_threshold_map.entry(aggregation_id).or_insert(0) +=
num_aggregates;
}
}
}
}

// Derive query_language from inference_config schema
let query_language = inference_config
.map(|ic| match &ic.schema {
SchemaConfig::PromQL(_) => QueryLanguage::promql,
SchemaConfig::SQL(_) => QueryLanguage::sql,
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
})
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config

let mut aggregation_configs: HashMap<u64, AggregationConfig> = HashMap::new();

if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) {
for aggregation_data in aggregations {
if let Some(aggregation_id) = aggregation_data.get("aggregationId") {
let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap();
let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64);
let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64);
let config = AggregationConfig::from_yaml_data(
aggregation_data,
num_aggregates_to_retain.copied(),
read_count_threshold.copied(),
query_language,
)?;
aggregation_configs.insert(aggregation_id_u64, config);
}
}
}

Ok(Self::new(aggregation_configs))
}
}

impl StreamingConfig {
/// Find a compatible aggregation for the given requirements using capability-based matching.
/// Delegates to `sketch_db_common::find_compatible_aggregation`.
pub fn find_compatible_aggregation(
&self,
requirements: &QueryRequirements,
) -> Option<AggregationIdInfo> {
common_find_compatible(&self.aggregation_configs, requirements)
}
}

impl Index<u64> for StreamingConfig {
type Output = AggregationConfig;

fn index(&self, aggregation_id: u64) -> &Self::Output {
&self.aggregation_configs[&aggregation_id]
}
}

impl Default for StreamingConfig {
fn default() -> Self {
Self::new(HashMap::new())
}
}
pub use sketch_db_common::streaming_config::*;
1 change: 1 addition & 0 deletions asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn init_sketch_backend_for_tests() {
pub mod data_model;
pub mod drivers;
pub mod engines;
pub mod planner_client;
pub mod precompute_engine;
pub mod precompute_operators;
pub mod stores;
Expand Down
Loading
Loading