Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ Phase 1 is complete when all of the following pass on 10 consecutive test incide
| Phase 1.5 | S1: Structured logging — structlog, `run_id`, lifecycle events, RunRecord | ✅ Done |
| Phase 1.5 | S2: Monitoring foundation — run store, REST API, Alpine.js dashboard, mode scaffold | ✅ Done |
| Phase 1.5 | S3: Docker + `ARIA_CONFIG_PATH` + `VertexAILLMClient` + LLM provider DI (incl. #84 security fix) | ✅ Done |
| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔜 Planned |
| Phase 1.5 | S4: Testing infrastructure — UC1/UC2/UC3 cluster wiring, KB runbooks, CMDB validation | 🔄 In progress |
| Phase 1.5 | S5: Round 2 acceptance testing — 30 incidents on UC1 + UC2 real infrastructure | 🔜 Planned |
| Phase 1.5 | S6: GCP native connectors — BQ, Cloud Functions, Pub/Sub, GCS | 🔜 Planned |
| Phase 2 | Human validation gate + write-back to ServiceNow | 💡 Planned |
Expand Down
14 changes: 11 additions & 3 deletions api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ def get_agent3() -> ClassifierAgent:
"ARIA_AGENT3_MODEL env var is not set "
"(or ARIA_GLOBAL_MODEL when ARIA_LLM_MODE=global)"
)
return ClassifierAgent(llm_client=_get_llm_client(model))
return ClassifierAgent(
llm_client=_get_llm_client(model),
analyser_kb_dir=cfg.analyser_kb_dir(),
)


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -240,12 +243,17 @@ def get_agent2() -> LogExtractorAgent:
registry = {
PlatformTag.CDP: SSHLogConnector(
vault,
ssh_key_secret="CDP_SSH_KEY",
ssh_key_secret=cfg.cdp_ssh_key_secret(),
ssh_user=cfg.cdp_ssh_user(),
host_key_secret="CDP_HOST_KEY" if os.environ.get("CDP_HOST_KEY") else None,
log_dirs=cfg.cdp_log_dirs(),
),
PlatformTag.GCP: GCPLogConnector(vault),
# resource_types scopes Cloud Logging queries to Dataproc cluster and job logs (UC2).
# S6 will generalise this into a configurable resource_type_templates dict.
PlatformTag.GCP: GCPLogConnector(
vault,
resource_types=["cloud_dataproc_cluster", "cloud_dataproc_job"],
),
}
llm = None
model = _resolve_model("2")
Expand Down
25 changes: 21 additions & 4 deletions conf_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,29 @@ gcp: # only required when connectors.log = gcp

cdp:
ssh_user: hadoop # OS user with read access to log directories below
log_dirs: # directories searched for logs on CDP cluster nodes
- /var/log/hadoop-hdfs # leave unset to use these defaults
- /var/log/hadoop-yarn
ssh_key_secret: CDP_SSH_KEY # vault key name for the SSH private key PEM.
# EnvVarVault: reads os.environ[ssh_key_secret] directly.
# GCPSecretManagerVault: normalises underscores→hyphens and
# prepends 'aria-', so 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'.
# UC1 (TF): TF provisions secret 'aria-uc1-ssh-private-key'.
# Set ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY to match, or
# rename the TF secret to 'aria-cdp-ssh-key'.
log_dirs: # directories searched for logs on CDP cluster nodes.
# UC1 (TF Hadoop VMs) use subdirectory paths — set these
# at deployment time from TF startup script:
- /var/log/hadoop/hdfs # TF UC1 NameNode / DataNode logs
- /var/log/hadoop/yarn # TF UC1 ResourceManager / NodeManager logs
- /var/log/hive
- /var/log/oozie
- /var/log/spark
- /var/log/kafka
- /var/log/zookeeper
- /var/log/oozie
- /var/log/nifi

knowledge_base:
analyser_kb_dir: data/knowledge_base/analyser_kb # labeled log excerpts for Agent 3 few-shot prompting
# doubles as fine-tuning corpus for the future
# specialist Agent 3 model (roadmap: post-POC)

slack:
channel_id: <your-slack-channel-id> # channel where Agent 4 posts notifications
68 changes: 58 additions & 10 deletions core/agents/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"""

import json
import time
from pathlib import Path
from typing import Any

from core.exceptions import ClassificationError
Expand All @@ -23,6 +25,21 @@
{"oom", "cpu", "disk", "network", "auth", "db_lock", "pipeline", "unknown"}
)


def _load_analyser_kb(directory: str) -> str:
"""Load analyser_kb files and join them as a single few-shot reference block.

Files are sorted alphabetically so the ordering is deterministic. Returns an
empty string when the directory is missing — the classifier degrades gracefully.
"""
path = Path(directory)
if not path.is_dir():
return ""
return "\n\n---\n\n".join(
f.read_text(encoding="utf-8").strip() for f in sorted(path.glob("*.md"))
)


_SYSTEM_PROMPT = """\
You are ARIA, an AI operations assistant. Classify the root cause of the incident \
from the metadata and log evidence below.
Expand Down Expand Up @@ -58,14 +75,23 @@ class ClassifierAgent:
falls back to stub behaviour (dry-run compatibility).
"""

def __init__(self, llm_client: LLMClientInterface | None = None) -> None:
def __init__(
self,
llm_client: LLMClientInterface | None = None,
analyser_kb_dir: str | None = None,
) -> None:
"""Initialise the classifier.

