Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions asap-tools/experiments/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ fake_exporter_language: "rust" # choices: ["python", "rust"]
# Cluster data exporter configuration
cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data

# Query language (SQL vs PROMQL)
query_language: "PROMQL" # choices: ["SQL", "PROMQL"]
# Backend configuration for the query engine (aligned with BackendConfig in asap-query-engine/src/engine_config.rs)
backend:
type: "prometheus" # choices: ["prometheus", "clickhouse", "elastic_querydsl", "elastic_sql"]
# prometheus: server URL is built at runtime from the Prometheus service (no extra fields needed here)
# clickhouse: url (e.g. "http://ch-host:8123"), database (e.g. "default")
# elastic_querydsl / elastic_sql: url (e.g. "http://es-host:9200"), index (e.g. "metrics-*")

# Query engine options
query_engine:
Expand Down
17 changes: 14 additions & 3 deletions asap-tools/experiments/experiment_run_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,19 @@ def main(cfg: DictConfig):
# Get http port from query engine service
http_port = query_engine_service.get_http_port()

# Build a fully resolved BackendConfig dict. For the prometheus
# backend the server URL depends on the runtime node IP, so we
# fill it in here rather than in config.yaml.
backend_config = dict(args.backend)
if backend_config["type"] == "prometheus":
prometheus_host = provider.get_node_ip(args.node_offset)
backend_config["server"] = (
f"http://{prometheus_host}:{prometheus_port}"
)
backend_config["forward_unsupported_queries"] = (
args.forward_unsupported_queries
)

query_engine_service.start(
experiment_output_dir=experiment_output_dir,
local_experiment_dir=local_experiment_dir,
Expand All @@ -497,13 +510,11 @@ def main(cfg: DictConfig):
profile_query_engine=args.profile_query_engine,
manual=args.manual_query_engine,
streaming_engine=args.streaming_engine,
forward_unsupported_queries=args.forward_unsupported_queries,
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
compress_json=COMPRESS_JSON,
dump_precomputes=args.dump_precomputes,
lock_strategy=args.lock_strategy,
query_language=args.query_language,
prometheus_port=prometheus_port,
backend_config=backend_config,
http_port=http_port,
remote_write_port=args.remote_write_base_port,
)
Expand Down
16 changes: 13 additions & 3 deletions asap-tools/experiments/experiment_run_grafana_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,18 @@ def main(cfg: DictConfig):
# Get http port from query engine service
http_port = query_engine_service.get_http_port()

# Build a fully resolved BackendConfig dict. For the prometheus
# backend the server URL depends on the runtime node IP, so we
# fill it in here rather than in config.yaml.
backend_config = dict(args.backend)
if backend_config["type"] == "prometheus":
prometheus_host = provider.get_node_ip(args.node_offset)
backend_config["server"] = f"http://{prometheus_host}:{prometheus_port}"
# forward_unsupported_queries is forced True for the Grafana demo (line 63)
backend_config["forward_unsupported_queries"] = (
args.forward_unsupported_queries
)

query_engine_service.start(
experiment_output_dir=experiment_output_dir,
local_experiment_dir=local_experiment_dir,
Expand All @@ -470,13 +482,11 @@ def main(cfg: DictConfig):
profile_query_engine=args.profile_query_engine,
manual=args.manual_query_engine,
streaming_engine=args.streaming_engine,
forward_unsupported_queries=args.forward_unsupported_queries,
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
compress_json=COMPRESS_JSON,
dump_precomputes=args.dump_precomputes,
lock_strategy=args.lock_strategy,
query_language=args.query_language,
prometheus_port=prometheus_port,
backend_config=backend_config,
http_port=http_port,
remote_write_port=args.remote_write_base_port,
)
Expand Down
3 changes: 1 addition & 2 deletions asap-tools/experiments/experiment_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,7 @@ def __init__(self, cfg: DictConfig):
# Fake exporter language
self.fake_exporter_language = cfg.fake_exporter_language

# Query language (SQL vs PROMQL) - only used by Rust query engine
self.query_language = cfg.query_language
self.backend = OmegaConf.to_container(cfg.backend, resolve=True)

