Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion asap-query-engine/src/bin/precompute_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fallback: None,
},
};
let http_server = HttpServer::new(http_config, query_engine, store.clone());
let http_server = HttpServer::new(http_config, query_engine, store.clone(), None);
tokio::spawn(async move {
if let Err(e) = http_server.run().await {
tracing::error!("Query server error: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion asap-query-engine/src/bin/test_e2e_precompute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fallback: None,
},
};
let http_server = HttpServer::new(http_config, query_engine, store.clone());
let http_server = HttpServer::new(http_config, query_engine, store.clone(), None);
tokio::spawn(async move {
if let Err(e) = http_server.run().await {
eprintln!("Query server error: {e}");
Expand Down
24 changes: 23 additions & 1 deletion asap-query-engine/src/drivers/query/servers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tracing::{debug, info};

use crate::drivers::query::adapters::{create_http_adapter, AdapterConfig, HttpProtocolAdapter};
use crate::engines::SimpleEngine;
use crate::query_tracker::QueryTracker;
use crate::stores::Store;

#[derive(Debug, Clone)]
Expand All @@ -30,13 +31,15 @@ pub struct HttpServer {
config: HttpServerConfig,
query_engine: Arc<SimpleEngine>,
store: Arc<dyn Store>,
query_tracker: Option<Arc<QueryTracker>>,
}

#[derive(Clone)]
struct AppState {
config: HttpServerConfig,
query_engine: Arc<SimpleEngine>,
store: Arc<dyn Store>,
query_tracker: Option<Arc<QueryTracker>>,
adapter: Arc<dyn HttpProtocolAdapter>,
fallback: Option<Arc<dyn crate::drivers::query::fallback::FallbackClient>>,
}
Expand All @@ -46,11 +49,13 @@ impl HttpServer {
config: HttpServerConfig,
query_engine: Arc<SimpleEngine>,
store: Arc<dyn Store>,
query_tracker: Option<Arc<QueryTracker>>,
) -> Self {
Self {
config,
query_engine,
store,
query_tracker,
}
}

Expand All @@ -71,6 +76,7 @@ impl HttpServer {
config: self.config.clone(),
query_engine: self.query_engine,
store: self.store,
query_tracker: self.query_tracker,
adapter: adapter.clone(),
fallback: self.config.adapter_config.fallback.clone(),
};
Expand Down Expand Up @@ -108,6 +114,7 @@ impl HttpServer {
config: self.config.clone(),
query_engine: self.query_engine.clone(),
store: self.store.clone(),
query_tracker: self.query_tracker.clone(),
adapter: adapter.clone(),
fallback: self.config.adapter_config.fallback.clone(),
};
Expand Down Expand Up @@ -171,6 +178,11 @@ async fn process_query_request(
}
}

// Record query for passive auto-discovery (if tracker is enabled)
if let Some(tracker) = &state.query_tracker {
tracker.record_instant(&parsed_request.query, parsed_request.time);
}

// Step 2: Execute query with engine (using parsed request)
let query_start_time = Instant::now();
debug!(
Expand Down Expand Up @@ -448,6 +460,16 @@ async fn process_range_query_request(
};
}

// Record query for passive auto-discovery (if tracker is enabled)
if let Some(tracker) = &state.query_tracker {
tracker.record_range(
&parsed_request.query,
parsed_request.start,
parsed_request.end,
parsed_request.step,
);
}

// Execute range query with engine
let query_start_time = Instant::now();
debug!(
Expand Down Expand Up @@ -611,7 +633,7 @@ mod tests {
crate::data_model::QueryLanguage::promql,
));

let server = HttpServer::new(config, query_engine, store);
let server = HttpServer::new(config, query_engine, store, None);
server
.start_test_server()
.await
Expand Down
3 changes: 3 additions & 0 deletions asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod engines;
pub mod planner_client;
pub mod precompute_engine;
pub mod precompute_operators;
pub mod query_tracker;
pub mod stores;

#[cfg(test)]
Expand Down Expand Up @@ -48,6 +49,8 @@ pub use precompute_engine::config::{LateDataPolicy, PrecomputeEngineConfig};
pub use precompute_engine::output_sink::StoreOutputSink;
pub use precompute_engine::PrecomputeEngine;

pub use query_tracker::{QueryTracker, QueryTrackerConfig};

pub use utils::{normalize_spatial_filter, read_inference_config, read_streaming_config};

pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
41 changes: 40 additions & 1 deletion asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ struct Args {
/// Capacity of the channel between router and each worker
#[arg(long, default_value = "10000")]
precompute_channel_buffer_size: usize,

/// Enable automatic query tracking and planning
#[arg(long)]
enable_query_tracker: bool,

/// Query tracker: observation window in seconds before triggering planning
#[arg(long, default_value = "100")]
tracker_observation_window_secs: u64,
}

#[tokio::main]
Expand Down Expand Up @@ -342,7 +350,38 @@ async fn main() -> Result<()> {
adapter_config,
};

let server = HttpServer::new(http_config, engine, store);
let query_tracker = if args.enable_query_tracker {
use query_engine_rust::planner_client::LocalPlannerClient;
use query_engine_rust::QueryTrackerConfig;

let tracker_config = QueryTrackerConfig {
observation_window_secs: args.tracker_observation_window_secs,
prometheus_scrape_interval: args.prometheus_scrape_interval,
};
let runtime_options = asap_planner::RuntimeOptions {
prometheus_scrape_interval: args.prometheus_scrape_interval,
streaming_engine: asap_planner::StreamingEngine::Precompute,
enable_punting: false,
range_duration: 300,
step: args.prometheus_scrape_interval,
};
let planner_client = Arc::new(LocalPlannerClient::new(
runtime_options,
args.query_language,
args.prometheus_server.clone(),
));
let tracker = Arc::new(query_engine_rust::QueryTracker::new(tracker_config));
let _tracker_handle = tracker.start_background_loop(planner_client);
info!(
"Query tracker enabled (observation window: {}s)",
args.tracker_observation_window_secs
);
Some(tracker)
} else {
None
};

let server = HttpServer::new(http_config, engine, store, query_tracker);
info!("Starting HTTP server on port {}", args.http_port);

// Wait for shutdown signal
Expand Down
49 changes: 45 additions & 4 deletions asap-query-engine/src/planner_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use anyhow::Result;
use asap_planner::{Controller, ControllerConfig, PlannerOutput, RuntimeOptions};
use asap_planner::{
build_schema_from_prometheus, Controller, ControllerConfig, PlannerOutput, RuntimeOptions,
};
use sketch_db_common::enums::QueryLanguage;
use sketch_db_common::inference_config::InferenceConfig;
use sketch_db_common::streaming_config::StreamingConfig;
use tracing::warn;

pub struct PlannerResult {
pub streaming_config: StreamingConfig,
Expand All @@ -18,13 +21,19 @@ pub trait PlannerClient: Send + Sync {
pub struct LocalPlannerClient {
runtime_options: RuntimeOptions,
query_language: QueryLanguage,
prometheus_url: String,
}

impl LocalPlannerClient {
pub fn new(runtime_options: RuntimeOptions, query_language: QueryLanguage) -> Self {
pub fn new(
runtime_options: RuntimeOptions,
query_language: QueryLanguage,
prometheus_url: String,
) -> Self {
Self {
runtime_options,
query_language,
prometheus_url,
}
}
}
Expand All @@ -34,9 +43,35 @@ impl PlannerClient for LocalPlannerClient {
async fn plan(&self, config: ControllerConfig) -> Result<PlannerResult> {
let opts = self.runtime_options.clone();
let query_language = self.query_language;
let prometheus_url = self.prometheus_url.clone();

let output: PlannerOutput = tokio::task::spawn_blocking(move || {
let schema = config.schema_from_hints();
let all_queries: Vec<String> = config
.query_groups
.iter()
.flat_map(|qg| qg.queries.clone())
.collect();
let mut schema = match build_schema_from_prometheus(&prometheus_url, &all_queries) {
Ok(s) => s,
Err(e) => {
warn!(
"Prometheus metric discovery failed, falling back to config hints: {}",
e
);
config.schema_from_hints()
}
};
// Fall back to config-file hints for metrics not found in Prometheus.
if let Some(metric_hints) = &config.metrics {
for hint in metric_hints {
if !schema.config.contains_key(&hint.metric) {
schema = schema.add_metric(
hint.metric.clone(),
promql_utilities::data_model::KeyByLabelNames::new(hint.labels.clone()),
);
}
}
}
let controller = Controller::new(config, schema, opts);
controller.generate()
})
Expand Down Expand Up @@ -127,7 +162,13 @@ mod tests {

#[tokio::test]
async fn test_local_planner_client() {
let client = LocalPlannerClient::new(sample_runtime_options(), QueryLanguage::promql);
// Use a dummy URL; the test config has metric hints so Prometheus discovery
// failures are tolerated via the fallback path.
let client = LocalPlannerClient::new(
sample_runtime_options(),
QueryLanguage::promql,
"http://localhost:9090".to_string(),
);
let config = sample_controller_config();

let result = client.plan(config).await.expect("plan should succeed");
Expand Down
3 changes: 3 additions & 0 deletions asap-query-engine/src/query_tracker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod tracker;

pub use tracker::{QueryTracker, QueryTrackerConfig};
Loading
Loading