Args:
llm_client: LLM client used to call the model. When None, the agent
falls back to stub behaviour (error_class='unknown', LOW confidence).
analyser_kb_dir: Path to analyser_kb directory of labeled log excerpts.
When provided, examples are injected into every LLM call as
few-shot reference context. Also serves as the training data
corpus for the future fine-tuned Agent 3 model.
"""
self._llm = llm_client
self._few_shot_examples: str = _load_analyser_kb(analyser_kb_dir) if analyser_kb_dir else ""

@log_agent_lifecycle("agent3")
def run(self, state: PipelineState) -> PipelineState:
Expand Down Expand Up @@ -109,16 +135,36 @@ def run(self, state: PipelineState) -> PipelineState:
logger.info("classifier: running for %s", state.incident_number)

messages = self._build_messages(state)
try:
raw = self._llm.complete(
messages,
system=_SYSTEM_PROMPT,
temperature=0.0,
max_tokens=1024,
# Retry once with a 1-second backoff before surfacing as ClassificationError (#83).
# A transient LLM failure must not kill Agent 4 — the notify-only guarantee requires
# Agent 4 to always run, even when Agent 3 cannot classify.
_llm_exc: Exception | None = None
for attempt in range(2):
try:
raw = self._llm.complete(
messages,
system=_SYSTEM_PROMPT,
temperature=0.0,
max_tokens=1024,
)
_llm_exc = None
break
except Exception as exc:
_llm_exc = exc
if attempt == 0:
logger.warning(
"classifier: LLM call failed for %s (attempt 1/2), retrying: %s",
state.incident_number,
exc,
)
time.sleep(1)
if _llm_exc is not None:
logger.error(
"classifier: LLM call failed for %s after 2 attempts: %s",
state.incident_number,
_llm_exc,
)
except Exception as exc:
logger.error("classifier: LLM call failed for %s: %s", state.incident_number, exc)
raise ClassificationError(f"LLM call failed: {exc}") from exc
raise ClassificationError(f"LLM call failed: {_llm_exc}") from _llm_exc

try:
classification, log_request = self._parse_response(raw)
Expand Down Expand Up @@ -193,6 +239,8 @@ def _build_messages(self, state: PipelineState) -> list[dict[str, str]]:
f"Affected CI: {affected_ci}\n\n"
f"Log evidence — {log_section}"
)
if self._few_shot_examples:
content += f"\n\n## Reference log examples\n\n{self._few_shot_examples}"
return [{"role": "user", "content": content}]

def _parse_response(self, raw: str) -> tuple[ClassificationResult | None, LogRequest | None]:
Expand Down
29 changes: 28 additions & 1 deletion core/agents/log_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,12 @@ def _plan_with_llm(self, state: PipelineState) -> LogQueryPlan:
)

data = json.loads(response)
raw_paths = list(data.get("log_paths", []))
return LogQueryPlan(
connector_name=str(data["connector_name"]),
log_paths=list(data.get("log_paths", [])),
# Validate LLM-planned paths against safe prefixes to prevent path-injection
# from adversarial log content coercing the LLM toward /etc/ or ~/ targets (#85).
log_paths=_validate_log_paths(raw_paths),
keywords=list(data.get("keywords", [])),
time_window_minutes=int(data.get("time_window_minutes", _DEFAULT_WINDOW)),
reasoning=str(data.get("reasoning", "")),
Expand Down Expand Up @@ -496,3 +499,27 @@ def _empty(host: str, platform_tag: PlatformTag) -> LogQueryResult:
total_scanned=0,
confidence=ConfidenceBand.LOW,
)


# Allowed log path prefixes for LLM-planned paths (#85).
# Paths outside these prefixes are dropped before being passed to connectors.
# Mirrors the CI-name allowlist used for host resolution in _resolve_ci_from_request().
_SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log/",)


def _validate_log_paths(paths: list[str]) -> list[str]:
"""Filter LLM-planned log paths to known-safe directory prefixes.

Prevents adversarial log content from coercing the LLM into planning paths
that point outside log directories (e.g. /etc/ssh/, ~/.ssh/). Any path that
does not start with an allowed prefix is dropped with a warning.
"""
safe = [p for p in paths if any(p.startswith(prefix) for prefix in _SAFE_LOG_PREFIXES)]
dropped = len(paths) - len(safe)
if dropped:
logger.warning(
"Agent 2: dropped %d LLM-planned log path(s) outside allowed prefixes %s",
dropped,
_SAFE_LOG_PREFIXES,
)
return safe
27 changes: 27 additions & 0 deletions core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ def cdp_ssh_user() -> str:
return _get(["cdp", "ssh_user"], "CDP_SSH_USER", "hadoop")


def cdp_ssh_key_secret() -> str:
"""Return the vault key name used to retrieve the CDP SSH private key.

