From 89c45f84d4c839ae649583c9be30897274c2ac30 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Fri, 1 May 2026 09:58:28 -0400 Subject: [PATCH] fix: added fallback for range queries --- asap-dropin/README.md | 2 + .../src/drivers/query/fallback/mod.rs | 17 +- .../src/drivers/query/fallback/prometheus.rs | 66 ++++++- .../src/drivers/query/servers/http.rs | 91 +++++++-- .../src/tests/prometheus_forwarding_tests.rs | 178 +++++++++++++++++- 5 files changed, 332 insertions(+), 22 deletions(-) diff --git a/asap-dropin/README.md b/asap-dropin/README.md index 4a3a8448..ce463466 100644 --- a/asap-dropin/README.md +++ b/asap-dropin/README.md @@ -168,6 +168,8 @@ kill -HUP $(pgrep prometheus) See the [Prometheus configuration docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/) for more details on reloading. +> **Note:** Most editors (vim, VS Code, etc.) replace the file on save rather than writing in place, which changes the inode. If Prometheus is running in Docker, its bind-mount tracks the original inode, so `/-/reload` will silently reload the old content. To avoid this: mount the config **directory** instead of the file (e.g. `-v ./prometheus-config:/etc/prometheus`), which lets Docker follow file replacements correctly. Alternatively, restart the Prometheus container after editing. + ### Step 4 — Add an ASAPQuery datasource in Grafana Create a new datasource in Grafana pointing at ASAPQuery, then switch your dashboards to use it. diff --git a/asap-query-engine/src/drivers/query/fallback/mod.rs b/asap-query-engine/src/drivers/query/fallback/mod.rs index 6b42f516..5add3333 100644 --- a/asap-query-engine/src/drivers/query/fallback/mod.rs +++ b/asap-query-engine/src/drivers/query/fallback/mod.rs @@ -6,7 +6,7 @@ use axum::{ use serde_json::Value; use std::collections::HashMap; -use crate::drivers::query::adapters::ParsedQueryRequest; +use crate::drivers::query::adapters::{ParsedQueryRequest, ParsedRangeQueryRequest}; /// Response format from fallback backend #[derive(Debug, Clone)] @@ -69,6 +69,21 @@ pub trait FallbackClient: Send + Sync { Ok(serde_json::json!({})) } + async fn execute_range_query( + &self, + _request: &ParsedRangeQueryRequest, + ) -> Result { + Err(StatusCode::NOT_IMPLEMENTED) + } + + async fn execute_range_query_with_headers( + &self, + request: &ParsedRangeQueryRequest, + _headers: HashMap, + ) -> Result { + self.execute_range_query(request).await + } + async fn get_runtime_info_with_headers( &self, headers: HashMap, diff --git a/asap-query-engine/src/drivers/query/fallback/prometheus.rs b/asap-query-engine/src/drivers/query/fallback/prometheus.rs index 45f040e2..36a1200a 100644 --- a/asap-query-engine/src/drivers/query/fallback/prometheus.rs +++ b/asap-query-engine/src/drivers/query/fallback/prometheus.rs @@ -1,5 +1,5 @@ use super::{FallbackClient, FallbackResponse}; -use crate::drivers::query::adapters::ParsedQueryRequest; +use crate::drivers::query::adapters::{ParsedQueryRequest, ParsedRangeQueryRequest}; use async_trait::async_trait; use axum::http::StatusCode; use reqwest::Client; @@ -95,6 +95,70 @@ impl FallbackClient for PrometheusHttpFallback { } } + async fn execute_range_query( + &self, + request: &ParsedRangeQueryRequest, + ) -> Result { + debug!("=== FORWARDING RANGE QUERY TO PROMETHEUS ==="); + debug!( + "Forwarding range query: '{}', start: {}, end: {}, step: {}", + request.query, request.start, request.end, request.step + ); + + let full_url = format!("{}/api/v1/query_range", self.base_url.trim_end_matches('/')); + + let query_params = vec![ + ("query", request.query.clone()), + ("start", request.start.to_string()), + ("end", request.end.to_string()), + ("step", request.step.to_string()), + ]; + + match self + .client + .get(&full_url) + .query(&query_params) + .timeout(std::time::Duration::from_secs(30)) + .send() + .await + { + Ok(response) => { + let status = response.status(); + debug!( + "Received range query response from Prometheus, status: {}", + status + ); + match response.json::().await { + Ok(prometheus_response) => { + debug!("=== PROMETHEUS RANGE FORWARD SUCCESS ==="); + Ok(FallbackResponse::Json(prometheus_response)) + } + Err(parse_err) => { + error!( + "Failed to parse Prometheus range query response: {}", + parse_err + ); + use crate::drivers::query::adapters::PrometheusResponse; + let error = PrometheusResponse::error( + "internal", + "Failed to parse Prometheus range query response", + ); + Ok(FallbackResponse::Json(serde_json::to_value(error).unwrap())) + } + } + } + Err(req_err) => { + error!("Failed to forward range query to Prometheus: {}", req_err); + use crate::drivers::query::adapters::PrometheusResponse; + let error = PrometheusResponse::error( + "internal", + &format!("Failed to forward range query to Prometheus: {}", req_err), + ); + Ok(FallbackResponse::Json(serde_json::to_value(error).unwrap())) + } + } + } + async fn get_runtime_info(&self) -> Result { debug!("Fetching runtime info from Prometheus fallback"); diff --git a/asap-query-engine/src/drivers/query/servers/http.rs b/asap-query-engine/src/drivers/query/servers/http.rs index 7eee5880..a3fb49a3 100644 --- a/asap-query-engine/src/drivers/query/servers/http.rs +++ b/asap-query-engine/src/drivers/query/servers/http.rs @@ -458,22 +458,34 @@ async fn process_range_query_request( state: &AppState, parsed_request: &ParsedRangeQueryRequest, start_time: Instant, + headers: HashMap, ) -> Response { // Check if handling is enabled if !state.config.handle_http_requests { debug!("HTTP request handling is disabled for range query"); - // For now, return error - fallback for range queries can be added later - use crate::drivers::query::adapters::AdapterError; - return match state - .adapter - .format_error_response(&AdapterError::ProtocolError( - "Range query handling is disabled".to_string(), - )) - .await - { - Ok(json) => json.into_response(), - Err(status) => status.into_response(), - }; + if let Some(fallback) = &state.fallback { + debug!("Forwarding range query to fallback due to disabled handling"); + return match fallback + .execute_range_query_with_headers(parsed_request, headers) + .await + { + Ok(response) => response.into_response(), + Err(status) => status.into_response(), + }; + } else { + debug!("Returning error - both handling and forwarding disabled"); + use crate::drivers::query::adapters::AdapterError; + return match state + .adapter + .format_error_response(&AdapterError::ProtocolError( + "Range query handling is disabled".to_string(), + )) + .await + { + Ok(json) => json.into_response(), + Err(status) => status.into_response(), + }; + } } // Record query for passive auto-discovery (if tracker is enabled) @@ -523,10 +535,34 @@ async fn process_range_query_request( } } None => { + let total_duration = start_time.elapsed(); debug!("Range query returned None - query not supported"); - match state.adapter.format_unsupported_query_response().await { - Ok(json) => json.into_response(), - Err(status) => status.into_response(), + + if let Some(fallback) = &state.fallback { + debug!("Range query not supported locally, forwarding to fallback"); + info!( + "query='{}' destination=prometheus total_latency_ms={:.2}", + parsed_request.query, + total_duration.as_secs_f64() * 1000.0 + ); + match fallback + .execute_range_query_with_headers(parsed_request, headers) + .await + { + Ok(response) => response.into_response(), + Err(status) => status.into_response(), + } + } else { + debug!("Range query not supported and forwarding disabled, returning error"); + info!( + "query='{}' destination=none_unsupported total_latency_ms={:.2}", + parsed_request.query, + total_duration.as_secs_f64() * 1000.0 + ); + match state.adapter.format_unsupported_query_response().await { + Ok(json) => json.into_response(), + Err(status) => status.into_response(), + } } } } @@ -535,11 +571,19 @@ async fn process_range_query_request( async fn handle_range_query( query_params: Query>, State(state): State, + headers: axum::http::HeaderMap, ) -> Response { let start_time = Instant::now(); debug!("=== INCOMING RANGE QUERY GET REQUEST ==="); debug!("Raw query params: {:?}", query_params.0); + let mut forwarding_headers = HashMap::new(); + if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) { + if let Ok(auth_str) = auth.to_str() { + forwarding_headers.insert("Authorization".to_string(), auth_str.to_string()); + } + } + let parsed_request = match state.adapter.parse_range_get_request(query_params).await { Ok(req) => { debug!( @@ -557,13 +601,24 @@ async fn handle_range_query( } }; - process_range_query_request(&state, &parsed_request, start_time).await + process_range_query_request(&state, &parsed_request, start_time, forwarding_headers).await } -async fn handle_range_query_post(State(state): State, body: Bytes) -> Response { +async fn handle_range_query_post( + State(state): State, + headers: axum::http::HeaderMap, + body: Bytes, +) -> Response { let start_time = Instant::now(); debug!("=== INCOMING RANGE QUERY POST REQUEST ==="); + let mut forwarding_headers = HashMap::new(); + if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) { + if let Ok(auth_str) = auth.to_str() { + forwarding_headers.insert("Authorization".to_string(), auth_str.to_string()); + } + } + // Parse the body as form data let body_str = match String::from_utf8(body.to_vec()) { Ok(s) => s, @@ -607,7 +662,7 @@ async fn handle_range_query_post(State(state): State, body: Bytes) -> } }; - process_range_query_request(&state, &parsed_request, start_time).await + process_range_query_request(&state, &parsed_request, start_time, forwarding_headers).await } #[cfg(test)] diff --git a/asap-query-engine/src/tests/prometheus_forwarding_tests.rs b/asap-query-engine/src/tests/prometheus_forwarding_tests.rs index c65f5a61..25d645e6 100644 --- a/asap-query-engine/src/tests/prometheus_forwarding_tests.rs +++ b/asap-query-engine/src/tests/prometheus_forwarding_tests.rs @@ -19,7 +19,6 @@ async fn start_mock_prometheus_server(port: u16) -> Result<(), Box>) -> Json { let query = params.get("query").unwrap_or(&"".to_string()).clone(); - // Simulate different types of queries if query.contains("unsupported_metric") { Json(json!({ "status": "success", @@ -50,7 +49,44 @@ async fn start_mock_prometheus_server(port: u16) -> Result<(), Box>, + ) -> Json { + let query = params.get("query").unwrap_or(&"".to_string()).clone(); + + if query.contains("unsupported_metric") { + Json(json!({ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"__name__": "unsupported_metric"}, + "values": [[1672531200, "42.0"], [1672531260, "43.0"]] + } + ] + } + })) + } else if query.contains("error_query") { + Json(json!({ + "status": "error", + "errorType": "bad_data", + "error": "invalid query syntax" + })) + } else { + Json(json!({ + "status": "success", + "data": { + "resultType": "matrix", + "result": [] + } + })) + } + } + + let app = Router::new() + .route("/api/v1/query", get(mock_query_handler)) + .route("/api/v1/query_range", get(mock_range_query_handler)); let listener = TcpListener::bind(format!("127.0.0.1:{port}")).await?; @@ -269,3 +305,141 @@ async fn test_prometheus_server_unreachable() { || (status == reqwest::StatusCode::OK && response_json["status"] == "error") ); } + +#[tokio::test] +async fn test_prometheus_forwarding_range_query() { + let prometheus_port = 19094; + start_mock_prometheus_server(prometheus_port).await.unwrap(); + + let (_server, server_port) = setup_test_server(prometheus_port).await; + + let client = Client::new(); + + let response = client + .get(format!("http://127.0.0.1:{server_port}/api/v1/query_range")) + .query(&[ + ("query", "unsupported_metric"), + ("start", "1672531200"), + ("end", "1672531260"), + ("step", "60"), + ]) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let json_response: Value = response.json().await.expect("Failed to parse JSON"); + assert_eq!(json_response["status"], "success"); + assert_eq!(json_response["data"]["resultType"], "matrix"); + + let result = &json_response["data"]["result"][0]; + assert_eq!(result["metric"]["__name__"], "unsupported_metric"); + assert_eq!(result["values"][0][1], "42.0"); + assert_eq!(result["values"][1][1], "43.0"); +} + +#[tokio::test] +async fn test_range_query_forwarding_disabled() { + let config = HttpServerConfig { + port: 0, + handle_http_requests: true, + adapter_config: AdapterConfig::prometheus_promql( + "http://127.0.0.1:19095".to_string(), + false, // Forwarding disabled + ), + }; + + let inference_config = InferenceConfig::new(QueryLanguage::promql, CleanupPolicy::NoCleanup); + let streaming_config = Arc::new(StreamingConfig::default()); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + let query_engine = Arc::new(SimpleEngine::new( + store.clone(), + inference_config, + streaming_config.clone(), + 15000, + crate::data_model::QueryLanguage::promql, + )); + + let server = HttpServer::new(config, query_engine, store, None); + let server_port = server + .start_test_server() + .await + .expect("Failed to start test server"); + + let client = Client::new(); + + let response = client + .get(format!("http://127.0.0.1:{server_port}/api/v1/query_range")) + .query(&[ + ("query", "unsupported_metric"), + ("start", "1672531200"), + ("end", "1672531260"), + ("step", "60"), + ]) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let json_response: Value = response.json().await.expect("Failed to parse JSON"); + assert_eq!(json_response["status"], "error"); +} + +#[tokio::test] +async fn test_range_query_server_unreachable() { + let config = HttpServerConfig { + port: 0, + handle_http_requests: true, + adapter_config: AdapterConfig::prometheus_promql( + "http://127.0.0.1:99998".to_string(), // Unreachable port + true, + ), + }; + + let inference_config = InferenceConfig::new(QueryLanguage::promql, CleanupPolicy::NoCleanup); + let streaming_config = Arc::new(StreamingConfig::default()); + let store = Arc::new(SimpleMapStore::new( + streaming_config.clone(), + CleanupPolicy::NoCleanup, + )); + let query_engine = Arc::new(SimpleEngine::new( + store.clone(), + inference_config, + streaming_config.clone(), + 15000, + crate::data_model::QueryLanguage::promql, + )); + + let server = HttpServer::new(config, query_engine, store, None); + let server_port = server + .start_test_server() + .await + .expect("Failed to start test server"); + + let client = Client::new(); + + let response = client + .get(format!("http://127.0.0.1:{server_port}/api/v1/query_range")) + .query(&[ + ("query", "unsupported_metric"), + ("start", "1672531200"), + ("end", "1672531260"), + ("step", "60"), + ]) + .send() + .await + .expect("Failed to send request"); + + let status = response.status(); + let json_response: Value = response.json().await.expect("Failed to parse JSON"); + + assert!( + status.is_server_error() + || (status == reqwest::StatusCode::OK && json_response["status"] == "error") + ); +}