# Query engine options
self.dump_precomputes = cfg.query_engine.dump_precomputes
Expand Down
88 changes: 23 additions & 65 deletions asap-tools/experiments/experiment_utils/services/query_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,12 @@ def _build_engine_config(
prometheus_scrape_interval: int,
log_level: str,
streaming_engine: str,
forward_unsupported_queries: bool,
controller_config_dir: str,
compress_json: bool,
prometheus_server: str,
backend: dict,
http_port: int,
remote_write_port: int,
dump_precomputes: bool,
query_language: str,
lock_strategy: str,
profile_query_engine: bool,
kafka_broker: str,
Expand All @@ -96,37 +94,24 @@ def _build_engine_config(
prometheus_scrape_interval: Prometheus scraping interval in seconds
log_level: Logging level
streaming_engine: 'arroyo' (Kafka ingest) or 'precompute' (HTTP remote write)
forward_unsupported_queries: Whether to forward unsupported queries to backend
controller_config_dir: Directory containing inference_config.yaml and streaming_config.yaml
compress_json: Whether incoming JSON is gzip-compressed (arroyo/Kafka only)
prometheus_server: Full Prometheus URL, e.g. http://host:9090
backend: BackendConfig dict with type tag and backend-specific fields.
For prometheus: {"type": "prometheus", "server": "http://...", ...}
For clickhouse: {"type": "clickhouse", "url": "...", "database": "...", ...}
For elastic_querydsl/elastic_sql: {"type": "...", "url": "...", "index": "...", ...}
Must include "forward_unsupported_queries" key.
http_port: Port for the query engine's HTTP API server
remote_write_port: Port to listen on for Prometheus remote write (precompute only);
should match streaming.remote_write.base_port in the Hydra config
dump_precomputes: Whether to dump received precomputes to output_dir for debugging
query_language: 'PROMQL' → prometheus backend, 'SQL' → clickhouse backend
lock_strategy: Lock strategy for SimpleMapStore ('global' or 'per-key')
profile_query_engine: Whether to enable do_profiling in the engine
kafka_broker: Kafka broker address, e.g. '10.10.1.1:9092' (arroyo only)

Returns:
Dict matching the EngineConfig YAML schema
"""
# Map query_language to the backend type (determines PromQL vs SQL API)
if query_language.upper() == "PROMQL":
backend: dict = {
"type": "prometheus",
"server": prometheus_server,
"forward_unsupported_queries": forward_unsupported_queries,
}
else:
# SQL mode: use clickhouse backend with the same host as prometheus_server
backend = {
"type": "clickhouse",
"url": prometheus_server,
"forward_unsupported_queries": forward_unsupported_queries,
}

# Ingest config depends on the streaming engine.
# Both flink and arroyo produce to the same Kafka topic.
if streaming_engine in ("arroyo", "flink"):
Expand Down Expand Up @@ -155,7 +140,7 @@ def _build_engine_config(
"streaming_engine": streaming_engine,
"do_profiling": profile_query_engine,
"http_server": {"port": http_port},
"backend": backend,
"backend": backend, # already fully resolved by caller
"store": {"lock_strategy": lock_strategy},
"ingest": ingest,
"precompute_engine": {"dump_precomputes": dump_precomputes},
Expand Down Expand Up @@ -217,13 +202,13 @@ def start(
profile_query_engine: bool,
manual: bool,
streaming_engine: str,
forward_unsupported_queries: bool,
controller_remote_output_dir: str,
compress_json: bool,
dump_precomputes: bool,
lock_strategy: str,
query_language: str = "PROMQL",
**kwargs,
backend_config: dict,
http_port: int,
remote_write_port: int = 8080,
) -> None:
"""
Start the Rust query engine.
Expand All @@ -239,29 +224,18 @@ def start(
profile_query_engine: Whether to enable profiling
manual: Whether to run in manual mode
streaming_engine: Type of streaming engine ('arroyo' or 'precompute')
forward_unsupported_queries: Whether to forward unsupported queries
controller_remote_output_dir: Controller output directory
compress_json: Whether JSON is compressed (arroyo/Kafka only)
dump_precomputes: Whether to dump precomputed values
lock_strategy: Lock strategy for SimpleMapStore (global or per-key)
query_language: Query language (SQL or PROMQL), defaults to PROMQL
**kwargs: Additional configuration.
Required: prometheus_port, http_port.
Optional: prometheus_host (defaults to coordinator node IP),
remote_write_port (port the precompute engine listens on
for Prometheus remote write; should match
streaming.remote_write.base_port, defaults to 8080).
backend_config: Fully resolved BackendConfig dict with type tag and all
backend-specific fields (url/server/database/index as needed)
plus forward_unsupported_queries. Matches the BackendConfig
tagged union in asap-query-engine/src/engine_config.rs.
http_port: Port for the query engine's HTTP API server
remote_write_port: Port the precompute engine listens on for Prometheus remote
write; should match streaming.remote_write.base_port (default 8080)
"""
# Extract prometheus configuration
prometheus_host = kwargs.get(
"prometheus_host", self.provider.get_node_ip(self.node_offset)
)
prometheus_port = kwargs["prometheus_port"] # Required, no default
http_port = kwargs["http_port"] # Required, no default
# Port the precompute engine listens on for Prometheus remote write.
# Should match streaming.remote_write.base_port in the Hydra config.
remote_write_port = kwargs.get("remote_write_port", 8080)

if self.use_container:
self._start_containerized(
experiment_output_dir,
Expand All @@ -272,15 +246,12 @@ def start(
profile_query_engine,
manual,
streaming_engine,
forward_unsupported_queries,
controller_remote_output_dir,
compress_json,
prometheus_host,
prometheus_port,
backend_config,
http_port,
remote_write_port,
dump_precomputes,
query_language,
lock_strategy,
)
else:
Expand All @@ -293,15 +264,12 @@ def start(
profile_query_engine,
manual,
streaming_engine,
forward_unsupported_queries,
controller_remote_output_dir,
compress_json,
prometheus_host,
prometheus_port,
backend_config,
http_port,
remote_write_port,
dump_precomputes,
query_language,
lock_strategy,
)

Expand All @@ -315,15 +283,12 @@ def _start_bare_metal(
profile_query_engine: bool,
manual: bool,
streaming_engine: str,
forward_unsupported_queries: bool,
controller_remote_output_dir: str,
compress_json: bool,
prometheus_host: str,
prometheus_port: int,
backend_config: dict,
http_port: int,
remote_write_port: int,
dump_precomputes: bool,
query_language: str,
lock_strategy: str,
) -> None:
"""Start Rust QueryEngine using bare metal deployment."""
Expand All @@ -336,14 +301,12 @@ def _start_bare_metal(
prometheus_scrape_interval=prometheus_scrape_interval,
log_level=log_level,
streaming_engine=streaming_engine,
forward_unsupported_queries=forward_unsupported_queries,
controller_config_dir=controller_remote_output_dir,
compress_json=compress_json,
prometheus_server=f"http://{prometheus_host}:{prometheus_port}",
backend=backend_config,
http_port=http_port,
remote_write_port=remote_write_port,
dump_precomputes=dump_precomputes,
query_language=query_language,
lock_strategy=lock_strategy,
profile_query_engine=profile_query_engine,
kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092",
Expand Down Expand Up @@ -382,15 +345,12 @@ def _start_containerized(
profile_query_engine: bool,
manual: bool,
streaming_engine: str,
forward_unsupported_queries: bool,
controller_remote_output_dir: str,
compress_json: bool,
prometheus_host: str,
prometheus_port: int,
backend_config: dict,
http_port: int,
remote_write_port: int,
dump_precomputes: bool,
query_language: str,
lock_strategy: str,
) -> None:
"""Start Rust QueryEngine using containerized deployment with Jinja template."""
Expand All @@ -409,14 +369,12 @@ def _start_containerized(
prometheus_scrape_interval=prometheus_scrape_interval,
log_level=log_level,
streaming_engine=streaming_engine,
forward_unsupported_queries=forward_unsupported_queries,
controller_config_dir=container_controller_dir,
compress_json=compress_json,
prometheus_server=f"http://{prometheus_host}:{prometheus_port}",
backend=backend_config,
http_port=http_port,
remote_write_port=remote_write_port,
dump_precomputes=dump_precomputes,
query_language=query_language,
lock_strategy=lock_strategy,
profile_query_engine=profile_query_engine,
kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092",
Expand Down
Loading