From 3d08fde85fe6e11492be3859b3c4a232876c9fe5 Mon Sep 17 00:00:00 2001
From: Milind Srivastava
Date: Thu, 7 May 2026 20:36:08 -0400
Subject: [PATCH] chore(experiments): add backend config option to experiment
 infra

---
 asap-tools/experiments/config/config.yaml     |  8 +-
 asap-tools/experiments/experiment_run_e2e.py  | 17 +++-
 .../experiment_run_grafana_demo.py            | 16 +++-
 .../experiments/experiment_utils/config.py    |  3 +-
 .../experiment_utils/services/query_engine.py | 88 +++++--------
 5 files changed, 57 insertions(+), 75 deletions(-)

diff --git a/asap-tools/experiments/config/config.yaml b/asap-tools/experiments/config/config.yaml
index f25f283..8b6894b 100644
--- a/asap-tools/experiments/config/config.yaml
+++ b/asap-tools/experiments/config/config.yaml
@@ -78,8 +78,12 @@ fake_exporter_language: "rust" # choices: ["python", "rust"]
 # Cluster data exporter configuration
 cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data
 
-# Query language (SQL vs PROMQL)
-query_language: "PROMQL" # choices: ["SQL", "PROMQL"]
+# Backend configuration for the query engine (aligned with BackendConfig in asap-query-engine/src/engine_config.rs)
+backend:
+  type: "prometheus" # choices: ["prometheus", "clickhouse", "elastic_querydsl", "elastic_sql"]
+  # prometheus: server URL is built at runtime from the Prometheus service (no extra fields needed here)
+  # clickhouse: url (e.g. "http://ch-host:8123"), database (e.g. "default")
+  # elastic_querydsl / elastic_sql: url (e.g. "http://es-host:9200"), index (e.g. "metrics-*")
 
 # Query engine options
 query_engine:
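
For quick reference, a minimal sketch of what each backend stanza parses to
once the experiment scripts turn it into a plain Python dict. Field names
follow the comments above; the hosts, ports, and index pattern are
placeholder values, not defaults shipped anywhere:

    # Hypothetical parsed forms of the `backend` block, one per type tag.
    # The prometheus variant carries no URL here; "server" is injected at
    # runtime by the experiment scripts (see experiment_run_e2e.py below).
    PROMETHEUS_BACKEND = {"type": "prometheus"}

    CLICKHOUSE_BACKEND = {
        "type": "clickhouse",
        "url": "http://ch-host:8123",  # placeholder host
        "database": "default",
    }

    ELASTIC_BACKEND = {
        "type": "elastic_sql",         # or "elastic_querydsl"
        "url": "http://es-host:9200",  # placeholder host
        "index": "metrics-*",
    }
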
diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py
index 132c989..4f31539 100644
--- a/asap-tools/experiments/experiment_run_e2e.py
+++ b/asap-tools/experiments/experiment_run_e2e.py
@@ -488,6 +488,19 @@ def main(cfg: DictConfig):
     # Get http port from query engine service
     http_port = query_engine_service.get_http_port()
 
+    # Build a fully resolved BackendConfig dict. For the prometheus
+    # backend the server URL depends on the runtime node IP, so we
+    # fill it in here rather than in config.yaml.
+    backend_config = dict(args.backend)
+    if backend_config["type"] == "prometheus":
+        prometheus_host = provider.get_node_ip(args.node_offset)
+        backend_config["server"] = (
+            f"http://{prometheus_host}:{prometheus_port}"
+        )
+    backend_config["forward_unsupported_queries"] = (
+        args.forward_unsupported_queries
+    )
+
     query_engine_service.start(
         experiment_output_dir=experiment_output_dir,
         local_experiment_dir=local_experiment_dir,
@@ -497,13 +510,11 @@ def main(cfg: DictConfig):
         profile_query_engine=args.profile_query_engine,
         manual=args.manual_query_engine,
         streaming_engine=args.streaming_engine,
-        forward_unsupported_queries=args.forward_unsupported_queries,
         controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
         compress_json=COMPRESS_JSON,
         dump_precomputes=args.dump_precomputes,
         lock_strategy=args.lock_strategy,
-        query_language=args.query_language,
-        prometheus_port=prometheus_port,
+        backend_config=backend_config,
         http_port=http_port,
         remote_write_port=args.remote_write_base_port,
     )
diff --git a/asap-tools/experiments/experiment_run_grafana_demo.py b/asap-tools/experiments/experiment_run_grafana_demo.py
index 58ae42e..90282fe 100644
--- a/asap-tools/experiments/experiment_run_grafana_demo.py
+++ b/asap-tools/experiments/experiment_run_grafana_demo.py
@@ -461,6 +461,18 @@ def main(cfg: DictConfig):
     # Get http port from query engine service
     http_port = query_engine_service.get_http_port()
 
+    # Build a fully resolved BackendConfig dict. For the prometheus
+    # backend the server URL depends on the runtime node IP, so we
+    # fill it in here rather than in config.yaml.
+    backend_config = dict(args.backend)
+    if backend_config["type"] == "prometheus":
+        prometheus_host = provider.get_node_ip(args.node_offset)
+        backend_config["server"] = f"http://{prometheus_host}:{prometheus_port}"
+    # forward_unsupported_queries is forced True for the Grafana demo (line 63)
+    backend_config["forward_unsupported_queries"] = (
+        args.forward_unsupported_queries
+    )
+
     query_engine_service.start(
         experiment_output_dir=experiment_output_dir,
         local_experiment_dir=local_experiment_dir,
@@ -470,13 +482,11 @@ def main(cfg: DictConfig):
         profile_query_engine=args.profile_query_engine,
         manual=args.manual_query_engine,
         streaming_engine=args.streaming_engine,
-        forward_unsupported_queries=args.forward_unsupported_queries,
         controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
         compress_json=COMPRESS_JSON,
         dump_precomputes=args.dump_precomputes,
         lock_strategy=args.lock_strategy,
-        query_language=args.query_language,
-        prometheus_port=prometheus_port,
+        backend_config=backend_config,
         http_port=http_port,
         remote_write_port=args.remote_write_base_port,
     )
diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py
index eae7593..69b0134 100644
--- a/asap-tools/experiments/experiment_utils/config.py
+++ b/asap-tools/experiments/experiment_utils/config.py
@@ -437,8 +437,7 @@ def __init__(self, cfg: DictConfig):
         # Fake exporter language
         self.fake_exporter_language = cfg.fake_exporter_language
 
-        # Query language (SQL vs PROMQL) - only used by Rust query engine
-        self.query_language = cfg.query_language
+        self.backend = OmegaConf.to_container(cfg.backend, resolve=True)
 
         # Query engine options
         self.dump_precomputes = cfg.query_engine.dump_precomputes
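
A self-contained sketch of the cfg.backend round trip performed in config.py
above, assuming only that omegaconf is installed; the clickhouse values are
placeholders:

    from omegaconf import OmegaConf

    yaml_snippet = """
    backend:
      type: "clickhouse"
      url: "http://ch-host:8123"
      database: "default"
    """
    cfg = OmegaConf.create(yaml_snippet)

    # resolve=True expands any ${...} interpolations so the engine receives
    # a plain dict rather than an OmegaConf container.
    backend = OmegaConf.to_container(cfg.backend, resolve=True)
    assert backend == {
        "type": "clickhouse",
        "url": "http://ch-host:8123",
        "database": "default",
    }

Converting to a plain dict here means everything downstream (the experiment
scripts and the YAML writer) can treat the backend as ordinary data.
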
diff --git a/asap-tools/experiments/experiment_utils/services/query_engine.py b/asap-tools/experiments/experiment_utils/services/query_engine.py
index ca121ef..eb3d5e7 100644
--- a/asap-tools/experiments/experiment_utils/services/query_engine.py
+++ b/asap-tools/experiments/experiment_utils/services/query_engine.py
@@ -75,14 +75,12 @@ def _build_engine_config(
         prometheus_scrape_interval: int,
         log_level: str,
         streaming_engine: str,
-        forward_unsupported_queries: bool,
         controller_config_dir: str,
         compress_json: bool,
-        prometheus_server: str,
+        backend: dict,
         http_port: int,
         remote_write_port: int,
         dump_precomputes: bool,
-        query_language: str,
         lock_strategy: str,
         profile_query_engine: bool,
         kafka_broker: str,
@@ -96,15 +94,17 @@ def _build_engine_config(
             prometheus_scrape_interval: Prometheus scraping interval in seconds
             log_level: Logging level
             streaming_engine: 'arroyo' (Kafka ingest) or 'precompute' (HTTP remote write)
-            forward_unsupported_queries: Whether to forward unsupported queries to backend
             controller_config_dir: Directory containing inference_config.yaml and streaming_config.yaml
             compress_json: Whether incoming JSON is gzip-compressed (arroyo/Kafka only)
-            prometheus_server: Full Prometheus URL, e.g. http://host:9090
+            backend: BackendConfig dict with type tag and backend-specific fields.
+                For prometheus: {"type": "prometheus", "server": "http://...", ...}
+                For clickhouse: {"type": "clickhouse", "url": "...", "database": "...", ...}
+                For elastic_querydsl/elastic_sql: {"type": "...", "url": "...", "index": "...", ...}
+                Must include "forward_unsupported_queries" key.
             http_port: Port for the query engine's HTTP API server
             remote_write_port: Port to listen on for Prometheus remote write (precompute
                 only); should match streaming.remote_write.base_port in the Hydra config
             dump_precomputes: Whether to dump received precomputes to output_dir for debugging
-            query_language: 'PROMQL' → prometheus backend, 'SQL' → clickhouse backend
             lock_strategy: Lock strategy for SimpleMapStore ('global' or 'per-key')
             profile_query_engine: Whether to enable do_profiling in the engine
             kafka_broker: Kafka broker address, e.g. '10.10.1.1:9092' (arroyo only)
@@ -112,21 +112,6 @@ def _build_engine_config(
         Returns:
             Dict matching the EngineConfig YAML schema
         """
-        # Map query_language to the backend type (determines PromQL vs SQL API)
-        if query_language.upper() == "PROMQL":
-            backend: dict = {
-                "type": "prometheus",
-                "server": prometheus_server,
-                "forward_unsupported_queries": forward_unsupported_queries,
-            }
-        else:
-            # SQL mode: use clickhouse backend with the same host as prometheus_server
-            backend = {
-                "type": "clickhouse",
-                "url": prometheus_server,
-                "forward_unsupported_queries": forward_unsupported_queries,
-            }
-
         # Ingest config depends on the streaming engine.
         # Both flink and arroyo produce to the same Kafka topic.
         if streaming_engine in ("arroyo", "flink"):
@@ -155,7 +140,7 @@ def _build_engine_config(
             "streaming_engine": streaming_engine,
             "do_profiling": profile_query_engine,
             "http_server": {"port": http_port},
-            "backend": backend,
+            "backend": backend,  # already fully resolved by caller
             "store": {"lock_strategy": lock_strategy},
             "ingest": ingest,
             "precompute_engine": {"dump_precomputes": dump_precomputes},
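
To make the hand-off concrete, a sketch of the dict this function now returns
for a prometheus backend. Ports and addresses are placeholders, only fields
visible in the hunks above are shown, and the "ingest" section (which varies
with streaming_engine) is elided:

    # Hypothetical return value of _build_engine_config (prometheus backend).
    engine_config = {
        "streaming_engine": "precompute",
        "do_profiling": False,
        "http_server": {"port": 9999},
        "backend": {
            "type": "prometheus",
            "server": "http://10.10.1.1:9090",
            "forward_unsupported_queries": True,
        },
        "store": {"lock_strategy": "global"},
        "precompute_engine": {"dump_precomputes": False},
    }
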
@@ -217,13 +202,13 @@ def start(
         profile_query_engine: bool,
         manual: bool,
         streaming_engine: str,
-        forward_unsupported_queries: bool,
         controller_remote_output_dir: str,
         compress_json: bool,
         dump_precomputes: bool,
         lock_strategy: str,
-        query_language: str = "PROMQL",
-        **kwargs,
+        backend_config: dict,
+        http_port: int,
+        remote_write_port: int = 8080,
     ) -> None:
         """
         Start the Rust query engine.
@@ -239,29 +224,18 @@ def start(
             profile_query_engine: Whether to enable profiling
             manual: Whether to run in manual mode
             streaming_engine: Type of streaming engine ('arroyo' or 'precompute')
-            forward_unsupported_queries: Whether to forward unsupported queries
             controller_remote_output_dir: Controller output directory
             compress_json: Whether JSON is compressed (arroyo/Kafka only)
             dump_precomputes: Whether to dump precomputed values
             lock_strategy: Lock strategy for SimpleMapStore (global or per-key)
-            query_language: Query language (SQL or PROMQL), defaults to PROMQL
-            **kwargs: Additional configuration.
-                Required: prometheus_port, http_port.
-                Optional: prometheus_host (defaults to coordinator node IP),
-                    remote_write_port (port the precompute engine listens on
-                    for Prometheus remote write; should match
-                    streaming.remote_write.base_port, defaults to 8080).
+            backend_config: Fully resolved BackendConfig dict with type tag and all
+                backend-specific fields (url/server/database/index as needed)
+                plus forward_unsupported_queries. Matches the BackendConfig
+                tagged union in asap-query-engine/src/engine_config.rs.
+            http_port: Port for the query engine's HTTP API server
+            remote_write_port: Port the precompute engine listens on for Prometheus remote
+                write; should match streaming.remote_write.base_port (default 8080)
         """
-        # Extract prometheus configuration
-        prometheus_host = kwargs.get(
-            "prometheus_host", self.provider.get_node_ip(self.node_offset)
-        )
-        prometheus_port = kwargs["prometheus_port"]  # Required, no default
-        http_port = kwargs["http_port"]  # Required, no default
-        # Port the precompute engine listens on for Prometheus remote write.
-        # Should match streaming.remote_write.base_port in the Hydra config.
-        remote_write_port = kwargs.get("remote_write_port", 8080)
-
         if self.use_container:
             self._start_containerized(
                 experiment_output_dir,
@@ -272,15 +246,12 @@ def start(
                 profile_query_engine,
                 manual,
                 streaming_engine,
-                forward_unsupported_queries,
                 controller_remote_output_dir,
                 compress_json,
-                prometheus_host,
-                prometheus_port,
+                backend_config,
                 http_port,
                 remote_write_port,
                 dump_precomputes,
-                query_language,
                 lock_strategy,
             )
         else:
@@ -293,15 +264,12 @@ def start(
                 profile_query_engine,
                 manual,
                 streaming_engine,
-                forward_unsupported_queries,
                 controller_remote_output_dir,
                 compress_json,
-                prometheus_host,
-                prometheus_port,
+                backend_config,
                 http_port,
                 remote_write_port,
                 dump_precomputes,
-                query_language,
                 lock_strategy,
             )
 
@@ -315,15 +283,12 @@ def _start_bare_metal(
         profile_query_engine: bool,
         manual: bool,
         streaming_engine: str,
-        forward_unsupported_queries: bool,
         controller_remote_output_dir: str,
         compress_json: bool,
-        prometheus_host: str,
-        prometheus_port: int,
+        backend_config: dict,
         http_port: int,
         remote_write_port: int,
         dump_precomputes: bool,
-        query_language: str,
         lock_strategy: str,
     ) -> None:
         """Start Rust QueryEngine using bare metal deployment."""
@@ -336,14 +301,12 @@ def _start_bare_metal(
             prometheus_scrape_interval=prometheus_scrape_interval,
             log_level=log_level,
             streaming_engine=streaming_engine,
-            forward_unsupported_queries=forward_unsupported_queries,
             controller_config_dir=controller_remote_output_dir,
             compress_json=compress_json,
-            prometheus_server=f"http://{prometheus_host}:{prometheus_port}",
+            backend=backend_config,
             http_port=http_port,
             remote_write_port=remote_write_port,
             dump_precomputes=dump_precomputes,
-            query_language=query_language,
             lock_strategy=lock_strategy,
             profile_query_engine=profile_query_engine,
             kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092",
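
Since experiment_run_e2e.py and experiment_run_grafana_demo.py now duplicate
the resolution step verbatim, a shared helper could absorb it. This is a
hypothetical sketch, not part of the patch; the argument names mirror the
inlined caller code above:

    def resolve_backend_config(
        backend: dict,
        prometheus_host: str,
        prometheus_port: int,
        forward_unsupported_queries: bool,
    ) -> dict:
        """Return a copy of `backend` with runtime-only fields filled in."""
        resolved = dict(backend)  # shallow copy is enough: values are scalars
        if resolved["type"] == "prometheus":
            # Only the prometheus backend derives its URL from the runtime
            # node IP; clickhouse/elastic URLs come straight from config.yaml.
            resolved["server"] = f"http://{prometheus_host}:{prometheus_port}"
        resolved["forward_unsupported_queries"] = forward_unsupported_queries
        return resolved

Both callers would then shrink their inlined blocks to a single call.
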
@@ -382,15 +345,12 @@ def _start_containerized(
         profile_query_engine: bool,
         manual: bool,
         streaming_engine: str,
-        forward_unsupported_queries: bool,
         controller_remote_output_dir: str,
         compress_json: bool,
-        prometheus_host: str,
-        prometheus_port: int,
+        backend_config: dict,
         http_port: int,
         remote_write_port: int,
         dump_precomputes: bool,
-        query_language: str,
         lock_strategy: str,
     ) -> None:
         """Start Rust QueryEngine using containerized deployment with Jinja template."""
@@ -409,14 +369,12 @@ def _start_containerized(
             prometheus_scrape_interval=prometheus_scrape_interval,
             log_level=log_level,
             streaming_engine=streaming_engine,
-            forward_unsupported_queries=forward_unsupported_queries,
             controller_config_dir=container_controller_dir,
             compress_json=compress_json,
-            prometheus_server=f"http://{prometheus_host}:{prometheus_port}",
+            backend=backend_config,
             http_port=http_port,
             remote_write_port=remote_write_port,
             dump_precomputes=dump_precomputes,
-            query_language=query_language,
             lock_strategy=lock_strategy,
             profile_query_engine=profile_query_engine,
             kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092",
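
One gap the new contract leaves open: nothing checks that a resolved backend
dict actually carries the fields its type requires before it is written into
the engine YAML. A small validator could close it. This helper is
hypothetical (it does not exist in the codebase); the required-field mapping
is taken from the config.yaml comments above:

    # Hypothetical pre-flight check for a resolved BackendConfig dict.
    REQUIRED_FIELDS = {
        "prometheus": {"server"},
        "clickhouse": {"url", "database"},
        "elastic_querydsl": {"url", "index"},
        "elastic_sql": {"url", "index"},
    }

    def validate_backend_config(backend: dict) -> None:
        kind = backend.get("type")
        if kind not in REQUIRED_FIELDS:
            raise ValueError(f"unknown backend type: {kind!r}")
        missing = REQUIRED_FIELDS[kind] - backend.keys()
        if missing:
            raise ValueError(f"backend {kind!r} missing fields: {sorted(missing)}")
        if "forward_unsupported_queries" not in backend:
            raise ValueError("backend must set forward_unsupported_queries")

Calling something like this at the top of start() would turn a missing
database/index into an immediate, descriptive error instead of a failure
inside the Rust engine.
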