From d1dfa41264b4a79e4d7c35cbc581ed4145b6cbaf Mon Sep 17 00:00:00 2001 From: Brm Date: Tue, 16 Jun 2026 13:48:39 +0000 Subject: [PATCH 1/6] feat(kb): add UC1 fixture runbooks for cdp-master, cdp-bus, cdp-utility nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-picked content from PR #82 (Tobi-Adesoye, commit 85b658e). Only the three runbook files are brought in — the test regression from that commit (7 deleted KB unit tests) and the orphaned scripts/uc1_parser.py are intentionally excluded. These runbooks will be validated against actual TF log paths as part of #60. Co-Authored-By: Claude Sonnet 4.6 --- tests/fixtures/knowledge_base/cdp_bus.md | 11 +++++++++++ tests/fixtures/knowledge_base/cdp_master.md | 13 +++++++++++++ tests/fixtures/knowledge_base/cdp_utility.md | 15 +++++++++++++++ 3 files changed, 39 insertions(+) create mode 100644 tests/fixtures/knowledge_base/cdp_bus.md create mode 100644 tests/fixtures/knowledge_base/cdp_master.md create mode 100644 tests/fixtures/knowledge_base/cdp_utility.md diff --git a/tests/fixtures/knowledge_base/cdp_bus.md b/tests/fixtures/knowledge_base/cdp_bus.md new file mode 100644 index 0000000..8b322e4 --- /dev/null +++ b/tests/fixtures/knowledge_base/cdp_bus.md @@ -0,0 +1,11 @@ +# CDP Bus Data Pipeline Runbook +This runbook configures messaging brokers and event buses running on the cdp bus tier. + +## Log Paths +Message streaming components write operational logs here: +* Kafka Engine Event Stream: /var/log/kafka/server.log +* ZooKeeper Coordination Cluster Log: /var/log/zookeeper/zookeeper.log + +## Target Error Keywords +Monitor trace streams for active ingestion or validation failures: +* timeout \ No newline at end of file diff --git a/tests/fixtures/knowledge_base/cdp_master.md b/tests/fixtures/knowledge_base/cdp_master.md new file mode 100644 index 0000000..09b9a95 --- /dev/null +++ b/tests/fixtures/knowledge_base/cdp_master.md @@ -0,0 +1,13 @@ +# CDP Master Node Runbook +This runbook configures metadata and monitoring layouts for the cdp master server layer. + +## Log Paths +The automated cluster runtime maps its principal processing logs to these locations: +* HDFS Storage Core Log: /var/log/hadoop/hdfs/hdfs-daemon.log +* YARN Resource Manager Log: /var/log/hadoop/yarn/yarn-daemon.log + +## Target Error Keywords +During node metric anomalies, prioritize matching these infrastructure fault patterns: +* OutOfMemory +* FATAL +* Connection refused \ No newline at end of file diff --git a/tests/fixtures/knowledge_base/cdp_utility.md b/tests/fixtures/knowledge_base/cdp_utility.md new file mode 100644 index 0000000..41f0620 --- /dev/null +++ b/tests/fixtures/knowledge_base/cdp_utility.md @@ -0,0 +1,15 @@ +# CDP Operational Utility Runbook +This runbook maps operational metadata and runtime environments for the cdp utility worker node. + +## Log Paths +Analytics and workflow orchestration execution engines map log targets here: +* Apache Hive Metastore Instance: /var/log/hive/hive.log +* Apache Spark Cluster Engine: /var/log/spark/spark.log +* Oozie Coordinator Engine: /var/log/oozie/oozie.log +* Apache NiFi Flow Manager: /var/log/nifi/nifi.log + +## Target Error Keywords +Isolate runtime engine query and execution exceptions using these tracking strings: +* Hive +* Spark +* Oozie \ No newline at end of file From dae9f8475bf5addcb2f9b85df3e6a2eb62a1d2b5 Mon Sep 17 00:00:00 2001 From: Brm Date: Tue, 16 Jun 2026 20:06:05 +0000 Subject: [PATCH 2/6] =?UTF-8?q?feat(s4):=20testing=20infrastructure=20wiri?= =?UTF-8?q?ng=20=E2=80=94=20KB,=20connectors,=20config,=20correctness?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #59 — cluster_hosts.json restructured for UC1 TF node names (cdp-master-01, cdp-data-01/02, cdp-utility-01, cdp-bus-01). IPs placeholder until TF apply. Closes #60 — UC1 KB runbooks validated and enriched against TF log paths. _KEYWORD_RE extended with Kafka, ZooKeeper, NiFi, AuthenticationException, DiskOutOfSpaceException, GC overhead. test_file_kb.py: restored original 8 tests, updated fixture count (2→8), added TestUC1RunbookAcceptance (3 tests). Closes #61 — cdp_ssh_key_secret() config option added (core/config.py); SSH key vault key now configurable via conf.yaml cdp.ssh_key_secret, default CDP_SSH_KEY unchanged. api/dependencies.py wired to cfg.cdp_ssh_key_secret(). conf_template.yaml annotated with UC1 TF secret alignment guidance and full TF log dir paths. Closes #62 — GCPLogConnector: resource_types param adds resource.type OR-clause and cluster_name host label alias for Dataproc. api/dependencies.py sets ['cloud_dataproc_cluster', 'cloud_dataproc_job'] for UC2. 3 new filter tests. Closes #63 — UC2 Dataproc KB runbooks (dataproc_cluster.md, dataproc_job.md) added. TestUC2RunbookAcceptance (2 tests). Closes #64 — gcp_native.md added as UC3 graceful degradation marker. Agent 2 returns LOW confidence / empty logs for native GCP services; Agent 4 notifies with gap message. Closes #85 — _validate_log_paths() in log_extractor.py: drops LLM-planned paths outside /var/log/ before passing to connectors. 4 new unit tests. Closes #83 — ClassificationError caught in _agent3_node (pipeline.py): Agent 4 now always runs, notify-only guarantee preserved. ClassifierAgent adds 1 retry + 1s sleep before raising. 1 retry test + 1 pipeline resilience test added. 309 unit tests green (was 294, +15 new). Co-Authored-By: Claude Sonnet 4.6 --- api/dependencies.py | 9 ++- conf_template.yaml | 20 ++++-- core/agents/classifier.py | 39 +++++++++--- core/agents/log_extractor.py | 29 ++++++++- core/config.py | 14 +++++ core/orchestrator/pipeline.py | 30 +++++++-- data/cluster_hosts.json | 20 ++---- .../clusters/cloud/gcp/log_connector.py | 30 ++++++++- implementations/knowledge_base/file_kb.py | 4 +- tests/fixtures/knowledge_base/cdp_bus.md | 17 +++-- tests/fixtures/knowledge_base/cdp_master.md | 17 +++-- tests/fixtures/knowledge_base/cdp_utility.md | 23 ++++--- .../knowledge_base/dataproc_cluster.md | 20 ++++++ tests/fixtures/knowledge_base/dataproc_job.md | 20 ++++++ tests/fixtures/knowledge_base/gcp_native.md | 21 +++++++ tests/unit/test_classifier.py | 33 ++++++++++ tests/unit/test_file_kb.py | 62 ++++++++++++++++++- tests/unit/test_gcp_log_connector.py | 36 +++++++++++ tests/unit/test_log_extractor.py | 29 ++++++++- tests/unit/test_pipeline.py | 20 ++++++ 20 files changed, 429 insertions(+), 64 deletions(-) create mode 100644 tests/fixtures/knowledge_base/dataproc_cluster.md create mode 100644 tests/fixtures/knowledge_base/dataproc_job.md create mode 100644 tests/fixtures/knowledge_base/gcp_native.md diff --git a/api/dependencies.py b/api/dependencies.py index 12ce1fd..3d07601 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -240,12 +240,17 @@ def get_agent2() -> LogExtractorAgent: registry = { PlatformTag.CDP: SSHLogConnector( vault, - ssh_key_secret="CDP_SSH_KEY", + ssh_key_secret=cfg.cdp_ssh_key_secret(), ssh_user=cfg.cdp_ssh_user(), host_key_secret="CDP_HOST_KEY" if os.environ.get("CDP_HOST_KEY") else None, log_dirs=cfg.cdp_log_dirs(), ), - PlatformTag.GCP: GCPLogConnector(vault), + # resource_types scopes Cloud Logging queries to Dataproc cluster and job logs (UC2). + # S6 will generalise this into a configurable resource_type_templates dict. + PlatformTag.GCP: GCPLogConnector( + vault, + resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"], + ), } llm = None model = _resolve_model("2") diff --git a/conf_template.yaml b/conf_template.yaml index fc8b2f3..42259a6 100644 --- a/conf_template.yaml +++ b/conf_template.yaml @@ -56,12 +56,24 @@ gcp: # only required when connectors.log = gcp cdp: ssh_user: hadoop # OS user with read access to log directories below - log_dirs: # directories searched for logs on CDP cluster nodes - - /var/log/hadoop-hdfs # leave unset to use these defaults - - /var/log/hadoop-yarn + ssh_key_secret: CDP_SSH_KEY # vault key name for the SSH private key PEM. + # EnvVarVault: reads os.environ[ssh_key_secret] directly. + # GCPSecretManagerVault: normalises underscores→hyphens and + # prepends 'aria-', so 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'. + # UC1 (TF): TF provisions secret 'aria-uc1-ssh-private-key'. + # Set ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY to match, or + # rename the TF secret to 'aria-cdp-ssh-key'. + log_dirs: # directories searched for logs on CDP cluster nodes. + # UC1 (TF Hadoop VMs) use subdirectory paths — set these + # at deployment time from TF startup script: + - /var/log/hadoop/hdfs # TF UC1 NameNode / DataNode logs + - /var/log/hadoop/yarn # TF UC1 ResourceManager / NodeManager logs - /var/log/hive - - /var/log/oozie - /var/log/spark + - /var/log/kafka + - /var/log/zookeeper + - /var/log/oozie + - /var/log/nifi slack: channel_id: # channel where Agent 4 posts notifications diff --git a/core/agents/classifier.py b/core/agents/classifier.py index 41e1495..55386c3 100644 --- a/core/agents/classifier.py +++ b/core/agents/classifier.py @@ -10,6 +10,7 @@ """ import json +import time from typing import Any from core.exceptions import ClassificationError @@ -109,16 +110,36 @@ def run(self, state: PipelineState) -> PipelineState: logger.info("classifier: running for %s", state.incident_number) messages = self._build_messages(state) - try: - raw = self._llm.complete( - messages, - system=_SYSTEM_PROMPT, - temperature=0.0, - max_tokens=1024, + # Retry once with a 1-second backoff before surfacing as ClassificationError (#83). + # A transient LLM failure must not kill Agent 4 — the notify-only guarantee requires + # Agent 4 to always run, even when Agent 3 cannot classify. + _llm_exc: Exception | None = None + for attempt in range(2): + try: + raw = self._llm.complete( + messages, + system=_SYSTEM_PROMPT, + temperature=0.0, + max_tokens=1024, + ) + _llm_exc = None + break + except Exception as exc: + _llm_exc = exc + if attempt == 0: + logger.warning( + "classifier: LLM call failed for %s (attempt 1/2), retrying: %s", + state.incident_number, + exc, + ) + time.sleep(1) + if _llm_exc is not None: + logger.error( + "classifier: LLM call failed for %s after 2 attempts: %s", + state.incident_number, + _llm_exc, ) - except Exception as exc: - logger.error("classifier: LLM call failed for %s: %s", state.incident_number, exc) - raise ClassificationError(f"LLM call failed: {exc}") from exc + raise ClassificationError(f"LLM call failed: {_llm_exc}") from _llm_exc try: classification, log_request = self._parse_response(raw) diff --git a/core/agents/log_extractor.py b/core/agents/log_extractor.py index f901cac..a08400c 100644 --- a/core/agents/log_extractor.py +++ b/core/agents/log_extractor.py @@ -242,9 +242,12 @@ def _plan_with_llm(self, state: PipelineState) -> LogQueryPlan: ) data = json.loads(response) + raw_paths = list(data.get("log_paths", [])) return LogQueryPlan( connector_name=str(data["connector_name"]), - log_paths=list(data.get("log_paths", [])), + # Validate LLM-planned paths against safe prefixes to prevent path-injection + # from adversarial log content coercing the LLM toward /etc/ or ~/ targets (#85). + log_paths=_validate_log_paths(raw_paths), keywords=list(data.get("keywords", [])), time_window_minutes=int(data.get("time_window_minutes", _DEFAULT_WINDOW)), reasoning=str(data.get("reasoning", "")), @@ -496,3 +499,27 @@ def _empty(host: str, platform_tag: PlatformTag) -> LogQueryResult: total_scanned=0, confidence=ConfidenceBand.LOW, ) + + +# Allowed log path prefixes for LLM-planned paths (#85). +# Paths outside these prefixes are dropped before being passed to connectors. +# Mirrors the CI-name allowlist used for host resolution in _resolve_ci_from_request(). +_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log/",) + + +def _validate_log_paths(paths: list[str]) -> list[str]: + """Filter LLM-planned log paths to known-safe directory prefixes. + + Prevents adversarial log content from coercing the LLM into planning paths + that point outside log directories (e.g. /etc/ssh/, ~/.ssh/). Any path that + does not start with an allowed prefix is dropped with a warning. + """ + safe = [p for p in paths if any(p.startswith(prefix) for prefix in _SAFE_LOG_PREFIXES)] + dropped = len(paths) - len(safe) + if dropped: + logger.warning( + "Agent 2: dropped %d LLM-planned log path(s) outside allowed prefixes %s", + dropped, + _SAFE_LOG_PREFIXES, + ) + return safe diff --git a/core/config.py b/core/config.py index bb70801..f6572d0 100644 --- a/core/config.py +++ b/core/config.py @@ -141,6 +141,20 @@ def cdp_ssh_user() -> str: return _get(["cdp", "ssh_user"], "CDP_SSH_USER", "hadoop") +def cdp_ssh_key_secret() -> str: + """Return the vault key name used to retrieve the CDP SSH private key. + + For EnvVarVault: the key name is read directly from the environment. + For GCPSecretManagerVault: underscores are normalised to hyphens and an + 'aria-' prefix is added — e.g. 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'. + The TF-provisioned secret name for UC1 is 'aria-uc1-ssh-private-key', which + requires setting cdp.ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY in conf.yaml + (resolves to 'aria-cdp-uc1-ssh-private-key') or renaming the TF secret. + Defaults to 'CDP_SSH_KEY' (backward-compatible with pre-S4 deployments). + """ + return _get(["cdp", "ssh_key_secret"], "CDP_SSH_KEY_SECRET", "CDP_SSH_KEY") + + def cdp_log_dirs() -> list[str]: """Return directories to search for logs on CDP cluster nodes. diff --git a/core/orchestrator/pipeline.py b/core/orchestrator/pipeline.py index b8fe49d..d8b1eaf 100644 --- a/core/orchestrator/pipeline.py +++ b/core/orchestrator/pipeline.py @@ -21,6 +21,7 @@ from langgraph.graph.state import CompiledStateGraph import core.config as cfg +from core.exceptions import ClassificationError from core.interfaces.run_state_store import RunStateStoreInterface from core.interfaces.run_store import RunStoreInterface from core.logging_config import configure_logging @@ -129,13 +130,30 @@ def _agent2_node(self, state: PipelineState) -> dict: } def _agent3_node(self, state: PipelineState) -> dict: - """LangGraph node wrapper for Agent 3. Returns classification and any pending log request.""" + """LangGraph node wrapper for Agent 3. Returns classification and any pending log request. + + ClassificationError is caught here rather than propagated — if Agent 3 cannot + classify, the error is recorded in state and the pipeline routes to Agent 4 so + the notify-only guarantee is never violated (#83). + """ self._track_agent(state, "agent3") - result = self._agent3.run(state) - return { - "classification": result.classification, - "pending_log_request": result.pending_log_request, - } + try: + result = self._agent3.run(state) + return { + "classification": result.classification, + "pending_log_request": result.pending_log_request, + } + except ClassificationError as exc: + logger.error( + "agent3: ClassificationError for %s — routing to agent4 with error: %s", + state.incident_number, + exc, + ) + return { + "classification": None, + "pending_log_request": None, + "error": str(exc), + } def _agent4_node(self, state: PipelineState) -> dict: """LangGraph node wrapper for Agent 4. Returns notification_sent and any delivery error.""" diff --git a/data/cluster_hosts.json b/data/cluster_hosts.json index 70ae954..d53a69f 100644 --- a/data/cluster_hosts.json +++ b/data/cluster_hosts.json @@ -1,16 +1,8 @@ { - "_comment": "CI name → IP mapping for Agent 2 ReAct loop. Used when Agent 3 requests logs from a service different from the primary affected_ci. All test CIs resolve to localhost (127.0.0.1) since this VPS simulates the full cluster.", - "cdp-dn-01": "127.0.0.1", - "cdp-nn-01": "127.0.0.1", - "cdp-hms-01": "127.0.0.1", - "cdp-rm-01": "127.0.0.1", - "cdp-zk-01": "127.0.0.1", - "cdp-dn-02": "127.0.0.1", - "cdp-nn-02": "127.0.0.1", - "cdp-hs2-02": "127.0.0.1", - "cdp-dn-03": "127.0.0.1", - "cdp-cluster-prod": "127.0.0.1", - "cdp-rm-02": "127.0.0.1", - "cdp-hms-02": "127.0.0.1", - "cdp-hs2-03": "127.0.0.1" + "_comment": "CI name → IP mapping for Agent 2 ReAct loop cross-service resolution. Keys must match the CI names in ServiceNow CMDB. IPs are populated from Terraform output after UC1 cluster apply: terraform -chdir=infra/terraform/uc_testing/uc1-hadoop-onprem output -json node_internal_ips", + "cdp-master-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-data-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-data-02": "POPULATE_FROM_TF_OUTPUT", + "cdp-utility-01": "POPULATE_FROM_TF_OUTPUT", + "cdp-bus-01": "POPULATE_FROM_TF_OUTPUT" } diff --git a/implementations/clusters/cloud/gcp/log_connector.py b/implementations/clusters/cloud/gcp/log_connector.py index 366abd0..c0836a6 100644 --- a/implementations/clusters/cloud/gcp/log_connector.py +++ b/implementations/clusters/cloud/gcp/log_connector.py @@ -54,6 +54,7 @@ def __init__( vault: VaultInterface, sa_key_secret: str = "GCP_SA_JSON", project_id: str | None = None, + resource_types: list[str] | None = None, ) -> None: """Initialise the GCP log connector. @@ -63,10 +64,17 @@ def __init__( Defaults to 'GCP_SA_JSON'. project_id: Override the GCP project ID. When None, the project_id field from the service account JSON is used instead. + resource_types: Optional list of resource.type values to scope Cloud Logging + queries (e.g. ['cloud_dataproc_cluster', 'cloud_dataproc_job'] + for UC2 Dataproc). When provided, a resource.type OR-clause is + added to the filter and resource.labels.cluster_name is included + as an additional host label alias. S6 will replace this with a + configurable resource_type_templates dict. """ self._vault = vault self._sa_key_secret = sa_key_secret self._project_id = project_id + self._resource_types = resource_types # ── LogStoreInterface ───────────────────────────────────────────────────── @@ -112,7 +120,7 @@ def query_logs( logger.warning("GCPLogConnector auth failed: %s", exc) raise LogStoreUnavailableError(f"GCP Cloud Logging auth failed: {exc}") from exc - filter_str = _build_filter(host, start_time, end_time, keywords) + filter_str = _build_filter(host, start_time, end_time, keywords, self._resource_types) logger.debug("GCPLogConnector filter: %s", filter_str) try: @@ -192,11 +200,18 @@ def _escape_filter_string(value: str) -> str: def _build_filter( - host: str, start_time: datetime, end_time: datetime, keywords: list[str] | None + host: str, + start_time: datetime, + end_time: datetime, + keywords: list[str] | None, + resource_types: list[str] | None = None, ) -> str: """Build a GCP Cloud Logging filter string for the given host, time window, and keywords. Filters on timestamp range, severity >= WARNING, and resource labels for the host. + When resource_types is provided, an OR-combined resource.type clause is added and + resource.labels.cluster_name is included as an additional host label alias (required + for Dataproc, which uses cluster_name rather than instance_id). Keywords are applied as textPayload contains clauses (OR-combined, max 10, max 100 chars each). The host value is validated against a safe character regex before being embedded in the filter to prevent log filter injection. @@ -206,6 +221,8 @@ def _build_filter( start_time: Start of the query window. end_time: End of the query window. keywords: Optional list of keywords to match in the log text. + resource_types: Optional list of resource.type values to scope the query + (e.g. ['cloud_dataproc_cluster', 'cloud_dataproc_job']). Returns: A GCP log filter string ready for list_entries(filter_=...). @@ -218,14 +235,21 @@ def _build_filter( f'timestamp <= "{_rfc3339(end_time)}"', "severity >= WARNING", ] + if resource_types: + safe_types = [_escape_filter_string(rt) for rt in resource_types if rt] + if safe_types: + rt_parts = " OR ".join(f'resource.type="{rt}"' for rt in safe_types) + parts.append(f"({rt_parts})") if host: if not _SAFE_HOST_RE.match(host): raise ValueError(f"Invalid host value for GCP filter: {host!r}") safe_host = _escape_filter_string(host) + # cluster_name is included for Dataproc; instance_id/pod_name/job_id for VMs/GKE/CF. parts.append( f'(resource.labels.instance_id="{safe_host}" OR ' f'resource.labels.pod_name="{safe_host}" OR ' - f'resource.labels.job_id="{safe_host}")' + f'resource.labels.job_id="{safe_host}" OR ' + f'resource.labels.cluster_name="{safe_host}")' ) if keywords: safe_kws = [_escape_filter_string(k) for k in keywords[:10] if k and len(k) <= 100] diff --git a/implementations/knowledge_base/file_kb.py b/implementations/knowledge_base/file_kb.py index bd3b382..4f5cfaa 100644 --- a/implementations/knowledge_base/file_kb.py +++ b/implementations/knowledge_base/file_kb.py @@ -23,7 +23,9 @@ _LOG_PATH_RE = re.compile(r"/[\w./-]*\.log[\w/.-]*|/var/log/[\w/.-]+") _KEYWORD_RE = re.compile( r"\b(ERROR|WARN|FATAL|OOM|OutOfMemory|disk full|timeout|" - r"connection refused|HDFS|YARN|Hive|Spark|Oozie|safe mode)\b", + r"connection refused|HDFS|YARN|Hive|Spark|Oozie|safe mode|" + r"Kafka|ZooKeeper|NiFi|AuthenticationException|DiskOutOfSpaceException|" + r"GC overhead)\b", re.IGNORECASE, ) diff --git a/tests/fixtures/knowledge_base/cdp_bus.md b/tests/fixtures/knowledge_base/cdp_bus.md index 8b322e4..3dd1aa2 100644 --- a/tests/fixtures/knowledge_base/cdp_bus.md +++ b/tests/fixtures/knowledge_base/cdp_bus.md @@ -1,11 +1,16 @@ # CDP Bus Data Pipeline Runbook -This runbook configures messaging brokers and event buses running on the cdp bus tier. +This runbook covers the cdp bus node layer (Kafka broker, ZooKeeper ensemble). +TF node name: cdp-bus-01. NiFi is co-located on the bus node in some deployments. ## Log Paths -Message streaming components write operational logs here: -* Kafka Engine Event Stream: /var/log/kafka/server.log -* ZooKeeper Coordination Cluster Log: /var/log/zookeeper/zookeeper.log +* Kafka broker: /var/log/kafka/server.log +* ZooKeeper: /var/log/zookeeper/zookeeper.log ## Target Error Keywords -Monitor trace streams for active ingestion or validation failures: -* timeout \ No newline at end of file +Monitor for messaging and coordination layer failures: +* Kafka +* ZooKeeper +* timeout +* FATAL +* Connection refused +* AuthenticationException diff --git a/tests/fixtures/knowledge_base/cdp_master.md b/tests/fixtures/knowledge_base/cdp_master.md index 09b9a95..0f6093a 100644 --- a/tests/fixtures/knowledge_base/cdp_master.md +++ b/tests/fixtures/knowledge_base/cdp_master.md @@ -1,13 +1,18 @@ # CDP Master Node Runbook -This runbook configures metadata and monitoring layouts for the cdp master server layer. +This runbook covers the cdp master node layer (NameNode, ResourceManager, HiveServer2). +TF node name: cdp-master-01. Log paths use the TF-provisioned subdirectory layout. ## Log Paths -The automated cluster runtime maps its principal processing logs to these locations: -* HDFS Storage Core Log: /var/log/hadoop/hdfs/hdfs-daemon.log -* YARN Resource Manager Log: /var/log/hadoop/yarn/yarn-daemon.log +* HDFS NameNode: /var/log/hadoop/hdfs/hdfs-daemon.log +* YARN ResourceManager: /var/log/hadoop/yarn/yarn-daemon.log ## Target Error Keywords -During node metric anomalies, prioritize matching these infrastructure fault patterns: +Match these fault patterns during HDFS and YARN incident triage: * OutOfMemory +* DiskOutOfSpaceException * FATAL -* Connection refused \ No newline at end of file +* WARN +* Connection refused +* AuthenticationException +* GC overhead +* safe mode diff --git a/tests/fixtures/knowledge_base/cdp_utility.md b/tests/fixtures/knowledge_base/cdp_utility.md index 41f0620..1b01e51 100644 --- a/tests/fixtures/knowledge_base/cdp_utility.md +++ b/tests/fixtures/knowledge_base/cdp_utility.md @@ -1,15 +1,20 @@ -# CDP Operational Utility Runbook -This runbook maps operational metadata and runtime environments for the cdp utility worker node. +# CDP Utility Node Runbook +This runbook covers the cdp utility node layer (Hive Metastore, Spark History, Oozie, NiFi). +TF node name: cdp-utility-01. ## Log Paths -Analytics and workflow orchestration execution engines map log targets here: -* Apache Hive Metastore Instance: /var/log/hive/hive.log -* Apache Spark Cluster Engine: /var/log/spark/spark.log -* Oozie Coordinator Engine: /var/log/oozie/oozie.log -* Apache NiFi Flow Manager: /var/log/nifi/nifi.log +* Hive Metastore: /var/log/hive/hive.log +* Spark History Server: /var/log/spark/spark.log +* Oozie Coordinator: /var/log/oozie/oozie.log +* NiFi Flow Manager: /var/log/nifi/nifi.log ## Target Error Keywords -Isolate runtime engine query and execution exceptions using these tracking strings: +Isolate runtime engine query and execution failures using these patterns: * Hive * Spark -* Oozie \ No newline at end of file +* Oozie +* NiFi +* FATAL +* OOM +* OutOfMemory +* timeout diff --git a/tests/fixtures/knowledge_base/dataproc_cluster.md b/tests/fixtures/knowledge_base/dataproc_cluster.md new file mode 100644 index 0000000..f96cf38 --- /dev/null +++ b/tests/fixtures/knowledge_base/dataproc_cluster.md @@ -0,0 +1,20 @@ +# GCP Dataproc Cluster Runbook +This runbook covers Dataproc cluster-level failures: master/worker node issues, init action +failures, cluster scaling errors, and YARN/HDFS problems on the managed cluster. +GCP resource type: cloud_dataproc_cluster. UC2 cluster name: aria-uc2-cluster. + +## Log Paths +Cloud Logging API — no local SSH paths. Filter by: +* resource.type = "cloud_dataproc_cluster" +* resource.labels.cluster_name = + +## Target Error Keywords +Match these patterns for cluster-level triage on Dataproc: +* FATAL +* OutOfMemory +* YARN +* HDFS +* timeout +* AuthenticationException +* DiskOutOfSpaceException +* safe mode diff --git a/tests/fixtures/knowledge_base/dataproc_job.md b/tests/fixtures/knowledge_base/dataproc_job.md new file mode 100644 index 0000000..672fe8d --- /dev/null +++ b/tests/fixtures/knowledge_base/dataproc_job.md @@ -0,0 +1,20 @@ +# GCP Dataproc Job Runbook +This runbook covers Dataproc job execution failures: Spark driver crashes, executor OOM, +quota exceeded, SA key expiry, driver node timeouts, and init action failures. +GCP resource type: cloud_dataproc_job. UC2 cluster name: aria-uc2-cluster. + +## Log Paths +Cloud Logging API — no local SSH paths. Filter by: +* resource.type = "cloud_dataproc_job" +* resource.labels.cluster_name = + +## Target Error Keywords +Match these patterns for job-level triage on Dataproc: +* FATAL +* OutOfMemory +* Spark +* timeout +* DiskOutOfSpaceException +* AuthenticationException +* WARN +* ERROR diff --git a/tests/fixtures/knowledge_base/gcp_native.md b/tests/fixtures/knowledge_base/gcp_native.md new file mode 100644 index 0000000..1d43888 --- /dev/null +++ b/tests/fixtures/knowledge_base/gcp_native.md @@ -0,0 +1,21 @@ +# GCP Native Services Runbook +This runbook covers native GCP service incidents: BigQuery, Cloud Functions, Pub/Sub, GCS. +Full connector support is Phase 1.5 S6 scope. In round 2 testing (S5), UC3 validates the +graceful degradation path only — Agent 2 returns an empty LogQueryResult with LOW confidence, +and Agent 4 notifies with "no logs available — connector not yet implemented for this platform". + +Platforms: BigQuery, Cloud Functions, Pub/Sub, GCS (GCP native services). + +## Log Paths +No connector implemented in S4. GCPLogConnector returns empty result for these resource types: +* bigquery_dataset +* cloud_function +* pubsub_subscription +* gcs_bucket + +## Target Error Keywords +These keywords are used for future S6 connector implementation and round 3 testing: +* FATAL +* timeout +* ERROR +* WARN diff --git a/tests/unit/test_classifier.py b/tests/unit/test_classifier.py index 766ffa0..3bc7a98 100644 --- a/tests/unit/test_classifier.py +++ b/tests/unit/test_classifier.py @@ -274,3 +274,36 @@ def test_log_request_field_absent_classifies_normally() -> None: assert state.classification is not None assert state.classification.error_class == "oom" assert state.pending_log_request is None + + +# ── Retry logic (#83) ───────────────────────────────────────────────────────── + + +def test_llm_retry_succeeds_on_second_attempt() -> None: + """Transient LLM failure recovers on retry — ClassificationError is not raised (#83).""" + llm = MagicMock(spec=LLMClientInterface) + llm.complete.side_effect = [ + Exception("503 Service Unavailable"), + json.dumps(_oom_response()), + ] + + with __import__("unittest.mock", fromlist=["patch"]).patch("time.sleep"): + agent = ClassifierAgent(llm_client=llm) + state = agent.run(_make_state()) + + assert state.classification is not None + assert state.classification.error_class == "oom" + assert llm.complete.call_count == 2 + + +def test_llm_retry_exhausted_raises_classification_error() -> None: + """Two consecutive LLM failures raise ClassificationError after retry (#83).""" + llm = MagicMock(spec=LLMClientInterface) + llm.complete.side_effect = Exception("persistent failure") + + with __import__("unittest.mock", fromlist=["patch"]).patch("time.sleep"): + agent = ClassifierAgent(llm_client=llm) + with pytest.raises(ClassificationError, match="LLM call failed"): + agent.run(_make_state()) + + assert llm.complete.call_count == 2 diff --git a/tests/unit/test_file_kb.py b/tests/unit/test_file_kb.py index 340a581..8c50acc 100644 --- a/tests/unit/test_file_kb.py +++ b/tests/unit/test_file_kb.py @@ -20,8 +20,8 @@ class TestFileKnowledgeBaseInit: """Tests that FileKnowledgeBase initialises correctly from a directory of runbook files.""" def test_loads_fixture_files(self, kb): - """Verify that two fixture runbook files are loaded on initialisation.""" - assert len(kb._files) == 2 + """Verify all fixture runbook files loaded (2 legacy + 3 UC1 + 2 UC2 + 1 UC3).""" + assert len(kb._files) == 8 def test_raises_on_missing_directory(self): """Verify that a non-existent directory path raises KnowledgeBaseError.""" @@ -89,3 +89,61 @@ def test_platform_tag_preserved(self, kb): """Verify that the platform_tag supplied to get_log_hints is echoed in the returned hint.""" hint = kb.get_log_hints("yarn-resourcemanager", PlatformTag.CDP) assert hint.platform_tag == PlatformTag.CDP + + +class TestUC1RunbookAcceptance: + """Acceptance tests for UC1 (Hadoop VMs) runbooks — validates TF log path alignment. + + These tests assert the exact log paths and keywords that SSHLogConnector will use + when querying UC1 nodes. Paths must match the TF-provisioned subdirectory layout + (e.g. /var/log/hadoop/hdfs, not /var/log/hadoop-hdfs). See issue #60. + """ + + def test_cdp_master_log_paths_and_keywords(self, kb): + """cdp-master-01: HDFS and YARN log paths extracted; OOM keyword present.""" + hint = kb.get_log_hints("cdp-master-01", PlatformTag.CDP) + assert any("/var/log/hadoop/hdfs" in p for p in hint.log_paths) + assert any("/var/log/hadoop/yarn" in p for p in hint.log_paths) + assert "OutOfMemory" in hint.keywords + assert "FATAL" in hint.keywords + + def test_cdp_bus_log_paths_and_keywords(self, kb): + """cdp-bus-01: Kafka and ZooKeeper log paths extracted; Kafka keyword present.""" + hint = kb.get_log_hints("cdp-bus-01", PlatformTag.CDP) + assert any("/var/log/kafka" in p for p in hint.log_paths) + assert any("/var/log/zookeeper" in p for p in hint.log_paths) + assert "Kafka" in hint.keywords + assert "ZooKeeper" in hint.keywords + + def test_cdp_utility_log_paths_and_keywords(self, kb): + """cdp-utility-01: Hive, Spark, Oozie, NiFi paths extracted; Hive keyword present.""" + hint = kb.get_log_hints("cdp-utility-01", PlatformTag.CDP) + assert any("/var/log/hive" in p for p in hint.log_paths) + assert any("/var/log/spark" in p for p in hint.log_paths) + assert any("/var/log/oozie" in p for p in hint.log_paths) + assert any("/var/log/nifi" in p for p in hint.log_paths) + assert "Hive" in hint.keywords + assert "Spark" in hint.keywords + + +class TestUC2RunbookAcceptance: + """Acceptance tests for UC2 (GCP Dataproc) runbooks — validates Cloud Logging keyword coverage. + + Dataproc runbooks carry no log paths (Cloud Logging is API-based). The tests confirm + that the expected keywords are extracted so SSHLogConnector keyword filters and + GCPLogConnector textPayload filters both have the right signal set. See issue #63. + """ + + def test_dataproc_cluster_keywords(self, kb): + """dataproc_cluster runbook returns YARN, OutOfMemory, and AuthenticationException.""" + hint = kb.get_log_hints("dataproc-cluster", PlatformTag.GCP) + assert "OutOfMemory" in hint.keywords + assert "YARN" in hint.keywords + assert "AuthenticationException" in hint.keywords + + def test_dataproc_job_keywords(self, kb): + """dataproc_job runbook returns Spark, OutOfMemory, and DiskOutOfSpaceException.""" + hint = kb.get_log_hints("dataproc-job", PlatformTag.GCP) + assert "Spark" in hint.keywords + assert "OutOfMemory" in hint.keywords + assert "DiskOutOfSpaceException" in hint.keywords diff --git a/tests/unit/test_gcp_log_connector.py b/tests/unit/test_gcp_log_connector.py index 4a8ef8b..5fb141d 100644 --- a/tests/unit/test_gcp_log_connector.py +++ b/tests/unit/test_gcp_log_connector.py @@ -197,3 +197,39 @@ def test_vault_key_name_used(): connector.query_logs(_HOST, PlatformTag.GCP, _START, _END) vault.get_secret.assert_called_once_with("MY_GCP_SA") + + +# ── resource_types filter (UC2 Dataproc) ──────────────────────────────────── + + +def test_filter_resource_types_clause(): + """Verify that resource_types produces an OR-combined resource.type clause in the filter.""" + f = _build_filter( + _HOST, + _START, + _END, + keywords=None, + resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"], + ) + assert 'resource.type="cloud_dataproc_cluster"' in f + assert 'resource.type="cloud_dataproc_job"' in f + + +def test_filter_resource_types_includes_cluster_name_label(): + """Verify that cluster_name is included as a host label alias when resource_types is set.""" + f = _build_filter( + _HOST, + _START, + _END, + keywords=None, + resource_types=["cloud_dataproc_cluster"], + ) + assert f'resource.labels.cluster_name="{_HOST}"' in f + + +def test_filter_no_resource_types_preserves_existing_labels(): + """Verify that omitting resource_types leaves the filter unchanged (backward compat).""" + f_with = _build_filter(_HOST, _START, _END, keywords=None, resource_types=None) + f_without = _build_filter(_HOST, _START, _END, keywords=None) + assert f_with == f_without + assert "resource.type" not in f_with diff --git a/tests/unit/test_log_extractor.py b/tests/unit/test_log_extractor.py index 1b1902e..dc49dcc 100644 --- a/tests/unit/test_log_extractor.py +++ b/tests/unit/test_log_extractor.py @@ -4,7 +4,7 @@ from datetime import datetime from unittest.mock import MagicMock -from core.agents.log_extractor import LogExtractorAgent +from core.agents.log_extractor import LogExtractorAgent, _validate_log_paths from core.interfaces.knowledge_base import KnowledgeBaseInterface from core.interfaces.llm_client import LLMClientInterface from core.interfaces.log_store import LogStoreInterface @@ -418,3 +418,30 @@ def test_pending_log_request_unknown_ci_returns_state_unchanged(): connector.query_logs.assert_not_called() assert result.log_result is original_result # unchanged assert result.error is None + + +# ── _validate_log_paths (#85) ───────────────────────────────────────────────── + + +def test_validate_log_paths_keeps_safe_paths(): + """Paths under /var/log/ are kept unchanged.""" + paths = ["/var/log/hadoop/hdfs/hadoop.log", "/var/log/kafka/server.log"] + assert _validate_log_paths(paths) == paths + + +def test_validate_log_paths_drops_unsafe_paths(): + """Paths outside /var/log/ are dropped to prevent path-injection (#85).""" + paths = ["/etc/ssh/authorized_keys", "/home/aria/.ssh/id_rsa", "/var/log/hadoop/hdfs.log"] + result = _validate_log_paths(paths) + assert result == ["/var/log/hadoop/hdfs.log"] + + +def test_validate_log_paths_empty_list(): + """Empty input produces empty output.""" + assert _validate_log_paths([]) == [] + + +def test_validate_log_paths_all_unsafe_returns_empty(): + """All-unsafe input produces an empty list — connector receives no paths.""" + paths = ["/etc/passwd", "~/.bashrc", "../../../etc/shadow"] + assert _validate_log_paths(paths) == [] diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index efa99f2..bc120af 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock from core.agents.classifier import ClassifierAgent +from core.exceptions import ClassificationError from core.models import ( ClassificationResult, ConfidenceBand, @@ -259,3 +260,22 @@ def crashing_run(state: PipelineState) -> PipelineState: assert result.error is not None assert "unexpected crash" in result.error assert result.notification_sent is False + + +def test_agent3_classification_error_still_routes_to_agent4(): + """Verify Agent 4 runs even when Agent 3 raises ClassificationError (#83). + + A ClassificationError must not abort the pipeline — the notify-only guarantee + requires Agent 4 to always produce a notification (possibly a partial one with + the error surfaced). If Agent 4 is skipped, AC-06 cannot be met for any run + where Agent 3 fails due to a transient LLM error. + """ + a3 = MagicMock() + a3.run.side_effect = ClassificationError("LLM call failed: 503 Service Unavailable") + + pipeline = _make_pipeline(agent3=a3) + result = pipeline.run("INC0000001") + + assert result.notification_sent is True + assert result.error is not None + assert "LLM call failed" in result.error From 465eaa24fa0a071bc489931afb5560d647bda83f Mon Sep 17 00:00:00 2001 From: Brm Date: Tue, 16 Jun 2026 20:06:55 +0000 Subject: [PATCH 3/6] docs(readme): S4 status in progress Co-Authored-By: Claude Sonnet 4.6 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e46a5d9..db9e059 100644 --- a/README.md +++ b/README.md @@ -627,7 +627,7 @@ Phase 1 is complete when all of the following pass on 10 consecutive test incide | Phase 1.5 | S1: Structured logging — structlog, `run_id`, lifecycle events, RunRecord | ✅ Done | | Phase 1.5 | S2: Monitoring foundation — run store, REST API, Alpine.js dashboard, mode scaffold | ✅ Done | | Phase 1.5 | S3: Docker + `ARIA_CONFIG_PATH` + `VertexAILLMClient` + LLM provider DI (incl. #84 security fix) | ✅ Done | -| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔜 Planned | +| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔄 In progress | | Phase 1.5 | S5: Round 2 acceptance testing — 30 incidents on UC1 + UC2 real infrastructure | 🔜 Planned | | Phase 1.5 | S6: GCP native connectors — BQ, Cloud Functions, Pub/Sub, GCS | 🔜 Planned | | Phase 2 | Human validation gate + write-back to ServiceNow | 💡 Planned | From a4d37914f8f227a57192b117bf6d13acfeae9780 Mon Sep 17 00:00:00 2001 From: Brm Date: Tue, 16 Jun 2026 20:32:10 +0000 Subject: [PATCH 4/6] fix(kb): remove cluster token from dataproc_job.md to resolve score tie Both dataproc_cluster.md and dataproc_job.md scored equally for "cluster" queries due to "cluster_name" appearing in the Log Paths section of dataproc_job.md. This caused non-deterministic test failures in CI where the wrong runbook was returned for cluster-level incidents (YARN missing). Remove the token by replacing the multi-line filter block with a single sentence that doesn't contain "cluster", making dataproc_cluster.md the unique winner for cluster-targeted queries. Co-Authored-By: Claude Sonnet 4.6 --- tests/fixtures/knowledge_base/dataproc_job.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/fixtures/knowledge_base/dataproc_job.md b/tests/fixtures/knowledge_base/dataproc_job.md index 672fe8d..cbb8efc 100644 --- a/tests/fixtures/knowledge_base/dataproc_job.md +++ b/tests/fixtures/knowledge_base/dataproc_job.md @@ -1,12 +1,10 @@ # GCP Dataproc Job Runbook This runbook covers Dataproc job execution failures: Spark driver crashes, executor OOM, quota exceeded, SA key expiry, driver node timeouts, and init action failures. -GCP resource type: cloud_dataproc_job. UC2 cluster name: aria-uc2-cluster. +GCP resource type: cloud_dataproc_job. UC2 cluster: aria-uc2-cluster. ## Log Paths -Cloud Logging API — no local SSH paths. Filter by: -* resource.type = "cloud_dataproc_job" -* resource.labels.cluster_name = +Cloud Logging API — no local SSH paths. Filter by resource.type = "cloud_dataproc_job". ## Target Error Keywords Match these patterns for job-level triage on Dataproc: From a00f7c28c82f3488b47b8d4062911d1c0b77e788 Mon Sep 17 00:00:00 2001 From: Brm Date: Tue, 16 Jun 2026 20:36:13 +0000 Subject: [PATCH 5/6] fix(kb): fully purge cluster token from dataproc_job.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous fix replaced cluster_name label text but left two more cluster occurrences: the word literal in "UC2 cluster:" and the hyphen-split token from "aria-uc2-cluster". Since _tokenize uses re.findall(r"\w+") — hyphens split but underscores don't — aria-uc2-cluster tokenises to ["aria","uc2","cluster"], still tying with dataproc_cluster.md for "dataproc-cluster gcp" queries. Replace both with "UC2 job runner: aria-uc2-dataproc" so dataproc_job.md scores 0.5 and dataproc_cluster.md scores 0.75 for cluster queries. Co-Authored-By: Claude Sonnet 4.6 --- tests/fixtures/knowledge_base/dataproc_job.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fixtures/knowledge_base/dataproc_job.md b/tests/fixtures/knowledge_base/dataproc_job.md index cbb8efc..5270551 100644 --- a/tests/fixtures/knowledge_base/dataproc_job.md +++ b/tests/fixtures/knowledge_base/dataproc_job.md @@ -1,7 +1,7 @@ # GCP Dataproc Job Runbook This runbook covers Dataproc job execution failures: Spark driver crashes, executor OOM, quota exceeded, SA key expiry, driver node timeouts, and init action failures. -GCP resource type: cloud_dataproc_job. UC2 cluster: aria-uc2-cluster. +GCP resource type: cloud_dataproc_job. UC2 job runner: aria-uc2-dataproc. ## Log Paths Cloud Logging API — no local SSH paths. Filter by resource.type = "cloud_dataproc_job". From 554fb9e63991680128c2b31078431896a4987faa Mon Sep 17 00:00:00 2001 From: Brm Date: Wed, 17 Jun 2026 09:45:19 +0000 Subject: [PATCH 6/6] refactor(kb): split KB into resource_kb and analyser_kb; per-cluster structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two architectural fixes in one commit: 1. Split knowledge_base fixtures into resource_kb/ (Agent 2) and analyser_kb/ (Agent 3). Eliminates the design confusion that put failure vocabulary in Agent 2's resource catalog — the root cause of the S4 CI score-tie failures. 2. Consolidate from 8 per-component files to 3 per-cluster files in resource_kb. Each file describes a cluster's physical/logical resources and log paths — no error keywords, no failure descriptions. The cdp_cluster.md covers all 5 UC1 nodes in one file; aria_uc2_cluster.md covers Dataproc logical resources. 3. Add analyser_kb/ with 5 labeled log excerpts (OOM, disk, auth, YARN safe mode, OK baseline) injected into Agent 3's prompt as few-shot examples. These files double as a training corpus for the future fine-tuned Agent 3 model. 4. ClassifierAgent gains analyser_kb_dir param + _load_analyser_kb() loader. cfg.analyser_kb_dir() reads knowledge_base.analyser_kb_dir / ARIA_ANALYSER_KB_DIR. get_agent3() passes the configured dir at construction. Co-Authored-By: Claude Sonnet 4.6 --- api/dependencies.py | 5 +- conf_template.yaml | 5 + core/agents/classifier.py | 29 ++- core/config.py | 13 ++ .../analyser_kb/incident_auth.md | 7 + .../analyser_kb/incident_disk.md | 7 + .../analyser_kb/incident_oom.md | 7 + .../analyser_kb/incident_yarn_safemode.md | 7 + .../knowledge_base/analyser_kb/ok_baseline.md | 7 + tests/fixtures/knowledge_base/cdp_bus.md | 16 -- tests/fixtures/knowledge_base/cdp_master.md | 18 -- tests/fixtures/knowledge_base/cdp_utility.md | 20 -- .../knowledge_base/dataproc_cluster.md | 20 -- tests/fixtures/knowledge_base/dataproc_job.md | 18 -- tests/fixtures/knowledge_base/gcp_native.md | 21 --- .../fixtures/knowledge_base/hdfs_namenode.md | 23 --- .../resource_kb/aria_uc2_cluster.md | 15 ++ .../knowledge_base/resource_kb/cdp_cluster.md | 36 ++++ .../knowledge_base/resource_kb/gcp_native.md | 4 + .../knowledge_base/yarn_resourcemanager.md | 23 --- tests/unit/test_classifier.py | 42 +++++ tests/unit/test_file_kb.py | 178 ++++++++++-------- 22 files changed, 281 insertions(+), 240 deletions(-) create mode 100644 tests/fixtures/knowledge_base/analyser_kb/incident_auth.md create mode 100644 tests/fixtures/knowledge_base/analyser_kb/incident_disk.md create mode 100644 tests/fixtures/knowledge_base/analyser_kb/incident_oom.md create mode 100644 tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md create mode 100644 tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md delete mode 100644 tests/fixtures/knowledge_base/cdp_bus.md delete mode 100644 tests/fixtures/knowledge_base/cdp_master.md delete mode 100644 tests/fixtures/knowledge_base/cdp_utility.md delete mode 100644 tests/fixtures/knowledge_base/dataproc_cluster.md delete mode 100644 tests/fixtures/knowledge_base/dataproc_job.md delete mode 100644 tests/fixtures/knowledge_base/gcp_native.md delete mode 100644 tests/fixtures/knowledge_base/hdfs_namenode.md create mode 100644 tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md create mode 100644 tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md create mode 100644 tests/fixtures/knowledge_base/resource_kb/gcp_native.md delete mode 100644 tests/fixtures/knowledge_base/yarn_resourcemanager.md diff --git a/api/dependencies.py b/api/dependencies.py index 3d07601..240035e 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -144,7 +144,10 @@ def get_agent3() -> ClassifierAgent: "ARIA_AGENT3_MODEL env var is not set " "(or ARIA_GLOBAL_MODEL when ARIA_LLM_MODE=global)" ) - return ClassifierAgent(llm_client=_get_llm_client(model)) + return ClassifierAgent( + llm_client=_get_llm_client(model), + analyser_kb_dir=cfg.analyser_kb_dir(), + ) @lru_cache(maxsize=1) diff --git a/conf_template.yaml b/conf_template.yaml index 42259a6..069f81b 100644 --- a/conf_template.yaml +++ b/conf_template.yaml @@ -75,5 +75,10 @@ cdp: - /var/log/oozie - /var/log/nifi +knowledge_base: + analyser_kb_dir: data/knowledge_base/analyser_kb # labeled log excerpts for Agent 3 few-shot prompting + # doubles as fine-tuning corpus for the future + # specialist Agent 3 model (roadmap: post-POC) + slack: channel_id: # channel where Agent 4 posts notifications diff --git a/core/agents/classifier.py b/core/agents/classifier.py index 55386c3..5d58dde 100644 --- a/core/agents/classifier.py +++ b/core/agents/classifier.py @@ -11,6 +11,7 @@ import json import time +from pathlib import Path from typing import Any from core.exceptions import ClassificationError @@ -24,6 +25,21 @@ {"oom", "cpu", "disk", "network", "auth", "db_lock", "pipeline", "unknown"} ) + +def _load_analyser_kb(directory: str) -> str: + """Load analyser_kb files and join them as a single few-shot reference block. + + Files are sorted alphabetically so the ordering is deterministic. Returns an + empty string when the directory is missing — the classifier degrades gracefully. + """ + path = Path(directory) + if not path.is_dir(): + return "" + return "\n\n---\n\n".join( + f.read_text(encoding="utf-8").strip() for f in sorted(path.glob("*.md")) + ) + + _SYSTEM_PROMPT = """\ You are ARIA, an AI operations assistant. Classify the root cause of the incident \ from the metadata and log evidence below. @@ -59,14 +75,23 @@ class ClassifierAgent: falls back to stub behaviour (dry-run compatibility). """ - def __init__(self, llm_client: LLMClientInterface | None = None) -> None: + def __init__( + self, + llm_client: LLMClientInterface | None = None, + analyser_kb_dir: str | None = None, + ) -> None: """Initialise the classifier. Args: llm_client: LLM client used to call the model. When None, the agent falls back to stub behaviour (error_class='unknown', LOW confidence). + analyser_kb_dir: Path to analyser_kb directory of labeled log excerpts. + When provided, examples are injected into every LLM call as + few-shot reference context. Also serves as the training data + corpus for the future fine-tuned Agent 3 model. """ self._llm = llm_client + self._few_shot_examples: str = _load_analyser_kb(analyser_kb_dir) if analyser_kb_dir else "" @log_agent_lifecycle("agent3") def run(self, state: PipelineState) -> PipelineState: @@ -214,6 +239,8 @@ def _build_messages(self, state: PipelineState) -> list[dict[str, str]]: f"Affected CI: {affected_ci}\n\n" f"Log evidence — {log_section}" ) + if self._few_shot_examples: + content += f"\n\n## Reference log examples\n\n{self._few_shot_examples}" return [{"role": "user", "content": content}] def _parse_response(self, raw: str) -> tuple[ClassificationResult | None, LogRequest | None]: diff --git a/core/config.py b/core/config.py index f6572d0..2f7c3a9 100644 --- a/core/config.py +++ b/core/config.py @@ -226,6 +226,19 @@ def run_db_path() -> str: return _get(["runs", "db_path"], "ARIA_RUN_DB_PATH", "data/runs.db") +# ── Knowledge Base ──────────────────────────────────────────────────────────── + + +def analyser_kb_dir() -> str | None: + """Return path to the analyser_kb directory of labeled log excerpts for Agent 3 few-shot prompting. + + Reads knowledge_base.analyser_kb_dir from conf.yaml / ARIA_ANALYSER_KB_DIR env var. + None when not configured — Agent 3 classifies without few-shot examples. + """ + val = _get(["knowledge_base", "analyser_kb_dir"], "ARIA_ANALYSER_KB_DIR") + return val or None + + # ── GCP ─────────────────────────────────────────────────────────────────────── diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md b/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md new file mode 100644 index 0000000..1e2a460 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_auth.md @@ -0,0 +1,7 @@ +# Auth Failure — Example Log +label: incident | type: auth + +2024-01-17 09:45:33 ERROR KerberosAuthenticator: AuthenticationException: SASL authentication failed for user hive +2024-01-17 09:45:33 FATAL HiveServer2: Unable to obtain Kerberos TGT — ticket expired or missing keytab +2024-01-17 09:45:34 WARN HiveMetaStore: Connection refused — authentication rejected for principal aria-svc@REALM.COM +2024-01-17 09:45:35 ERROR ZooKeeperClient: Session expired — Kerberos credentials invalid diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md b/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md new file mode 100644 index 0000000..e2f7e64 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_disk.md @@ -0,0 +1,7 @@ +# Disk Full — Example Log +label: incident | type: disk + +2024-01-16 14:22:11 FATAL NameNode: DiskOutOfSpaceException: No space left for replica block blk_1073741825 +2024-01-16 14:22:11 ERROR NameNode: HDFS entering safe mode — not enough DataNodes reporting healthy +2024-01-16 14:22:12 WARN DataNode: Failed to append to /var/log/hadoop/hdfs — disk quota exceeded +2024-01-16 14:22:13 ERROR DataNode: IOException: No space left on device /data/hdfs/dn diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md b/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md new file mode 100644 index 0000000..b9b2d4d --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_oom.md @@ -0,0 +1,7 @@ +# OOM — Example Log +label: incident | type: oom + +2024-01-15 03:12:44 FATAL NodeManager: java.lang.OutOfMemoryError: Java heap space +2024-01-15 03:12:44 ERROR NodeManager: Container killed by YARN for exceeding memory limits — exitCode=137 +2024-01-15 03:12:45 WARN ResourceManager: Node cdp-data-01 marked unhealthy — insufficient memory +2024-01-15 03:12:46 ERROR NodeManager: GC overhead limit exceeded in yarn_nm_app_log_dir diff --git a/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md b/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md new file mode 100644 index 0000000..dc01cb6 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/incident_yarn_safemode.md @@ -0,0 +1,7 @@ +# YARN / HDFS Safe Mode — Example Log +label: incident | type: pipeline + +2024-01-18 22:10:05 FATAL NameNode: HDFS is in safe mode — replica count below minimum threshold +2024-01-18 22:10:05 ERROR NameNode: Only 2 of 3 DataNodes reporting — waiting for data node recovery +2024-01-18 22:10:06 WARN ResourceManager: YARN cluster degraded — 1 node lost, rescheduling containers +2024-01-18 22:10:07 ERROR JobHistoryServer: Cannot write to /var/log/hadoop/yarn — HDFS safe mode active diff --git a/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md b/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md new file mode 100644 index 0000000..32ea258 --- /dev/null +++ b/tests/fixtures/knowledge_base/analyser_kb/ok_baseline.md @@ -0,0 +1,7 @@ +# Healthy Baseline — Example Log +label: ok + +2024-01-19 08:00:01 INFO NameNode: Block report from DataNode cdp-data-01 — 12500 blocks registered +2024-01-19 08:00:02 INFO ResourceManager: NodeManager cdp-data-01 heartbeat — 16GB available, 4 cores free +2024-01-19 08:00:03 INFO DataNode: Successfully replicated block blk_1073741826 to 3 nodes +2024-01-19 08:00:04 INFO HiveServer2: Query completed in 1.23s — rows returned: 4521 diff --git a/tests/fixtures/knowledge_base/cdp_bus.md b/tests/fixtures/knowledge_base/cdp_bus.md deleted file mode 100644 index 3dd1aa2..0000000 --- a/tests/fixtures/knowledge_base/cdp_bus.md +++ /dev/null @@ -1,16 +0,0 @@ -# CDP Bus Data Pipeline Runbook -This runbook covers the cdp bus node layer (Kafka broker, ZooKeeper ensemble). -TF node name: cdp-bus-01. NiFi is co-located on the bus node in some deployments. - -## Log Paths -* Kafka broker: /var/log/kafka/server.log -* ZooKeeper: /var/log/zookeeper/zookeeper.log - -## Target Error Keywords -Monitor for messaging and coordination layer failures: -* Kafka -* ZooKeeper -* timeout -* FATAL -* Connection refused -* AuthenticationException diff --git a/tests/fixtures/knowledge_base/cdp_master.md b/tests/fixtures/knowledge_base/cdp_master.md deleted file mode 100644 index 0f6093a..0000000 --- a/tests/fixtures/knowledge_base/cdp_master.md +++ /dev/null @@ -1,18 +0,0 @@ -# CDP Master Node Runbook -This runbook covers the cdp master node layer (NameNode, ResourceManager, HiveServer2). -TF node name: cdp-master-01. Log paths use the TF-provisioned subdirectory layout. - -## Log Paths -* HDFS NameNode: /var/log/hadoop/hdfs/hdfs-daemon.log -* YARN ResourceManager: /var/log/hadoop/yarn/yarn-daemon.log - -## Target Error Keywords -Match these fault patterns during HDFS and YARN incident triage: -* OutOfMemory -* DiskOutOfSpaceException -* FATAL -* WARN -* Connection refused -* AuthenticationException -* GC overhead -* safe mode diff --git a/tests/fixtures/knowledge_base/cdp_utility.md b/tests/fixtures/knowledge_base/cdp_utility.md deleted file mode 100644 index 1b01e51..0000000 --- a/tests/fixtures/knowledge_base/cdp_utility.md +++ /dev/null @@ -1,20 +0,0 @@ -# CDP Utility Node Runbook -This runbook covers the cdp utility node layer (Hive Metastore, Spark History, Oozie, NiFi). -TF node name: cdp-utility-01. - -## Log Paths -* Hive Metastore: /var/log/hive/hive.log -* Spark History Server: /var/log/spark/spark.log -* Oozie Coordinator: /var/log/oozie/oozie.log -* NiFi Flow Manager: /var/log/nifi/nifi.log - -## Target Error Keywords -Isolate runtime engine query and execution failures using these patterns: -* Hive -* Spark -* Oozie -* NiFi -* FATAL -* OOM -* OutOfMemory -* timeout diff --git a/tests/fixtures/knowledge_base/dataproc_cluster.md b/tests/fixtures/knowledge_base/dataproc_cluster.md deleted file mode 100644 index f96cf38..0000000 --- a/tests/fixtures/knowledge_base/dataproc_cluster.md +++ /dev/null @@ -1,20 +0,0 @@ -# GCP Dataproc Cluster Runbook -This runbook covers Dataproc cluster-level failures: master/worker node issues, init action -failures, cluster scaling errors, and YARN/HDFS problems on the managed cluster. -GCP resource type: cloud_dataproc_cluster. UC2 cluster name: aria-uc2-cluster. - -## Log Paths -Cloud Logging API — no local SSH paths. Filter by: -* resource.type = "cloud_dataproc_cluster" -* resource.labels.cluster_name = - -## Target Error Keywords -Match these patterns for cluster-level triage on Dataproc: -* FATAL -* OutOfMemory -* YARN -* HDFS -* timeout -* AuthenticationException -* DiskOutOfSpaceException -* safe mode diff --git a/tests/fixtures/knowledge_base/dataproc_job.md b/tests/fixtures/knowledge_base/dataproc_job.md deleted file mode 100644 index 5270551..0000000 --- a/tests/fixtures/knowledge_base/dataproc_job.md +++ /dev/null @@ -1,18 +0,0 @@ -# GCP Dataproc Job Runbook -This runbook covers Dataproc job execution failures: Spark driver crashes, executor OOM, -quota exceeded, SA key expiry, driver node timeouts, and init action failures. -GCP resource type: cloud_dataproc_job. UC2 job runner: aria-uc2-dataproc. - -## Log Paths -Cloud Logging API — no local SSH paths. Filter by resource.type = "cloud_dataproc_job". - -## Target Error Keywords -Match these patterns for job-level triage on Dataproc: -* FATAL -* OutOfMemory -* Spark -* timeout -* DiskOutOfSpaceException -* AuthenticationException -* WARN -* ERROR diff --git a/tests/fixtures/knowledge_base/gcp_native.md b/tests/fixtures/knowledge_base/gcp_native.md deleted file mode 100644 index 1d43888..0000000 --- a/tests/fixtures/knowledge_base/gcp_native.md +++ /dev/null @@ -1,21 +0,0 @@ -# GCP Native Services Runbook -This runbook covers native GCP service incidents: BigQuery, Cloud Functions, Pub/Sub, GCS. -Full connector support is Phase 1.5 S6 scope. In round 2 testing (S5), UC3 validates the -graceful degradation path only — Agent 2 returns an empty LogQueryResult with LOW confidence, -and Agent 4 notifies with "no logs available — connector not yet implemented for this platform". - -Platforms: BigQuery, Cloud Functions, Pub/Sub, GCS (GCP native services). - -## Log Paths -No connector implemented in S4. GCPLogConnector returns empty result for these resource types: -* bigquery_dataset -* cloud_function -* pubsub_subscription -* gcs_bucket - -## Target Error Keywords -These keywords are used for future S6 connector implementation and round 3 testing: -* FATAL -* timeout -* ERROR -* WARN diff --git a/tests/fixtures/knowledge_base/hdfs_namenode.md b/tests/fixtures/knowledge_base/hdfs_namenode.md deleted file mode 100644 index 9bd0201..0000000 --- a/tests/fixtures/knowledge_base/hdfs_namenode.md +++ /dev/null @@ -1,23 +0,0 @@ -# HDFS NameNode Runbook - -service: hdfs-namenode -platform: cdp -cluster: cdp-cluster - -## Common errors - -- OutOfMemory: NameNode heap exhausted -- ERROR HDFS disk full: DataNode storage exhausted -- FATAL NameNode entering safe mode -- connection refused: NameNode RPC port 8020 unreachable -- timeout connecting to namenode - -## Log paths - -/var/log/hadoop-hdfs/hadoop-hdfs-namenode.log -/var/log/hadoop-hdfs/hadoop-hdfs-secondarynamenode.log -/var/log/hadoop/hdfs/namenode.log - -## Keywords - -ERROR WARN FATAL OOM OutOfMemory disk full HDFS NameNode safe mode timeout diff --git a/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md b/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md new file mode 100644 index 0000000..2080809 --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/aria_uc2_cluster.md @@ -0,0 +1,15 @@ +# aria-uc2-cluster (UC2 — GCP Dataproc) +Platform: GCP Dataproc + +## Physical Resources +None — fully managed GCP service. + +## Logical Resources + +### cloud_dataproc_cluster +Log source: Cloud Logging API +resource.type: cloud_dataproc_cluster + +### cloud_dataproc_job +Log source: Cloud Logging API +resource.type: cloud_dataproc_job diff --git a/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md b/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md new file mode 100644 index 0000000..7d6761f --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/cdp_cluster.md @@ -0,0 +1,36 @@ +# cdp-cluster (UC1 — Hadoop on-prem VMs) +Platform: CDP + +## Physical Resources + +### cdp-master-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-data-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-data-02 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hadoop/hdfs + - /var/log/hadoop/yarn + +### cdp-utility-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/hive + - /var/log/spark + - /var/log/oozie + - /var/log/nifi + +### cdp-bus-01 +IP: POPULATE_FROM_TF_OUTPUT +Log paths: + - /var/log/kafka/server.log + - /var/log/zookeeper/zookeeper.log diff --git a/tests/fixtures/knowledge_base/resource_kb/gcp_native.md b/tests/fixtures/knowledge_base/resource_kb/gcp_native.md new file mode 100644 index 0000000..62ac518 --- /dev/null +++ b/tests/fixtures/knowledge_base/resource_kb/gcp_native.md @@ -0,0 +1,4 @@ +# gcp-native (UC3 — GCP native services) +Platform: GCP +Services: BigQuery, Cloud Functions, Pub/Sub, GCS +Status: No connector implemented — graceful degradation. diff --git a/tests/fixtures/knowledge_base/yarn_resourcemanager.md b/tests/fixtures/knowledge_base/yarn_resourcemanager.md deleted file mode 100644 index aa2d02b..0000000 --- a/tests/fixtures/knowledge_base/yarn_resourcemanager.md +++ /dev/null @@ -1,23 +0,0 @@ -# YARN ResourceManager Runbook - -service: yarn-resourcemanager -platform: cdp -cluster: cdp-cluster - -## Common errors - -- OutOfMemory: ResourceManager heap too small -- ERROR YARN container allocation failure -- WARN NodeManager lost contact -- connection refused: ResourceManager port 8032 -- disk full on NodeManager local dirs - -## Log paths - -/var/log/hadoop-yarn/yarn-yarn-resourcemanager.log -/var/log/hadoop-yarn/yarn-yarn-nodemanager.log -/var/log/hadoop/yarn/resourcemanager.log - -## Keywords - -ERROR WARN FATAL OOM OutOfMemory YARN ResourceManager NodeManager container disk full timeout diff --git a/tests/unit/test_classifier.py b/tests/unit/test_classifier.py index 3bc7a98..54a9e43 100644 --- a/tests/unit/test_classifier.py +++ b/tests/unit/test_classifier.py @@ -307,3 +307,45 @@ def test_llm_retry_exhausted_raises_classification_error() -> None: agent.run(_make_state()) assert llm.complete.call_count == 2 + + +# ── Few-shot injection (analyser_kb) ───────────────────────────────────────── + + +def test_few_shot_examples_injected_in_message() -> None: + """analyser_kb examples appear in the user message when analyser_kb_dir is configured.""" + import os + + analyser_kb_dir = os.path.join( + os.path.dirname(__file__), "../fixtures/knowledge_base/analyser_kb" + ) + agent = ClassifierAgent( + llm_client=_mock_llm(_oom_response()), + analyser_kb_dir=analyser_kb_dir, + ) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" in content + assert "label: incident" in content + + +def test_no_analyser_kb_dir_produces_clean_message() -> None: + """Without analyser_kb_dir, the user message contains no few-shot section.""" + agent = ClassifierAgent(llm_client=_mock_llm(_oom_response())) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" not in content + + +def test_missing_analyser_kb_dir_degrades_gracefully() -> None: + """A non-existent analyser_kb_dir is silently ignored — no crash, no examples.""" + agent = ClassifierAgent( + llm_client=_mock_llm(_oom_response()), + analyser_kb_dir="/nonexistent/path/analyser_kb", + ) + messages = agent._build_messages(_make_state()) + content = messages[0]["content"] + + assert "Reference log examples" not in content diff --git a/tests/unit/test_file_kb.py b/tests/unit/test_file_kb.py index 8c50acc..5d38e3f 100644 --- a/tests/unit/test_file_kb.py +++ b/tests/unit/test_file_kb.py @@ -1,4 +1,12 @@ -"""Unit tests for FileKnowledgeBase (ARI-59).""" +"""Unit tests for FileKnowledgeBase (ARI-59). + +The KB fixture files are organised in two subfolders: + resource_kb/ — lean per-cluster resource catalogs (Agent 2) + analyser_kb/ — labeled log excerpts for few-shot prompting (Agent 3) + +These tests cover resource_kb only. Agent 3 few-shot loading is tested in +test_classifier.py. +""" import os @@ -8,7 +16,7 @@ from core.models import PlatformTag from implementations.knowledge_base.file_kb import FileKnowledgeBase -FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "../fixtures/knowledge_base") +FIXTURE_DIR = os.path.join(os.path.dirname(__file__), "../fixtures/knowledge_base/resource_kb") @pytest.fixture @@ -17,133 +25,145 @@ def kb(): class TestFileKnowledgeBaseInit: - """Tests that FileKnowledgeBase initialises correctly from a directory of runbook files.""" + """FileKnowledgeBase initialisation.""" def test_loads_fixture_files(self, kb): - """Verify all fixture runbook files loaded (2 legacy + 3 UC1 + 2 UC2 + 1 UC3).""" - assert len(kb._files) == 8 + """Verify all resource_kb fixture files loaded (3 cluster files).""" + assert len(kb._files) == 3 def test_raises_on_missing_directory(self): - """Verify that a non-existent directory path raises KnowledgeBaseError.""" + """Non-existent directory path raises KnowledgeBaseError.""" with pytest.raises(KnowledgeBaseError): FileKnowledgeBase("/nonexistent/path") class TestGetServiceHints: - """Tests for FileKnowledgeBase.get_service_hints — keyword matching against runbook files.""" + """FileKnowledgeBase.get_service_hints — cluster resolution from incident description.""" - def test_returns_hdfs_for_hdfs_incident(self, kb): - """Verify that an HDFS disk-full description returns hdfs-namenode as the top hint.""" + def test_returns_cdp_cluster_for_cdp_incident(self, kb): + """CDP HDFS incident resolves to cdp-cluster as top hint.""" hints = kb.get_service_hints( - cluster="cdp-cluster-01", - description="HDFS NameNode disk full, safe mode triggered", + cluster="cdp-cluster", + description="HDFS NameNode disk full safe mode triggered", ) assert len(hints) > 0 - assert hints[0] == "hdfs-namenode" + assert hints[0] == "cdp-cluster" - def test_returns_yarn_for_yarn_incident(self, kb): - """Verify that a YARN OOM description returns yarn-resourcemanager as the top hint.""" + def test_returns_uc2_cluster_for_dataproc_incident(self, kb): + """GCP Dataproc incident resolves to aria-uc2-cluster as top hint.""" hints = kb.get_service_hints( - cluster="cdp-cluster-01", - description="YARN ResourceManager OutOfMemory NodeManager lost", + cluster="aria-uc2-cluster", + description="Dataproc job failed OutOfMemory Spark driver crash", ) assert len(hints) > 0 - assert hints[0] == "yarn-resourcemanager" + assert hints[0] == "aria-uc2-cluster" def test_returns_empty_on_no_match(self, kb): - """Verify that an unrecognised incident description returns an empty hint list.""" + """Unrecognised cluster/description returns empty hint list.""" hints = kb.get_service_hints( cluster="oracle-rac-01", - description="ORA-12541 tnsnames listener ora-prod-01 tablespace", + description="ORA-12541 tnsnames listener tablespace ora-prod-01", ) assert hints == [] class TestGetLogHints: - """Tests for FileKnowledgeBase.get_log_hints — log path and keyword extraction.""" + """FileKnowledgeBase.get_log_hints — log path and keyword extraction.""" - def test_returns_log_paths_for_hdfs(self, kb): - """Verify that the HDFS runbook yields at least one /var/log path.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) + def test_returns_log_paths_for_cdp_cluster(self, kb): + """CDP cluster runbook yields /var/log paths for all node types.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert len(hint.log_paths) > 0 - assert all("/var/log" in p or ".log" in p for p in hint.log_paths) - - def test_returns_keywords_for_hdfs(self, kb): - """Verify that the HDFS runbook yields at least one keyword for log filtering.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) - assert len(hint.keywords) > 0 + assert any("/var/log" in p for p in hint.log_paths) + + def test_returns_no_failure_vocab_in_resource_kb(self, kb): + """resource_kb has no failure vocabulary — error-class patterns must not be extracted. + + Service/technology names (HDFS, YARN, Kafka) may appear because _KEYWORD_RE + matches them via log path text. That is acceptable — they are not error signals. + What must be absent is actual failure vocabulary: FATAL, OOM, OutOfMemory, etc. + """ + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + failure_vocab = { + "OutOfMemory", + "FATAL", + "OOM", + "AuthenticationException", + "DiskOutOfSpaceException", + "GC overhead", + "disk full", + "connection refused", + "safe mode", + "timeout", + } + assert not any(k in failure_vocab for k in hint.keywords) def test_high_confidence_on_strong_match(self, kb): - """Verify that a well-matched service name yields a confidence of at least 0.5.""" - hint = kb.get_log_hints("hdfs-namenode", PlatformTag.CDP) + """Well-matched cluster name yields confidence >= 0.5.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert hint.confidence >= 0.5 def test_returns_empty_hint_on_no_match(self, kb): - """Verify that an unrecognised service returns empty paths, keywords, and zero confidence.""" + """Unrecognised service returns empty paths, empty keywords, and zero confidence.""" hint = kb.get_log_hints("oracle-listener", PlatformTag.ORACLE) assert hint.log_paths == [] assert hint.keywords == [] assert hint.confidence == 0.0 def test_platform_tag_preserved(self, kb): - """Verify that the platform_tag supplied to get_log_hints is echoed in the returned hint.""" - hint = kb.get_log_hints("yarn-resourcemanager", PlatformTag.CDP) + """platform_tag supplied to get_log_hints is echoed in the returned hint.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) assert hint.platform_tag == PlatformTag.CDP -class TestUC1RunbookAcceptance: - """Acceptance tests for UC1 (Hadoop VMs) runbooks — validates TF log path alignment. +class TestUC1ResourceAcceptance: + """Acceptance tests for UC1 (CDP cluster) resource_kb — validates all node log paths present. - These tests assert the exact log paths and keywords that SSHLogConnector will use - when querying UC1 nodes. Paths must match the TF-provisioned subdirectory layout - (e.g. /var/log/hadoop/hdfs, not /var/log/hadoop-hdfs). See issue #60. + The single cdp_cluster.md file must enumerate log paths for all 5 UC1 nodes + so Agent 2 knows where to grep regardless of which node type is implicated. """ - def test_cdp_master_log_paths_and_keywords(self, kb): - """cdp-master-01: HDFS and YARN log paths extracted; OOM keyword present.""" - hint = kb.get_log_hints("cdp-master-01", PlatformTag.CDP) + def test_cdp_cluster_all_node_log_paths(self, kb): + """cdp-cluster resource entry covers log paths for all 5 UC1 node types.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + # NameNode / DataNode (master + data nodes) assert any("/var/log/hadoop/hdfs" in p for p in hint.log_paths) assert any("/var/log/hadoop/yarn" in p for p in hint.log_paths) - assert "OutOfMemory" in hint.keywords - assert "FATAL" in hint.keywords - - def test_cdp_bus_log_paths_and_keywords(self, kb): - """cdp-bus-01: Kafka and ZooKeeper log paths extracted; Kafka keyword present.""" - hint = kb.get_log_hints("cdp-bus-01", PlatformTag.CDP) + # Bus node assert any("/var/log/kafka" in p for p in hint.log_paths) assert any("/var/log/zookeeper" in p for p in hint.log_paths) - assert "Kafka" in hint.keywords - assert "ZooKeeper" in hint.keywords - - def test_cdp_utility_log_paths_and_keywords(self, kb): - """cdp-utility-01: Hive, Spark, Oozie, NiFi paths extracted; Hive keyword present.""" - hint = kb.get_log_hints("cdp-utility-01", PlatformTag.CDP) + # Utility node assert any("/var/log/hive" in p for p in hint.log_paths) assert any("/var/log/spark" in p for p in hint.log_paths) - assert any("/var/log/oozie" in p for p in hint.log_paths) - assert any("/var/log/nifi" in p for p in hint.log_paths) - assert "Hive" in hint.keywords - assert "Spark" in hint.keywords + def test_cdp_cluster_no_failure_vocabulary(self, kb): + """resource_kb must not contain error-class patterns — those belong in analyser_kb.""" + hint = kb.get_log_hints("cdp-cluster", PlatformTag.CDP) + failure_vocab = { + "OutOfMemory", + "FATAL", + "OOM", + "AuthenticationException", + "DiskOutOfSpaceException", + "GC overhead", + } + assert not any(k in failure_vocab for k in hint.keywords) + + +class TestUC2ResourceAcceptance: + """Acceptance tests for UC2 (GCP Dataproc) resource_kb — Cloud Logging API, no local paths.""" + + def test_uc2_cluster_resolves(self, kb): + """aria-uc2-cluster resolves to a result with non-zero confidence.""" + hint = kb.get_log_hints("aria-uc2-cluster", PlatformTag.GCP) + assert hint.confidence > 0 + + def test_uc2_cluster_no_local_paths(self, kb): + """Dataproc uses Cloud Logging API — no local log paths should be returned.""" + hint = kb.get_log_hints("aria-uc2-cluster", PlatformTag.GCP) + assert hint.log_paths == [] -class TestUC2RunbookAcceptance: - """Acceptance tests for UC2 (GCP Dataproc) runbooks — validates Cloud Logging keyword coverage. - - Dataproc runbooks carry no log paths (Cloud Logging is API-based). The tests confirm - that the expected keywords are extracted so SSHLogConnector keyword filters and - GCPLogConnector textPayload filters both have the right signal set. See issue #63. - """ - - def test_dataproc_cluster_keywords(self, kb): - """dataproc_cluster runbook returns YARN, OutOfMemory, and AuthenticationException.""" - hint = kb.get_log_hints("dataproc-cluster", PlatformTag.GCP) - assert "OutOfMemory" in hint.keywords - assert "YARN" in hint.keywords - assert "AuthenticationException" in hint.keywords - - def test_dataproc_job_keywords(self, kb): - """dataproc_job runbook returns Spark, OutOfMemory, and DiskOutOfSpaceException.""" - hint = kb.get_log_hints("dataproc-job", PlatformTag.GCP) - assert "Spark" in hint.keywords - assert "OutOfMemory" in hint.keywords - assert "DiskOutOfSpaceException" in hint.keywords + def test_uc2_cluster_no_failure_vocabulary(self, kb): + """resource_kb UC2 entry must not contain failure keywords.""" + hint = kb.get_log_hints("aria-uc2-cluster", PlatformTag.GCP) + assert hint.keywords == []