For EnvVarVault: the key name is read directly from the environment.
For GCPSecretManagerVault: underscores are normalised to hyphens and an
'aria-' prefix is added — e.g. 'CDP_SSH_KEY' → GCP secret 'aria-cdp-ssh-key'.
The TF-provisioned secret name for UC1 is 'aria-uc1-ssh-private-key', which
requires setting cdp.ssh_key_secret: CDP_UC1_SSH_PRIVATE_KEY in conf.yaml
(resolves to 'aria-cdp-uc1-ssh-private-key') or renaming the TF secret.
Defaults to 'CDP_SSH_KEY' (backward-compatible with pre-S4 deployments).
"""
return _get(["cdp", "ssh_key_secret"], "CDP_SSH_KEY_SECRET", "CDP_SSH_KEY")


def cdp_log_dirs() -> list[str]:
"""Return directories to search for logs on CDP cluster nodes.

Expand Down Expand Up @@ -212,6 +226,19 @@ def run_db_path() -> str:
return _get(["runs", "db_path"], "ARIA_RUN_DB_PATH", "data/runs.db")


# ── Knowledge Base ────────────────────────────────────────────────────────────


def analyser_kb_dir() -> str | None:
"""Return path to the analyser_kb directory of labeled log excerpts for Agent 3 few-shot prompting.

Reads knowledge_base.analyser_kb_dir from conf.yaml / ARIA_ANALYSER_KB_DIR env var.
None when not configured — Agent 3 classifies without few-shot examples.
"""
val = _get(["knowledge_base", "analyser_kb_dir"], "ARIA_ANALYSER_KB_DIR")
return val or None


# ── GCP ───────────────────────────────────────────────────────────────────────


Expand Down
30 changes: 24 additions & 6 deletions core/orchestrator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from langgraph.graph.state import CompiledStateGraph

import core.config as cfg
from core.exceptions import ClassificationError
from core.interfaces.run_state_store import RunStateStoreInterface
from core.interfaces.run_store import RunStoreInterface
from core.logging_config import configure_logging
Expand Down Expand Up @@ -129,13 +130,30 @@ def _agent2_node(self, state: PipelineState) -> dict:
}

def _agent3_node(self, state: PipelineState) -> dict:
"""LangGraph node wrapper for Agent 3. Returns classification and any pending log request."""
"""LangGraph node wrapper for Agent 3. Returns classification and any pending log request.

ClassificationError is caught here rather than propagated — if Agent 3 cannot
classify, the error is recorded in state and the pipeline routes to Agent 4 so
the notify-only guarantee is never violated (#83).
"""
self._track_agent(state, "agent3")
result = self._agent3.run(state)
return {
"classification": result.classification,
"pending_log_request": result.pending_log_request,
}
try:
result = self._agent3.run(state)
return {
"classification": result.classification,
"pending_log_request": result.pending_log_request,
}
except ClassificationError as exc:
logger.error(
"agent3: ClassificationError for %s — routing to agent4 with error: %s",
state.incident_number,
exc,
)
return {
"classification": None,
"pending_log_request": None,
"error": str(exc),
}

def _agent4_node(self, state: PipelineState) -> dict:
"""LangGraph node wrapper for Agent 4. Returns notification_sent and any delivery error."""
Expand Down
20 changes: 6 additions & 14 deletions data/cluster_hosts.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
{
"_comment": "CI name → IP mapping for Agent 2 ReAct loop. Used when Agent 3 requests logs from a service different from the primary affected_ci. All test CIs resolve to localhost (127.0.0.1) since this VPS simulates the full cluster.",
"cdp-dn-01": "127.0.0.1",
"cdp-nn-01": "127.0.0.1",
"cdp-hms-01": "127.0.0.1",
"cdp-rm-01": "127.0.0.1",
"cdp-zk-01": "127.0.0.1",
"cdp-dn-02": "127.0.0.1",
"cdp-nn-02": "127.0.0.1",
"cdp-hs2-02": "127.0.0.1",
"cdp-dn-03": "127.0.0.1",
"cdp-cluster-prod": "127.0.0.1",
"cdp-rm-02": "127.0.0.1",
"cdp-hms-02": "127.0.0.1",
"cdp-hs2-03": "127.0.0.1"
"_comment": "CI name → IP mapping for Agent 2 ReAct loop cross-service resolution. Keys must match the CI names in ServiceNow CMDB. IPs are populated from Terraform output after UC1 cluster apply: terraform -chdir=infra/terraform/uc_testing/uc1-hadoop-onprem output -json node_internal_ips",
"cdp-master-01": "POPULATE_FROM_TF_OUTPUT",
"cdp-data-01": "POPULATE_FROM_TF_OUTPUT",
"cdp-data-02": "POPULATE_FROM_TF_OUTPUT",
"cdp-utility-01": "POPULATE_FROM_TF_OUTPUT",
"cdp-bus-01": "POPULATE_FROM_TF_OUTPUT"
}
Loading
Loading