diff --git a/README.md b/README.md index e46a5d9..db9e059 100644 --- a/README.md +++ b/README.md @@ -627,7 +627,7 @@ Phase 1 is complete when all of the following pass on 10 consecutive test incide | Phase 1.5 | S1: Structured logging — structlog, `run_id`, lifecycle events, RunRecord | ✅ Done | | Phase 1.5 | S2: Monitoring foundation — run store, REST API, Alpine.js dashboard, mode scaffold | ✅ Done | | Phase 1.5 | S3: Docker + `ARIA_CONFIG_PATH` + `VertexAILLMClient` + LLM provider DI (incl. #84 security fix) | ✅ Done | -| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔜 Planned | +| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔄 In progress | | Phase 1.5 | S5: Round 2 acceptance testing — 30 incidents on UC1 + UC2 real infrastructure | 🔜 Planned | | Phase 1.5 | S6: GCP native connectors — BQ, Cloud Functions, Pub/Sub, GCS | 🔜 Planned | | Phase 2 | Human validation gate + write-back to ServiceNow | 💡 Planned | diff --git a/api/dependencies.py b/api/dependencies.py index 12ce1fd..240035e 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -144,7 +144,10 @@ def get_agent3() -> ClassifierAgent: "ARIA_AGENT3_MODEL env var is not set " "(or ARIA_GLOBAL_MODEL when ARIA_LLM_MODE=global)" ) - return ClassifierAgent(llm_client=_get_llm_client(model)) + return ClassifierAgent( + llm_client=_get_llm_client(model), + analyser_kb_dir=cfg.analyser_kb_dir(), + ) @lru_cache(maxsize=1) @@ -240,12 +243,17 @@ def get_agent2() -> LogExtractorAgent: registry = { PlatformTag.CDP: SSHLogConnector( vault, - ssh_key_secret="CDP_SSH_KEY", + ssh_key_secret=cfg.cdp_ssh_key_secret(), ssh_user=cfg.cdp_ssh_user(), host_key_secret="CDP_HOST_KEY" if os.environ.get("CDP_HOST_KEY") else None, log_dirs=cfg.cdp_log_dirs(), ), - PlatformTag.GCP: GCPLogConnector(vault), + # resource_types scopes Cloud Logging queries to Dataproc cluster and job logs (UC2). 
+ # S6 will generalise this into a configurable resource_type_templates dict. + PlatformTag.GCP: GCPLogConnector( + vault, + resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"], + ), } llm = None model = _resolve_model("2") diff --git a/conf_template.yaml b/conf_template.yaml index fc8b2f3..069f81b 100644 --- a/conf_template.yaml +++ b/conf_template.yaml @@ -56,12 +56,29 @@ gcp: # only required when connectors.log = gcp cdp: ssh_user: hadoop # OS user with read access to log directories below - log_dirs: # directories searched for logs on CDP cluster nodes - - /var/log/hadoop-hdfs # leave unset to use these defaults - - /var/log/hadoop-yarn + ssh_key_secret: CDP_SSH_KEY # vault key name for the SSH private key PEM. + # EnvVarVault: reads os.environ[ssh_key_secret] directly. + # GCPSecretManagerVault: normalises underscores→hyphens and + # prepends 'aria-', so 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'. + # UC1 (TF): TF provisions secret 'aria-uc1-ssh-private-key'. + # Set ssh_key_secret: UC1_SSH_PRIVATE_KEY to match, or + # rename the TF secret to 'aria-cdp-ssh-key'. + log_dirs: # directories searched for logs on CDP cluster nodes. 
+ # UC1 (TF Hadoop VMs) use subdirectory paths — set these + # at deployment time from TF startup script: + - /var/log/hadoop/hdfs # TF UC1 NameNode / DataNode logs + - /var/log/hadoop/yarn # TF UC1 ResourceManager / NodeManager logs - /var/log/hive - - /var/log/oozie - /var/log/spark + - /var/log/kafka + - /var/log/zookeeper + - /var/log/oozie + - /var/log/nifi + +knowledge_base: + analyser_kb_dir: data/knowledge_base/analyser_kb # labeled log excerpts for Agent 3 few-shot prompting + # doubles as fine-tuning corpus for the future + # specialist Agent 3 model (roadmap: post-POC) slack: channel_id: # channel where Agent 4 posts notifications diff --git a/core/agents/classifier.py b/core/agents/classifier.py index 41e1495..5d58dde 100644 --- a/core/agents/classifier.py +++ b/core/agents/classifier.py @@ -10,6 +10,8 @@ """ import json +import time +from pathlib import Path from typing import Any from core.exceptions import ClassificationError @@ -23,6 +25,21 @@ {"oom", "cpu", "disk", "network", "auth", "db_lock", "pipeline", "unknown"} ) + +def _load_analyser_kb(directory: str) -> str: + """Load analyser_kb files and join them as a single few-shot reference block. + + Files are sorted alphabetically so the ordering is deterministic. Returns an + empty string when the directory is missing — the classifier degrades gracefully. + """ + path = Path(directory) + if not path.is_dir(): + return "" + return "\n\n---\n\n".join( + f.read_text(encoding="utf-8").strip() for f in sorted(path.glob("*.md")) + ) + + _SYSTEM_PROMPT = """\ You are ARIA, an AI operations assistant. Classify the root cause of the incident \ from the metadata and log evidence below. @@ -58,14 +75,23 @@ class ClassifierAgent: falls back to stub behaviour (dry-run compatibility). 
""" - def __init__(self, llm_client: LLMClientInterface | None = None) -> None: + def __init__( + self, + llm_client: LLMClientInterface | None = None, + analyser_kb_dir: str | None = None, + ) -> None: """Initialise the classifier. Args: llm_client: LLM client used to call the model. When None, the agent falls back to stub behaviour (error_class='unknown', LOW confidence). + analyser_kb_dir: Path to analyser_kb directory of labeled log excerpts. + When provided, examples are injected into every LLM call as + few-shot reference context. Also serves as the training data + corpus for the future fine-tuned Agent 3 model. """ self._llm = llm_client + self._few_shot_examples: str = _load_analyser_kb(analyser_kb_dir) if analyser_kb_dir else "" @log_agent_lifecycle("agent3") def run(self, state: PipelineState) -> PipelineState: @@ -109,16 +135,36 @@ def run(self, state: PipelineState) -> PipelineState: logger.info("classifier: running for %s", state.incident_number) messages = self._build_messages(state) - try: - raw = self._llm.complete( - messages, - system=_SYSTEM_PROMPT, - temperature=0.0, - max_tokens=1024, + # Retry once with a 1-second backoff before surfacing as ClassificationError (#83). + # A transient LLM failure must not kill Agent 4 — the notify-only guarantee requires + # Agent 4 to always run, even when Agent 3 cannot classify. 
+ _llm_exc: Exception | None = None + for attempt in range(2): + try: + raw = self._llm.complete( + messages, + system=_SYSTEM_PROMPT, + temperature=0.0, + max_tokens=1024, + ) + _llm_exc = None + break + except Exception as exc: + _llm_exc = exc + if attempt == 0: + logger.warning( + "classifier: LLM call failed for %s (attempt 1/2), retrying: %s", + state.incident_number, + exc, + ) + time.sleep(1) + if _llm_exc is not None: + logger.error( + "classifier: LLM call failed for %s after 2 attempts: %s", + state.incident_number, + _llm_exc, ) - except Exception as exc: - logger.error("classifier: LLM call failed for %s: %s", state.incident_number, exc) - raise ClassificationError(f"LLM call failed: {exc}") from exc + raise ClassificationError(f"LLM call failed: {_llm_exc}") from _llm_exc try: classification, log_request = self._parse_response(raw) @@ -193,6 +239,8 @@ def _build_messages(self, state: PipelineState) -> list[dict[str, str]]: f"Affected CI: {affected_ci}\n\n" f"Log evidence — {log_section}" ) + if self._few_shot_examples: + content += f"\n\n## Reference log examples\n\n{self._few_shot_examples}" return [{"role": "user", "content": content}] def _parse_response(self, raw: str) -> tuple[ClassificationResult | None, LogRequest | None]: diff --git a/core/agents/log_extractor.py b/core/agents/log_extractor.py index f901cac..a08400c 100644 --- a/core/agents/log_extractor.py +++ b/core/agents/log_extractor.py @@ -242,9 +242,12 @@ def _plan_with_llm(self, state: PipelineState) -> LogQueryPlan: ) data = json.loads(response) + raw_paths = list(data.get("log_paths", [])) return LogQueryPlan( connector_name=str(data["connector_name"]), - log_paths=list(data.get("log_paths", [])), + # Validate LLM-planned paths against safe prefixes to prevent path-injection + # from adversarial log content coercing the LLM toward /etc/ or ~/ targets (#85). 
+ log_paths=_validate_log_paths(raw_paths), keywords=list(data.get("keywords", [])), time_window_minutes=int(data.get("time_window_minutes", _DEFAULT_WINDOW)), reasoning=str(data.get("reasoning", "")), @@ -496,3 +499,27 @@ def _empty(host: str, platform_tag: PlatformTag) -> LogQueryResult: total_scanned=0, confidence=ConfidenceBand.LOW, ) + + +# Allowed log path prefixes for LLM-planned paths (#85). +# Paths outside these prefixes are dropped before being passed to connectors. +# Mirrors the CI-name allowlist used for host resolution in _resolve_ci_from_request(). +_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log/",) + + +def _validate_log_paths(paths: list[str]) -> list[str]: + """Filter LLM-planned log paths to known-safe directory prefixes. + + Prevents adversarial log content from coercing the LLM into planning paths + that point outside log directories (e.g. /etc/ssh/, ~/.ssh/). Any path that + does not start with an allowed prefix is dropped with a warning. + """ + safe = [p for p in paths if any(p.startswith(prefix) for prefix in _SAFE_LOG_PREFIXES)] + dropped = len(paths) - len(safe) + if dropped: + logger.warning( + "Agent 2: dropped %d LLM-planned log path(s) outside allowed prefixes %s", + dropped, + _SAFE_LOG_PREFIXES, + ) + return safe diff --git a/core/config.py b/core/config.py index bb70801..2f7c3a9 100644 --- a/core/config.py +++ b/core/config.py @@ -141,6 +141,20 @@ def cdp_ssh_user() -> str: return _get(["cdp", "ssh_user"], "CDP_SSH_USER", "hadoop") +def cdp_ssh_key_secret() -> str: + """Return the vault key name used to retrieve the CDP SSH private key. + + For EnvVarVault: the key name is read directly from the environment. + For GCPSecretManagerVault: underscores are normalised to hyphens and an + 'aria-' prefix is added — e.g. 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'. 
+ The TF-provisioned secret name for UC1 is 'aria-uc1-ssh-private-key', which + requires setting cdp.ssh_key_secret: UC1_SSH_PRIVATE_KEY in conf.yaml + (resolves to 'aria-uc1-ssh-private-key') or renaming the TF secret. + Defaults to 'CDP_SSH_KEY' (backward-compatible with pre-S4 deployments). + """ + return _get(["cdp", "ssh_key_secret"], "CDP_SSH_KEY_SECRET", "CDP_SSH_KEY") + + def cdp_log_dirs() -> list[str]: """Return directories to search for logs on CDP cluster nodes. @@ -212,6 +226,19 @@ def run_db_path() -> str: return _get(["runs", "db_path"], "ARIA_RUN_DB_PATH", "data/runs.db") +# ── Knowledge Base ──────────────────────────────────────────────────────────── + + +def analyser_kb_dir() -> str | None: + """Return path to the analyser_kb directory of labeled log excerpts for Agent 3 few-shot prompting. + + Reads knowledge_base.analyser_kb_dir from conf.yaml / ARIA_ANALYSER_KB_DIR env var. + None when not configured — Agent 3 classifies without few-shot examples. + """ + val = _get(["knowledge_base", "analyser_kb_dir"], "ARIA_ANALYSER_KB_DIR") + return val or None + + # ── GCP ─────────────────────────────────────────────────────────────────────── diff --git a/core/orchestrator/pipeline.py b/core/orchestrator/pipeline.py index b8fe49d..d8b1eaf 100644 --- a/core/orchestrator/pipeline.py +++ b/core/orchestrator/pipeline.py @@ -21,6 +21,7 @@ from langgraph.graph.state import CompiledStateGraph import core.config as cfg +from core.exceptions import ClassificationError from core.interfaces.run_state_store import RunStateStoreInterface from core.interfaces.run_store import RunStoreInterface from core.logging_config import configure_logging @@ -129,13 +130,30 @@ def _agent2_node(self, state: PipelineState) -> dict: } def _agent3_node(self, state: PipelineState) -> dict: - """LangGraph node wrapper for Agent 3. Returns classification and any pending log request.""" + """LangGraph node wrapper for Agent 3. Returns classification and any pending log request. 
+ + ClassificationError is caught here rather than propagated — if Agent 3 cannot + classify, the error is recorded in state and the pipeline routes to Agent 4 so + the notify-only guarantee is never violated (#83). + """ self._track_agent(state, "agent3") - result = self._agent3.run(state) - return { - "classification": result.classification, - "pending_log_request": result.pending_log_request, - } + try: + result = self._agent3.run(state) + return { + "classification": result.classification, + "pending_log_request": result.pending_log_request, + } + except ClassificationError as exc: + logger.error( + "agent3: ClassificationError for %s — routing to agent4 with error: %s", + state.incident_number, + exc, + ) + return { + "classification": None, + "pending_log_request": None, + "error": str(exc), + } def _agent4_node(self, state: PipelineState) -> dict: """LangGraph node wrapper for Agent 4. Returns notification_sent and any delivery error.""" diff --git a/data/cluster_hosts.json b/data/cluster_hosts.json index 70ae954..d53a69f 100644 --- a/data/cluster_hosts.json +++ b/data/cluster_hosts.json @@ -1,16 +1,8 @@ { - "_comment": "CI name → IP mapping for Agent 2 ReAct loop. Used when Agent 3 requests logs from a service different from the primary affected_ci. All test CIs resolve to localhost (127.0.0.1) since this VPS simulates the full cluster.", - "cdp-dn-01": "127.0.0.1", - "cdp-nn-01": "127.0.0.1", - "cdp-hms-01": "127.0.0.1", - "cdp-rm-01": "127.0.0.1", - "cdp-zk-01": "127.0.0.1", - "cdp-dn-02": "127.0.0.1", - "cdp-nn-02": "127.0.0.1", - "cdp-hs2-02": "127.0.0.1", - "cdp-dn-03": "127.0.0.1", - "cdp-cluster-prod": "127.0.0.1", - "cdp-rm-02": "127.0.0.1", - "cdp-hms-02": "127.0.0.1", - "cdp-hs2-03": "127.0.0.1" + "_comment": "CI name → IP mapping for Agent 2 ReAct loop cross-service resolution. Keys must match the CI names in ServiceNow CMDB. 
IPs are populated from Terraform output after UC1 cluster apply: terraform -chdir=infra/terraform/uc_testing/uc1-hadoop-onprem output -json node_internal_ips", + "cdp-master-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-data-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-data-02": "POPULATE_FROM_TF_OUTPUT", + "cdp-utility-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-bus-01": "POPULATE_FROM_TF_OUTPUT" } diff --git a/implementations/clusters/cloud/gcp/log_connector.py b/implementations/clusters/cloud/gcp/log_connector.py index 366abd0..c0836a6 100644 --- a/implementations/clusters/cloud/gcp/log_connector.py +++ b/implementations/clusters/cloud/gcp/log_connector.py @@ -54,6 +54,7 @@ def __init__( vault: VaultInterface, sa_key_secret: str = "GCP_SA_JSON", project_id: str | None = None, + resource_types: list[str] | None = None, ) -> None: """Initialise the GCP log connector. @@ -63,10 +64,17 @@ def __init__( Defaults to 'GCP_SA_JSON'. project_id: Override the GCP project ID. When None, the project_id field from the service account JSON is used instead. + resource_types: Optional list of resource.type values to scope Cloud Logging + queries (e.g. ['cloud_dataproc_cluster', 'cloud_dataproc_job'] + for UC2 Dataproc). When provided, a resource.type OR-clause is + added to the filter. Independently of this setting, the host clause + also matches resource.labels.cluster_name. S6 will replace this with a + configurable resource_type_templates dict. 
""" self._vault = vault self._sa_key_secret = sa_key_secret self._project_id = project_id + self._resource_types = resource_types # ── LogStoreInterface ───────────────────────────────────────────────────── @@ -112,7 +120,7 @@ def query_logs( logger.warning("GCPLogConnector auth failed: %s", exc) raise LogStoreUnavailableError(f"GCP Cloud Logging auth failed: {exc}") from exc - filter_str = _build_filter(host, start_time, end_time, keywords) + filter_str = _build_filter(host, start_time, end_time, keywords, self._resource_types) logger.debug("GCPLogConnector filter: %s", filter_str) try: @@ -192,11 +200,18 @@ def _escape_filter_string(value: str) -> str: def _build_filter( - host: str, start_time: datetime, end_time: datetime, keywords: list[str] | None + host: str, + start_time: datetime, + end_time: datetime, + keywords: list[str] | None, + resource_types: list[str] | None = None, ) -> str: """Build a GCP Cloud Logging filter string for the given host, time window, and keywords. Filters on timestamp range, severity >= WARNING, and resource labels for the host. + When resource_types is provided, an OR-combined resource.type clause is added and + resource.labels.cluster_name is included as an additional host label alias (required + for Dataproc, which uses cluster_name rather than instance_id). Keywords are applied as textPayload contains clauses (OR-combined, max 10, max 100 chars each). The host value is validated against a safe character regex before being embedded in the filter to prevent log filter injection. @@ -206,6 +221,8 @@ def _build_filter( start_time: Start of the query window. end_time: End of the query window. keywords: Optional list of keywords to match in the log text. + resource_types: Optional list of resource.type values to scope the query + (e.g. ['cloud_dataproc_cluster', 'cloud_dataproc_job']). Returns: A GCP log filter string ready for list_entries(filter_=...). 
@@ -218,14 +235,21 @@ def _build_filter( f'timestamp <= "{_rfc3339(end_time)}"', "severity >= WARNING", ] + if resource_types: + safe_types = [_escape_filter_string(rt) for rt in resource_types if rt] + if safe_types: + rt_parts = " OR ".join(f'resource.type="{rt}"' for rt in safe_types) + parts.append(f"({rt_parts})") if host: if not _SAFE_HOST_RE.match(host): raise ValueError(f"Invalid host value for GCP filter: {host!r}") safe_host = _escape_filter_string(host) + # cluster_name is included for Dataproc; instance_id/pod_name/job_id for VMs/GKE/CF. parts.append( f'(resource.labels.instance_id="{safe_host}" OR ' f'resource.labels.pod_name="{safe_host}" OR ' - f'resource.labels.job_id="{safe_host}")' + f'resource.labels.job_id="{safe_host}" OR ' + f'resource.labels.cluster_name="{safe_host}")' ) if keywords: safe_kws = [_escape_filter_string(k) for k in keywords[:10] if k and len(k) <= 100] diff --git a/implementations/knowledge_base/file_kb.py b/implementations/knowledge_base/file_kb.py index bd3b382..4f5cfaa 100644 --- a/implementations/knowledge_base/file_kb.py +++ b/implementations/knowledge_base/file_kb.py @@ -23,7 +23,9 @@ _LOG_PATH_RE = re.compile(r"/[\w./-]*\.log[\w/.-]*|/var/log/[\w/.-]+") _KEYWORD_RE = re.compile( r"\b(ERROR|WARN|FATAL|OOM|OutOfMemory|disk full|timeout|" - r"connection refused|HDFS|YARN|Hive|Spark|Oozie|safe mode)\b", + r"connection refused|HDFS|YARN|Hive|Spark|Oozie|safe mode|" + r"Kafka|ZooKeeper|NiFi|AuthenticationException|DiskOutOfSpaceException|" + r"GC overhead)\b", re.IGNORECASE, ) diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md b/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md new file mode 100644 index 0000000..1e2a460 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md @@ -0,0 +1,7 @@ +# Auth Failure — Example Log +label: incident | type: auth + +2024-01-17 09:45:33 ERROR KerberosAuthenticator: AuthenticationException: SASL authentication failed for user hive 
+2024-01-17 09:45:33 FATAL HiveServer2: Unable to obtain Kerberos TGT — ticket expired or missing keytab +2024-01-17 09:45:34 WARN HiveMetaStore: Connection refused — authentication rejected for principal aria-svc@REALM.COM +2024-01-17 09:45:35 ERROR ZooKeeperClient: Session expired — Kerberos credentials invalid diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md b/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md new file mode 100644 index 0000000..e2f7e64 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md @@ -0,0 +1,7 @@ +# Disk Full — Example Log +label: incident | type: disk + +2024-01-16 14:22:11 FATAL NameNode: DiskOutOfSpaceException: No space left for replica block blk_1073741825 +2024-01-16 14:22:11 ERROR NameNode: HDFS entering safe mode — not enough DataNodes reporting healthy +2024-01-16 14:22:12 WARN DataNode: Failed to append to /var/log/hadoop/hdfs — disk quota exceeded +2024-01-16 14:22:13 ERROR DataNode: IOException: No space left on device /data/hdfs/dn diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md b/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md new file mode 100644 index 0000000..b9b2d4d --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md @@ -0,0 +1,7 @@ +# OOM — Example Log +label: incident | type: oom + +2024-01-15 03:12:44 FATAL NodeManager: java.lang.OutOfMemoryError: Java heap space +2024-01-15 03:12:44 ERROR NodeManager: Container killed by YARN for exceeding memory limits — exitCode=137 +2024-01-15 03:12:45 WARN ResourceManager: Node cdp-data-01 marked unhealthy — insufficient memory +2024-01-15 03:12:46 ERROR NodeManager: GC overhead limit exceeded in yarn_nm_app_log_dir diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md b/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md new file mode 100644 index 0000000..dc01cb6 --- /dev/null +++ 
b/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md @@ -0,0 +1,7 @@ +# YARN / HDFS Safe Mode — Example Log +label: incident | type: pipeline + +2024-01-18 22:10:05 FATAL NameNode: HDFS is in safe mode — replica count below minimum threshold +2024-01-18 22:10:05 ERROR NameNode: Only 2 of 3 DataNodes reporting — waiting for data node recovery +2024-01-18 22:10:06 WARN ResourceManager: YARN cluster degraded — 1 node lost, rescheduling containers +2024-01-18 22:10:07 ERROR JobHistoryServer: Cannot write to /var/log/hadoop/yarn — HDFS safe mode active diff --git a/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md b/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md new file mode 100644 index 0000000..32ea258 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md @@ -0,0 +1,7 @@ +# Healthy Baseline — Example Log +label: ok + +2024-01-19 08:00:01 INFO NameNode: Block report from DataNode cdp-data-01 — 12500 blocks registered +2024-01-19 08:00:02 INFO ResourceManager: NodeManager cdp-data-01 heartbeat — 16GB available, 4 cores free +2024-01-19 08:00:03 INFO DataNode: Successfully replicated block blk_1073741826 to 3 nodes +2024-01-19 08:00:04 INFO HiveServer2: Query completed in 1.23s — rows returned: 4521 diff --git a/tests/fixtures/knowledge_base/hdfs_namenode.md b/tests/fixtures/knowledge_base/hdfs_namenode.md deleted file mode 100644 index 9bd0201..0000000 --- a/tests/fixtures/knowledge_base/hdfs_namenode.md +++ /dev/null @@ -1,23 +0,0 @@ -# HDFS NameNode Runbook - -service: hdfs-namenode -platform: cdp -cluster: cdp-cluster - -## Common errors - -- OutOfMemory: NameNode heap exhausted -- ERROR HDFS disk full: DataNode storage exhausted -- FATAL NameNode entering safe mode -- connection refused: NameNode RPC port 8020 unreachable -- timeout connecting to namenode - -## Log paths - -/var/log/hadoop-hdfs/hadoop-hdfs-namenode.log -/var/log/hadoop-hdfs/hadoop-hdfs-secondarynamenode.log -/var/log/hadoop/hdfs/namenode.log 
- -## Keywords - -ERROR WARN FATAL OOM OutOfMemory disk full HDFS NameNode safe mode timeout diff --git a/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md b/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md new file mode 100644 index 0000000..2080809 --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md @@ -0,0 +1,15 @@ +# aria-uc2-cluster (UC2 — GCP Dataproc) +Platform: GCP Dataproc + +## Physical Resources +None — fully managed GCP service. + +## Logical Resources + +### cloud_dataproc_cluster +Log source: Cloud Logging API +resource.type: cloud_dataproc_cluster + +### cloud_dataproc_job +Log source: Cloud Logging API +resource.type: cloud_dataproc_job diff --git a/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md b/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md new file mode 100644 index 0000000..7d6761f --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md @@ -0,0 +1,36 @@ +# cdp-cluster (UC1 — Hadoop on-prem VMs) +Platform: CDP + +## Physical Resources + +### cdp-master-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-data-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-data-02 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-utility-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hive + - /var/log/spark + - /var/log/oozie + - /var/log/nifi + +### cdp-bus-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/kafka/server.log + - /var/log/zookeeper/zookeeper.log diff --git a/tests/fixtures/knowledge_base/resource_kb/gcp_native.md b/tests/fixtures/knowledge_base/resource_kb/gcp_native.md new file mode 100644 index 0000000..62ac518 --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/gcp_native.md @@ -0,0 +1,4 @@ +# gcp-native (UC3 — GCP native services) +Platform: GCP +Services: 
BigQuery, Cloud Functions, Pub/Sub, GCS +Status: No connector implemented — graceful degradation. diff --git a/tests/fixtures/knowledge_base/yarn_resourcemanager.md b/tests/fixtures/knowledge_base/yarn_resourcemanager.md deleted file mode 100644 index aa2d02b..0000000 --- a/tests/fixtures/knowledge_base/yarn_resourcemanager.md +++ /dev/null @@ -1,23 +0,0 @@ -# YARN ResourceManager Runbook - -service: yarn-resourcemanager -platform: cdp -cluster: cdp-cluster - -## Common errors - -- OutOfMemory: ResourceManager heap too small -- ERROR YARN container allocation failure -- WARN NodeManager lost contact -- connection refused: ResourceManager port 8032 -- disk full on NodeManager local dirs - -## Log paths - -/var/log/hadoop-yarn/yarn-yarn-resourcemanager.log -/var/log/hadoop-yarn/yarn-yarn-nodemanager.log -/var/log/hadoop/yarn/resourcemanager.log - -## Keywords - -ERROR WARN FATAL OOM OutOfMemory YARN ResourceManager NodeManager container disk full timeout diff --git a/tests/unit/test_classifier.py b/tests/unit/test_classifier.py index 766ffa0..54a9e43 100644 --- a/tests/unit/test_classifier.py +++ b/tests/unit/test_classifier.py @@ -274,3 +274,78 @@ def test_log_request_field_absent_classifies_normally() -> None: assert state.classification is not None assert state.classification.error_class == "oom" assert state.pending_log_request is None + + +# ── Retry logic (#83) ───────────────────────────────────────────────────────── + + +def test_llm_retry_succeeds_on_second_attempt() -> None: + """Transient LLM failure recovers on retry — ClassificationError is not raised (#83).""" + llm = MagicMock(spec=LLMClientInterface) + llm.complete.side_effect = [ + Exception("503 Service Unavailable"), + json.dumps(_oom_response()), + ] + + with __import__("unittest.mock", fromlist=["patch"]).patch("time.sleep"): + agent = ClassifierAgent(llm_client=llm) + state = agent.run(_make_state()) + + assert state.classification is not None + assert state.classification.error_class == "oom" + 
assert llm.complete.call_count == 2 + + +def test_llm_retry_exhausted_raises_classification_error() -> None: + """Two consecutive LLM failures raise ClassificationError after retry (#83).""" + llm = MagicMock(spec=LLMClientInterface) + llm.complete.side_effect = Exception("persistent failure") + + with __import__("unittest.mock", fromlist=["patch"]).patch("time.sleep"): + agent = ClassifierAgent(llm_client=llm) + with pytest.raises(ClassificationError, match="LLM call failed"): + agent.run(_make_state()) + + assert llm.complete.call_count == 2 + + +# ── Few-shot injection (analyser_kb) ───────────────────────────────────────── + + +def test_few_shot_examples_injected_in_message() -> None: + """analyser_kb examples appear in the user message when analyser_kb_dir is configured.""" + import os + + analyser_kb_dir = os.path.join( + os.path.dirname(__file__), "../fixtures/knowledge_base/analyser_kb" + ) + agent = ClassifierAgent( + llm_client=_mock_llm(_oom_response()), + analyser_kb_dir=analyser_kb_dir, + ) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" in content + assert "label: incident" in content + + +def test_no_analyser_kb_dir_produces_clean_message() -> None: + """Without analyser_kb_dir, the user message contains no few-shot section.""" + agent = ClassifierAgent(llm_client=_mock_llm(_oom_response())) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" not in content + + +def test_missing_analyser_kb_dir_degrades_gracefully() -> None: + """A non-existent analyser_kb_dir is silently ignored — no crash, no examples.""" + agent = ClassifierAgent( + llm_client=_mock_llm(_oom_response()), + analyser_kb_dir="/nonexistent/path/analyser_kb", + ) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" not in content diff --git a/tests/unit/test_file_kb.py 
b/tests/unit/test_file_kb.py index 340a581..5d38e3f 100644 --- a/tests/unit/test_file_kb.py +++ b/tests/unit/test_file_kb.py @@ -1,4 +1,12 @@ -"""Unit tests for FileKnowledgeBase (ARI-59).""" +"""Unit tests for FileKnowledgeBase (ARI-59). + +The KB fixture files are organised in two subfolders: + resource_kb/ — lean per-cluster resource catalogs (Agent 2) + analyser_kb/ — labeled log excerpts for few-shot prompting (Agent 3) + +These tests cover resource_kb only. Agent 3 few-shot loading is tested in +test_classifier.py. +""" import os @@ -8,7 +16,7 @@ from core.models import PlatformTag from implementations.knowledge_base.file_kb import FileKnowledgeBase -FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "../fixtures/knowledge_base") +FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "../fixtures/knowledge_base/resource_kb") @pytest.fixture @@ -17,75 +25,145 @@ def kb(): class TestFileKnowledgeBaseInit: - """Tests that FileKnowledgeBase initialises correctly from a directory of runbook files.""" + """FileKnowledgeBase initialisation.""" def test_loads_fixture_files(self, kb): - """Verify that two fixture runbook files are loaded on initialisation.""" - assert len(kb._files) == 2 + """Verify all resource_kb fixture files loaded (3 cluster files).""" + assert len(kb._files) == 3 def test_raises_on_missing_directory(self): - """Verify that a non-existent directory path raises KnowledgeBaseError.""" + """Non-existent directory path raises KnowledgeBaseError.""" with pytest.raises(KnowledgeBaseError): FileKnowledgeBase("/nonexistent/path") class TestGetServiceHints: - """Tests for FileKnowledgeBase.get_service_hints — keyword matching against runbook files.""" + """FileKnowledgeBase.get_service_hints — cluster resolution from incident description.""" - def test_returns_hdfs_for_hdfs_incident(self, kb): - """Verify that an HDFS disk-full description returns hdfs-namenode as the top hint.""" + def test_returns_cdp_cluster_for_cdp_incident(self, kb): + """CDP HDFS 
incident resolves to cdp-cluster as top hint.""" hints = kb.get_service_hints( - cluster="cdp-cluster-01", - description="HDFS NameNode disk full, safe mode triggered", + cluster="cdp-cluster", + description="HDFS NameNode disk full safe mode triggered", ) assert len(hints) > 0 - assert hints[0] == "hdfs-namenode" + assert hints[0] == "cdp-cluster" - def test_returns_yarn_for_yarn_incident(self, kb): - """Verify that a YARN OOM description returns yarn-resourcemanager as the top hint.""" + def test_returns_uc2_cluster_for_dataproc_incident(self, kb): + """GCP Dataproc incident resolves to aria-uc2-cluster as top hint.""" hints = kb.get_service_hints( - cluster="cdp-cluster-01", - description="YARN ResourceManager OutOfMemory NodeManager lost", + cluster="aria-uc2-cluster", + description="Dataproc job failed OutOfMemory Spark driver crash", ) assert len(hints) > 0 - assert hints[0] == "yarn-resourcemanager" + assert hints[0] == "aria-uc2-cluster" def test_returns_empty_on_no_match(self, kb): - """Verify that an unrecognised incident description returns an empty hint list.""" + """Unrecognised cluster/description returns empty hint list.""" hints = kb.get_service_hints( cluster="oracle-rac-01", - description="ORA-12541 tnsnames listener ora-prod-01 tablespace", + description="ORA-12541 tnsnames listener tablespace ora-prod-01", ) assert hints == [] class TestGetLogHints: - """Tests for FileKnowledgeBase.get_log_hints — log path and keyword extraction.""" + """FileKnowledgeBase.get_log_hints — log path and keyword extraction.""" - def test_returns_log_paths_for_hdfs(self, kb): - """Verify that the HDFS runbook yields at least one /var/log path.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) + def test_returns_log_paths_for_cdp_cluster(self, kb): + """CDP cluster runbook yields /var/log paths for all node types.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert len(hint.log_paths) > 0 - assert all("/var/log" in p or ".log" in p for p 
in hint.log_paths) - - def test_returns_keywords_for_hdfs(self, kb): - """Verify that the HDFS runbook yields at least one keyword for log filtering.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) - assert len(hint.keywords) > 0 + assert any("/var/log" in p for p in hint.log_paths) + + def test_returns_no_failure_vocab_in_resource_kb(self, kb): + """resource_kb has no failure vocabulary — error-class patterns must not be extracted. + + Service/technology names (HDFS, YARN, Kafka) may appear because _KEYWORD_RE + matches them via log path text. That is acceptable — they are not error signals. + What must be absent is actual failure vocabulary: FATAL, OOM, OutOfMemory, etc. + """ + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + failure_vocab = { + "OutOfMemory", + "FATAL", + "OOM", + "AuthenticationException", + "DiskOutOfSpaceException", + "GC overhead", + "disk full", + "connection refused", + "safe mode", + "timeout", + } + assert not any(k in failure_vocab for k in hint.keywords) def test_high_confidence_on_strong_match(self, kb): - """Verify that a well-matched service name yields a confidence of at least 0.5.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) + """Well-matched cluster name yields confidence >= 0.5.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert hint.confidence >= 0.5 def test_returns_empty_hint_on_no_match(self, kb): - """Verify that an unrecognised service returns empty paths, keywords, and zero confidence.""" + """Unrecognised service returns empty paths, empty keywords, and zero confidence.""" hint = kb.get_log_hints("oracle-listener", PlatformTag.ORACLE) assert hint.log_paths == [] assert hint.keywords == [] assert hint.confidence == 0.0 def test_platform_tag_preserved(self, kb): - """Verify that the platform_tag supplied to get_log_hints is echoed in the returned hint.""" - hint = kb.get_log_hints("yarn-resourcemanager", PlatformTag.CDP) + """platform_tag supplied to get_log_hints 
is echoed in the returned hint.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert hint.platform_tag == PlatformTag.CDP + + +class TestUC1ResourceAcceptance: + """Acceptance tests for UC1 (CDP cluster) resource_kb — validates all node log paths present. + + The single cdp_cluster.md file must enumerate log paths for all 5 UC1 nodes + so Agent 2 knows where to grep regardless of which node type is implicated. + """ + + def test_cdp_cluster_all_node_log_paths(self, kb): + """cdp-cluster resource entry covers log paths for all 5 UC1 node types.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + # NameNode / DataNode (master + data nodes) + assert any("/var/log/hadoop/hdfs" in p for p in hint.log_paths) + assert any("/var/log/hadoop/yarn" in p for p in hint.log_paths) + # Bus node + assert any("/var/log/kafka" in p for p in hint.log_paths) + assert any("/var/log/zookeeper" in p for p in hint.log_paths) + # Utility node + assert any("/var/log/hive" in p for p in hint.log_paths) + assert any("/var/log/spark" in p for p in hint.log_paths) + + def test_cdp_cluster_no_failure_vocabulary(self, kb): + """resource_kb must not contain error-class patterns — those belong in analyser_kb.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + failure_vocab = { + "OutOfMemory", + "FATAL", + "OOM", + "AuthenticationException", + "DiskOutOfSpaceException", + "GC overhead", + } + assert not any(k in failure_vocab for k in hint.keywords) + + +class TestUC2ResourceAcceptance: + """Acceptance tests for UC2 (GCP Dataproc) resource_kb — Cloud Logging API, no local paths.""" + + def test_uc2_cluster_resolves(self, kb): + """aria-uc2-cluster resolves to a result with non-zero confidence.""" + hint = kb.get_log_hints("aria-uc2-cluster", PlatformTag.GCP) + assert hint.confidence > 0 + + def test_uc2_cluster_no_local_paths(self, kb): + """Dataproc uses Cloud Logging API — no local log paths should be returned.""" + hint = kb.get_log_hints("aria-uc2-cluster", 
PlatformTag.GCP) + assert hint.log_paths == [] + + def test_uc2_cluster_no_failure_vocabulary(self, kb): + """resource_kb UC2 entry must not contain failure keywords.""" + hint = kb.get_log_hints("aria-uc2-cluster", PlatformTag.GCP) + assert hint.keywords == [] diff --git a/tests/unit/test_gcp_log_connector.py b/tests/unit/test_gcp_log_connector.py index 4a8ef8b..5fb141d 100644 --- a/tests/unit/test_gcp_log_connector.py +++ b/tests/unit/test_gcp_log_connector.py @@ -197,3 +197,39 @@ def test_vault_key_name_used(): connector.query_logs(_HOST, PlatformTag.GCP, _START, _END) vault.get_secret.assert_called_once_with("MY_GCP_SA") + + +# ── resource_types filter (UC2 Dataproc) ──────────────────────────────────── + + +def test_filter_resource_types_clause(): + """Verify that resource_types produces an OR-combined resource.type clause in the filter.""" + f = _build_filter( + _HOST, + _START, + _END, + keywords=None, + resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"], + ) + assert 'resource.type="cloud_dataproc_cluster"' in f + assert 'resource.type="cloud_dataproc_job"' in f + + +def test_filter_resource_types_includes_cluster_name_label(): + """Verify that cluster_name is included as a host label alias when resource_types is set.""" + f = _build_filter( + _HOST, + _START, + _END, + keywords=None, + resource_types=["cloud_dataproc_cluster"], + ) + assert f'resource.labels.cluster_name="{_HOST}"' in f + + +def test_filter_no_resource_types_preserves_existing_labels(): + """Verify that omitting resource_types leaves the filter unchanged (backward compat).""" + f_with = _build_filter(_HOST, _START, _END, keywords=None, resource_types=None) + f_without = _build_filter(_HOST, _START, _END, keywords=None) + assert f_with == f_without + assert "resource.type" not in f_with diff --git a/tests/unit/test_log_extractor.py b/tests/unit/test_log_extractor.py index 1b1902e..dc49dcc 100644 --- a/tests/unit/test_log_extractor.py +++ b/tests/unit/test_log_extractor.py @@ 
-4,7 +4,7 @@ from datetime import datetime from unittest.mock import MagicMock -from core.agents.log_extractor import LogExtractorAgent +from core.agents.log_extractor import LogExtractorAgent, _validate_log_paths from core.interfaces.knowledge_base import KnowledgeBaseInterface from core.interfaces.llm_client import LLMClientInterface from core.interfaces.log_store import LogStoreInterface @@ -418,3 +418,30 @@ def test_pending_log_request_unknown_ci_returns_state_unchanged(): connector.query_logs.assert_not_called() assert result.log_result is original_result # unchanged assert result.error is None + + +# ── _validate_log_paths (#85) ───────────────────────────────────────────────── + + +def test_validate_log_paths_keeps_safe_paths(): + """Paths under /var/log/ are kept unchanged.""" + paths = ["/var/log/hadoop/hdfs/hadoop.log", "/var/log/kafka/server.log"] + assert _validate_log_paths(paths) == paths + + +def test_validate_log_paths_drops_unsafe_paths(): + """Paths outside /var/log/ are dropped to prevent path-injection (#85).""" + paths = ["/etc/ssh/authorized_keys", "/home/aria/.ssh/id_rsa", "/var/log/hadoop/hdfs.log"] + result = _validate_log_paths(paths) + assert result == ["/var/log/hadoop/hdfs.log"] + + +def test_validate_log_paths_empty_list(): + """Empty input produces empty output.""" + assert _validate_log_paths([]) == [] + + +def test_validate_log_paths_all_unsafe_returns_empty(): + """All-unsafe input produces an empty list — connector receives no paths.""" + paths = ["/etc/passwd", "~/.bashrc", "../../../etc/shadow"] + assert _validate_log_paths(paths) == [] diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index efa99f2..bc120af 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock from core.agents.classifier import ClassifierAgent +from core.exceptions import ClassificationError from core.models import ( ClassificationResult, ConfidenceBand, @@ -259,3 
+260,22 @@ def crashing_run(state: PipelineState) -> PipelineState: assert result.error is not None assert "unexpected crash" in result.error assert result.notification_sent is False + + +def test_agent3_classification_error_still_routes_to_agent4(): + """Verify Agent 4 runs even when Agent 3 raises ClassificationError (#83). + + A ClassificationError must not abort the pipeline — the notify-only guarantee + requires Agent 4 to always produce a notification (possibly a partial one with + the error surfaced). If Agent 4 is skipped, AC-06 cannot be met for any run + where Agent 3 fails due to a transient LLM error. + """ + a3 = MagicMock() + a3.run.side_effect = ClassificationError("LLM call failed: 503 Service Unavailable") + + pipeline = _make_pipeline(agent3=a3) + result = pipeline.run("INC0000001") + + assert result.notification_sent is True + assert result.error is not None + assert "LLM call failed" in result.error