diff --git a/docs/memory.md b/docs/memory.md index a4c69f72..f1005b70 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -2,6 +2,24 @@ CAO's memory system gives agents persistent, cross-session storage. Agents store facts, decisions, and preferences during a session; CAO injects relevant memories back as context when the agent starts its next session. +> For how it works under the hood — the three storage layers, the SQLite schema, and the read/write paths — see [`memory-internals.md`](memory-internals.md). + +## Capabilities + +- **Four scopes** (`global`, `project`, `session`, `agent`) and four type labels. +- **Store / recall / forget** via MCP tools and `cao memory` CLI commands. +- **Markdown wiki storage** with a SQLite metadata index. +- **Search**: keyword (BM25), recency, or hybrid; results ranked by recency, a + composite 3-factor score (BM25 + recency + usage), or usage. +- **Cross-references** between related memories, expandable on recall. +- **Auto-injection** into each provider's config file on terminal creation, plus a + `` block prepended to the agent's first message. +- **LLM wiki compaction** (`cao memory compact`) that rewrites topic articles. +- **Linting** (`cao memory lint`) for orphans, contradictions, stale claims, and more. +- **Self-healing** (`cao memory heal`) that turns lint findings into fixes — dry-run + by default, `--apply` to mutate, with a full audit trail. +- **Tiered retention / cleanup**, a daily audit log, and a memory Web UI. + ## How It Works 1. **Agent stores a memory** via `memory_store` MCP tool during a session @@ -20,7 +38,10 @@ Scope controls where a memory is stored and who can read it back. | `session` | `memory/global/wiki/session/` | Ephemeral: notes for current session only | | `agent` | `memory/global/wiki/agent/` | Role-specific: patterns the agent role always applies | -`project` is the default scope. The project hash is `sha256(realpath(cwd))[:12]`. +`project` is the default scope. Project identity resolves via a precedence chain — a +`CAO_PROJECT_ID` / `memory.project_id` override, then the normalized git remote URL, then +`sha256(realpath(cwd))[:12]` as a fallback — so a project stays recallable across renames +and moves. > **Note:** `session` and `agent` scopes are stored under the global container, not in their own top-level directories. Only `project` scope gets a dedicated directory keyed by project hash. @@ -59,14 +80,19 @@ Search memories by keyword query and optional filters. ``` memory_recall( - query="testing", # optional, searches content - scope="project", # optional, filter by scope - memory_type=None, # optional, filter by type - limit=10 # optional, default 10, max 100 + query="testing", # optional, searches content + scope="project", # optional, filter by scope + memory_type=None, # optional, filter by type + limit=10, # optional, default 10, max 100 + search_mode="hybrid", # optional: "hybrid" (default), "bm25", "metadata" + sort_by="recency", # optional: "recency" (default), "score", "usage" + include_related=False # optional, expand cross-referenced memories ) ``` -Results are returned sorted by recency, with scope precedence: `session` > `project` > `global`. +`sort_by` controls ranking: `recency` (newest first), `score` (composite 3-factor — +BM25 relevance + recency + usage), or `usage` (most accessed). When no `scope` is +given, results follow scope precedence: `session` > `project` > `global`. ### `memory_forget` @@ -98,11 +124,39 @@ cao memory delete --scope project --yes # Clear all memories for a scope cao memory clear --scope session --yes + +# Lint the wiki for orphans, contradictions, stale claims, etc. +cao memory lint +cao memory lint --scope project --format json + +# Compact wiki topics with the LLM compiler (repair sweep) +cao memory compact --scope global +cao memory compact --key testing-framework + +# Repair lint findings — dry-run by default, --apply to mutate +cao memory heal --scope project # dry-run plan +cao memory heal --scope project --apply +cao memory heal --scope project --apply --aggressive # also heal poison_frequency ``` +`cao memory heal` consumes the findings from `cao memory lint` and applies one fix per +issue type: it deletes orphan pages, resolves contradictions (keeping the newer article), +strips stale claims, and — only under `--aggressive` — zeroes poisoned access counts. It +is dry-run by default; pass `--apply` to mutate. Every applied mutation is written to the +daily audit log. + ## Context Injection -When an agent receives its first message in a session, CAO prepends a `` block containing relevant memories (up to 3000 characters). The block format: +CAO injects relevant memories into a new session two ways: + +1. **First-message block** — when an agent receives its first message in a session, + CAO prepends a `` block containing relevant memories. +2. **Provider config file** — built-in plugins for Claude Code, Codex, and Kiro CLI + write the same block into each provider's per-project config file (e.g. + `.claude/CLAUDE.md`) on terminal creation, delimited by `cao-memory` markers so + repeated runs overwrite the same section. + +The block format: ``` @@ -115,11 +169,16 @@ When an agent receives its first message in a session, CAO prepends a ` ``` -Memories are selected in scope precedence order: `session` > `project` > `global`. +Memories are selected in scope precedence order: `session` > `project` > `global`. Each +scope is independently capped — at most `MEMORY_MAX_PER_SCOPE` (10) entries and +`MEMORY_SCOPE_BUDGET_CHARS` (1000) characters per scope — so one scope cannot monopolize +the injection budget. -## Auto-Save +## Saving Memories -In Phase 1 there is no automatic save hook. Agents must call `memory_store` explicitly via MCP when they want to persist a fact. Agent profiles include guidance on when to store. Hook-driven auto-save is shipped via per-provider plugins in a subsequent PR. +Agents call `memory_store` explicitly via MCP when they want to persist a fact; agent +profiles include guidance on when to store (see below). Hook- and plugin-driven injection +surfaces stored memories back into later sessions automatically. ## Storage Layout diff --git a/src/cli_agent_orchestrator/cli/commands/memory.py b/src/cli_agent_orchestrator/cli/commands/memory.py index eaf6c603..c372a86f 100644 --- a/src/cli_agent_orchestrator/cli/commands/memory.py +++ b/src/cli_agent_orchestrator/cli/commands/memory.py @@ -359,3 +359,133 @@ def compact_cmd(scope, key): for topic_key, status in sorted(results.items()): click.echo(f"{status:<22} {topic_key}") click.echo(f"\nSummary: {summary}") + + +@memory.command(name="heal") +@click.option( + "--scope", + # Only global/project are resolvable from cwd context here; session/agent + # need a session_name/agent_profile that this CLI path cannot derive, so + # offering them would only ever raise "could not resolve scope_id". + type=click.Choice([MemoryScope.GLOBAL.value, MemoryScope.PROJECT.value], case_sensitive=False), + default="project", + show_default=True, + help="Scope to heal (global or project).", +) +@click.option( + "--apply", + "do_apply", + is_flag=True, + default=False, + help="Apply mutations. Without this flag, prints a dry-run plan only.", +) +@click.option( + "--aggressive", + is_flag=True, + default=False, + help="Enable destructive poison_frequency healing (requires --apply too).", +) +@click.option( + "--issue-type", + "issue_type", + type=click.Choice( + ["orphan_page", "contradiction", "stale_claim", "poison_frequency"], + case_sensitive=False, + ), + default=None, + help="Restrict healing to a single issue type.", +) +@click.option( + "--format", + "out_format", + type=click.Choice(["table", "json"], case_sensitive=False), + default="table", + show_default=True, + help="Output format.", +) +def heal_cmd(scope, do_apply, aggressive, issue_type, out_format): + """Repair wiki lint findings (orphan pages, contradictions, stale claims). + + Dry-run by DEFAULT — prints what would change. Pass --apply to mutate. + poison_frequency healing additionally requires --aggressive. + graph_density is flag-only and never mutated. + """ + import json as _json + + from cli_agent_orchestrator.services import wiki_healer + from cli_agent_orchestrator.services.wiki_lint import run_lint + + svc = _get_memory_service() + ctx = _cwd_context() + + scope_id = None + if scope != MemoryScope.GLOBAL.value: + scope_id = svc.resolve_scope_id(scope, ctx) + if scope_id is None: + raise click.ClickException(f"could not resolve scope_id for scope '{scope}'") + + project_hash = scope_id or "unknown" + + try: + issues = _run_async(run_lint(project_hash, scope=scope)) + except Exception as e: + raise click.ClickException(f"lint run failed: {e}") + + if issue_type is not None: + issues = [i for i in issues if i.issue_type == issue_type] + + try: + report = _run_async( + wiki_healer.heal( + issues, + scope=scope, + scope_id=scope_id, + apply=do_apply, + aggressive=aggressive, + ) + ) + except wiki_healer.HealConflictError as e: + raise click.ClickException(str(e)) + except Exception as e: + raise click.ClickException(f"heal failed: {e}") + + is_json = out_format.lower() == "json" + if is_json: + payload = { + "scope": report.scope, + "scope_id": report.scope_id, + "apply": report.apply, + "aggressive": report.aggressive, + "dry_run_summary": report.dry_run_summary, + "truncated_by_type": report.truncated_by_type, + "truncated_run_level": report.truncated_run_level, + "total_suppressed": report.total_suppressed, + "actions": [ + { + "issue_type": a.issue_type, + "key": a.key, + "related_key": a.related_key, + "description": a.description, + "status": a.status, + } + for a in report.actions + ], + } + click.echo(_json.dumps(payload, indent=2)) + return + + if report.dry_run_summary: + click.echo(report.dry_run_summary) + if not report.actions: + click.echo("Nothing to heal.") + return + header = f"{'STATUS':<10} {'TYPE':<22} {'KEY':<30} DESCRIPTION" + click.echo(header) + click.echo("-" * len(header)) + for a in report.actions: + click.echo(f"{a.status:<10} {a.issue_type:<22} {a.key:<30} {a.description}") + if report.total_suppressed: + click.echo( + f"\n{report.total_suppressed} action(s) suppressed by caps " + f"(by type: {report.truncated_by_type}, run-level: {report.truncated_run_level})." + ) diff --git a/src/cli_agent_orchestrator/services/audit_log.py b/src/cli_agent_orchestrator/services/audit_log.py index 08907ffe..7e60ef7c 100644 --- a/src/cli_agent_orchestrator/services/audit_log.py +++ b/src/cli_agent_orchestrator/services/audit_log.py @@ -59,6 +59,13 @@ SYNC_AUDIT_EVENTS: frozenset = frozenset( { "lint_run_completed", + # Phase 4 U1 — wiki self-healing. Each applied mutation emits an + # AWAITED audit event; ``heal_run_completed`` summarises the run. + "orphan_pruned", + "contradiction_resolved", + "stale_claim_pruned", + "poison_access_zeroed", + "heal_run_completed", } ) NOWAIT_AUDIT_EVENTS: frozenset = frozenset( diff --git a/src/cli_agent_orchestrator/services/wiki_healer.py b/src/cli_agent_orchestrator/services/wiki_healer.py new file mode 100644 index 00000000..be3eb73f --- /dev/null +++ b/src/cli_agent_orchestrator/services/wiki_healer.py @@ -0,0 +1,970 @@ +"""Wiki Healer (Phase 4 U1). + +Self-healing for the memory wiki. Consumes the frozen ``LintIssue`` records +produced by :mod:`cli_agent_orchestrator.services.wiki_lint` and, under an +explicit ``--apply`` gate, repairs three classes of finding: + +- ``orphan_page`` — delete the wiki file + drop its index.md line + SQLite row. +- ``contradiction`` — keep the newer article (by ``updated_at``), ``forget`` the loser. +- ``stale_claim`` — strip the paragraph naming the stale path/symbol, atomic rewrite. + Only the ``file not found:`` and ``symbol not found in source:`` sub-types are + healable; the ``related_keys references missing key:`` sub-type (emitted by the + graph detector under the ``stale_claim`` type) is reported ``skipped`` — there is + no prose paragraph to strip. + +``poison_frequency`` is gated behind BOTH ``--apply`` AND ``--aggressive`` (a +dual gate); ``graph_density`` is flag-only and never mutates. + +Design invariants (see issue #297): + +1. Dry-run default. ``apply=False`` mutates nothing. +2. SQL row authoritative on mutation — contradiction/poison re-read the DB row + by ``(key, scope, scope_id)`` and trust the DB, never the LintIssue payload. +3. Atomic per-issue-type batch — each group's DB writes run in one transaction. +4. Concurrency guard — a dedicated ``.heal.lock`` (fcntl.flock, LOCK_NB). +5. Bounded blast radius — ``MAX_HEAL_ACTIONS`` run-level cap + per-type caps. +6. Recovery field — ``stale_claim`` stashes the pre-strip paragraph (size-capped). +7. Full audit trail — every applied mutation emits an AWAITED audit event. +""" + +from __future__ import annotations + +import errno +import fcntl +import logging +import os +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Optional + +from cli_agent_orchestrator.services.audit_log import write_audit +from cli_agent_orchestrator.services.memory_service import ( + MemoryDisabledError, + MemoryService, + _is_memory_enabled, +) +from cli_agent_orchestrator.services.wiki_lint import ISSUE_CAPS, LintIssue + +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants & caps +# ----------------------------------------------------------------------------- + +MAX_HEAL_ACTIONS = 200 # hard run-level cap across all issue types +STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES = 2048 # audit recovery field size cap + +# Issue types this healer mutates / reports, in deterministic batch order. +_MUTATING_TYPES = ("orphan_page", "contradiction", "stale_claim", "poison_frequency") + +# Map a mutating issue_type → the action issue_type recorded on success. +_ACTION_TYPE = { + "orphan_page": "orphan_pruned", + "contradiction": "contradiction_resolved", + "stale_claim": "stale_claim_pruned", + "poison_frequency": "poison_access_zeroed", +} + +# Description parse patterns for stale_claim. +_FILE_NOT_FOUND_RE = re.compile(r"file not found: (\S+)") +_SYMBOL_NOT_FOUND_RE = re.compile(r"symbol not found in source: (\S+)") + + +# ----------------------------------------------------------------------------- +# Exceptions +# ----------------------------------------------------------------------------- + + +class HealError(RuntimeError): + """Base error for the healer.""" + + +class HealConflictError(HealError): + """Raised when the ``.heal.lock`` cannot be acquired (another heal runs).""" + + +# ----------------------------------------------------------------------------- +# Frozen dataclasses +# ----------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class HealAction: + """Single applied (or planned) mutation outcome.""" + + issue_type: str # orphan_pruned|contradiction_resolved|stale_claim_pruned|poison_access_zeroed + key: str + related_key: Optional[str] = None + description: str = "" # human summary of what was done / would be done + status: str = "applied" # "applied"|"skipped"|"error"|"planned" + pre_strip_paragraph: Optional[str] = None # only for stale_claim; size-capped + # Buffered audit payload (event_type, summary, fields). Populated by the + # healers instead of emitting inline; the batch loop flushes it only AFTER + # the group's db.commit() succeeds, so a rolled-back mutation never leaves a + # false audit record (invariant #7, fixed). None when there is nothing to + # audit (e.g. a skipped no-op that produced no side effect). + audit: Optional[tuple] = None # (event_type: str, summary: str, fields: dict) + + +@dataclass(frozen=True) +class HealReport: + """Aggregate heal run summary.""" + + scope: str + scope_id: Optional[str] + apply: bool + aggressive: bool + actor: str = "cli" # who triggered the run — forensic anchor on heal_run_completed + actions: list = field(default_factory=list) # list[HealAction] + dry_run_summary: Optional[str] = None # non-None only when apply=False + truncated_by_type: dict = field(default_factory=dict) # {"stale_claim": 45} + truncated_run_level: int = 0 # total suppressed due to MAX_HEAL_ACTIONS + total_suppressed: int = 0 # truncated_by_type.sum() + truncated_run_level + + +# ----------------------------------------------------------------------------- +# Lock handling +# ----------------------------------------------------------------------------- + + +def _heal_lock_path(svc: MemoryService, scope: str, scope_id: Optional[str]) -> Path: + return svc._get_project_dir(scope, scope_id) / "wiki" / ".heal.lock" + + +def _acquire_lock(lock_path: Path) -> int: + """Acquire the non-blocking heal lock; raise HealConflictError on contention.""" + lock_path.parent.mkdir(parents=True, exist_ok=True) + try: + lock_fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o600) + except OSError as e: + raise HealError(f"lock open failed: {type(e).__name__}") from e + try: + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError as e: + try: + os.close(lock_fd) + except OSError: + pass + if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + raise HealConflictError( + "another heal is running; lock acquire failed (EWOULDBLOCK)" + ) from e + raise HealError(f"lock acquire failed: {type(e).__name__}") from e + return lock_fd + + +def _release_lock(lock_fd: int) -> None: + try: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + except OSError: + pass + try: + os.close(lock_fd) + except OSError: + pass + + +# ----------------------------------------------------------------------------- +# Stale-claim paragraph strip +# ----------------------------------------------------------------------------- + + +def _parse_stale_identifier(description: str) -> Optional[str]: + """Extract the stale path/symbol from a stale_claim description, or None.""" + m = _FILE_NOT_FOUND_RE.search(description or "") + if m: + return m.group(1).strip() + m = _SYMBOL_NOT_FOUND_RE.search(description or "") + if m: + return m.group(1).strip() + return None + + +def _cap_pre_strip(paragraph: str) -> str: + """Size-cap the recovery paragraph to STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES.""" + enc = paragraph.encode("utf-8") + if len(enc) <= STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES: + return paragraph + return ( + enc[:STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES].decode("utf-8", errors="ignore") + + "[…truncated]" + ) + + +def _strip_stale_paragraph(content: str, stale_id: str) -> tuple[str, Optional[str]]: + """Strip the first paragraph that names ``stale_id``. + + Returns ``(new_content, pre_strip_paragraph)``. ``pre_strip_paragraph`` is + None when no paragraph matched (content unchanged). A paragraph is a + maximal run of non-blank lines bounded by blank lines / EOF. + """ + lines = content.split("\n") + + # Group lines into paragraphs: list of (lines, start_idx). + paragraphs: list[list[str]] = [] + current: list[str] = [] + for line in lines: + if line.strip() == "": + if current: + paragraphs.append(current) + current = [] + else: + current.append(line) + if current: + paragraphs.append(current) + + pattern = re.compile(r"\b" + re.escape(stale_id) + r"\b") + target_idx: Optional[int] = None + for i, para in enumerate(paragraphs): + if any(pattern.search(ln) for ln in para): + target_idx = i + break + + if target_idx is None: + return content, None + + pre_strip = "\n".join(paragraphs[target_idx]) + remaining = [p for i, p in enumerate(paragraphs) if i != target_idx] + if remaining: + rebuilt = "\n\n".join("\n".join(p) for p in remaining).strip() + "\n" + else: + rebuilt = "" + return rebuilt, pre_strip + + +# ----------------------------------------------------------------------------- +# Per-issue healers (mutating; called only when apply gate is satisfied) +# ----------------------------------------------------------------------------- + + +def _now_ts() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _row_exists(db: Any, key: str, scope: str, scope_id: Optional[str]) -> bool: + """True iff a metadata row exists for (key, scope, scope_id) on ``db``.""" + from cli_agent_orchestrator.clients.database import MemoryMetadataModel + + return ( + db.query(MemoryMetadataModel) + .filter( + MemoryMetadataModel.key == key, + MemoryMetadataModel.scope == scope, + ( + MemoryMetadataModel.scope_id == scope_id + if scope_id is not None + else MemoryMetadataModel.scope_id.is_(None) + ), + ) + .first() + ) is not None + + +# Mirror the index-entry shape parsed by wiki_lint._detect_orphan_pages. +_INDEX_KEY_RE = re.compile(r"\[([a-z0-9-]{1,60})\]") + + +def _index_has_key(svc: MemoryService, scope: str, scope_id: Optional[str], key: str) -> bool: + """True iff ``key`` appears as an index entry for this (scope, scope_id). + + Best-effort read of the container's ``index.md``. On any read error we + return ``True`` (treat as present) so a transient failure never green-lights + a deletion — the orphan guard must fail safe. + """ + try: + index_path = svc.get_index_path(scope, scope_id) + if not index_path.exists(): + return False + for line in index_path.read_text(encoding="utf-8").splitlines(): + m = _INDEX_KEY_RE.search(line) + if m and m.group(1) == key: + return True + return False + except OSError: + return True + + +def _delete_row(db: Any, key: str, scope: str, scope_id: Optional[str]) -> int: + """Delete the metadata row for (key, scope, scope_id) on the SHARED session. + + Does NOT commit — the caller's per-issue-type batch transaction owns the + commit/rollback boundary (design invariant #3). + """ + from cli_agent_orchestrator.clients.database import MemoryMetadataModel + + deleted: int = ( + db.query(MemoryMetadataModel) + .filter( + MemoryMetadataModel.key == key, + MemoryMetadataModel.scope == scope, + ( + MemoryMetadataModel.scope_id == scope_id + if scope_id is not None + else MemoryMetadataModel.scope_id.is_(None) + ), + ) + .delete() + ) + return deleted + + +async def _heal_orphan( + svc: MemoryService, issue: LintIssue, scope: str, scope_id: Optional[str], db: Any +) -> HealAction: + """Delete wiki file + index line + SQLite row for one orphan_page issue. + + The SQLite row delete runs on the SHARED ``db`` session (no per-issue + commit); the batch transaction is committed once by the caller. + """ + try: + key = svc._sanitize_key(issue.key) + except ValueError: + return HealAction( + "orphan_pruned", issue.key, status="skipped", description="key failed sanitisation" + ) + try: + wiki_path = svc.get_wiki_path(scope, scope_id, key) + except ValueError: + return HealAction( + "orphan_pruned", key, status="skipped", description="path traversal guard rejected key" + ) + + # Defensive orphan re-check (cross-project safety). The LintIssue carries no + # scope_id, and run_lint(scope="project") returns orphans across ALL project + # containers; the healer reconstructs wiki_path using the CURRENT (scope, + # scope_id). A key collision (project B has an orphan key that is a LIVE + # memory in project A) would otherwise delete a valid file. An orphan is, by + # definition, absent from BOTH the index and SQLite for its own container — + # so if either is present for THIS (scope, scope_id) the file is not + # orphaned here and we refuse to unlink it. + if _row_exists(db, key, scope, scope_id) or _index_has_key(svc, scope, scope_id, key): + return HealAction( + "orphan_pruned", + key, + status="skipped", + description="not orphaned in this scope (SQLite row or index entry present); refusing delete", + ) + + if not wiki_path.exists(): + # Already gone — still drop any stale index/metadata. Track whether the + # best-effort index removal succeeded so the description does not claim + # cleanup that did not happen. + index_removed = True + try: + svc._update_index(scope, scope_id, key, "", "", "", _now_ts(), "remove") + except Exception as e: + index_removed = False + logger.warning("orphan_pruned index remove failed key=%s: %s", key, type(e).__name__) + _delete_row(db, key, scope, scope_id) + desc = ( + "file already absent; index/metadata cleaned" + if index_removed + else "file already absent; metadata row removed (index removal failed, see logs)" + ) + return HealAction( + "orphan_pruned", + key, + status="applied", + description=desc, + audit=( + "orphan_pruned", + f"deleted orphan wiki file: {key}", + { + "key": key, + "scope": scope, + "scope_id": scope_id or "", + "file_path": str(wiki_path), + }, + ), + ) + + try: + wiki_path.unlink() + svc._update_index(scope, scope_id, key, "", "", "", _now_ts(), "remove") + _delete_row(db, key, scope, scope_id) + except Exception as e: + logger.warning("orphan_pruned failed key=%s: %s", key, type(e).__name__) + return HealAction( + "orphan_pruned", key, status="error", description=f"delete failed: {type(e).__name__}" + ) + + return HealAction( + "orphan_pruned", + key, + status="applied", + description=f"deleted {wiki_path.name}, index line, metadata row", + audit=( + "orphan_pruned", + f"deleted orphan wiki file: {key}", + { + "key": key, + "scope": scope, + "scope_id": scope_id or "", + "file_path": str(wiki_path), + }, + ), + ) + + +async def _heal_contradiction( + svc: MemoryService, issue: LintIssue, scope: str, scope_id: Optional[str], db: Any +) -> HealAction: + """Keep newer article (by updated_at), forget the loser. SQL-row authoritative.""" + from cli_agent_orchestrator.clients.database import MemoryMetadataModel + + key_a = issue.key + key_b = issue.related_key + if not key_b: + return HealAction( + "contradiction_resolved", + key_a, + status="skipped", + description="contradiction missing related_key", + ) + + # SQL-row authoritative: re-read both rows on the SHARED batch session. + def _read(k: str) -> Optional[Any]: + return ( + db.query(MemoryMetadataModel) + .filter( + MemoryMetadataModel.key == k, + MemoryMetadataModel.scope == scope, + ( + MemoryMetadataModel.scope_id == scope_id + if scope_id is not None + else MemoryMetadataModel.scope_id.is_(None) + ), + ) + .first() + ) + + row_a = _read(key_a) + row_b = _read(key_b) + if row_a is None or row_b is None: + return HealAction( + "contradiction_resolved", + key_a, + related_key=key_b, + status="skipped", + description="one or both rows missing; nothing to resolve", + ) + + ua = row_a.updated_at + ub = row_b.updated_at + if ua.tzinfo is None: + ua = ua.replace(tzinfo=timezone.utc) + if ub.tzinfo is None: + ub = ub.replace(tzinfo=timezone.utc) + + # Keep the newer article. On a same-second tie (DB updated_at is + # second-resolution, so ties are plausible) fall back to a DETERMINISTIC + # tiebreak — keep the lexicographically-smaller key — so the choice of which + # memory survives is reproducible and never depends on argument order. + if ua > ub or (ua == ub and key_a <= key_b): + winner_key, loser_key = key_a, key_b + winner_ts, loser_ts = ua, ub + else: + winner_key, loser_key = key_b, key_a + winner_ts, loser_ts = ub, ua + + # Forget the loser. The DB-row delete runs on the SHARED session (deferred + # commit / batch transaction, invariant #3); the non-DB side effects (wiki + # file + index line) are applied immediately. The memory-disabled kill + # switch is enforced at heal() entry; re-check defensively here so a + # disabled state never produces a silent mutation. + if not _is_memory_enabled(): + raise MemoryDisabledError( + "memory is disabled (memory.enabled=false); refusing to heal wiki" + ) + try: + try: + loser_key_s = svc._sanitize_key(loser_key) + loser_path = svc.get_wiki_path(scope, scope_id, loser_key_s) + except ValueError: + loser_key_s, loser_path = loser_key, None + if loser_path is not None and loser_path.exists(): + loser_path.unlink() + svc._update_index(scope, scope_id, loser_key_s, "", "", "", _now_ts(), "remove") + _delete_row(db, loser_key_s, scope, scope_id) + except Exception as e: + logger.warning( + "contradiction_resolved forget failed key=%s: %s", loser_key, type(e).__name__ + ) + return HealAction( + "contradiction_resolved", + winner_key, + related_key=loser_key, + status="error", + description=f"forget failed: {type(e).__name__}", + ) + + return HealAction( + "contradiction_resolved", + winner_key, + related_key=loser_key, + status="applied", + description=f"kept {winner_key}, forgot {loser_key}", + audit=( + "contradiction_resolved", + f"kept {winner_key}, forgot {loser_key}", + { + "winner_key": winner_key, + "loser_key": loser_key, + "scope": scope, + "scope_id": scope_id or "", + "winner_updated_at": winner_ts.strftime("%Y-%m-%dT%H:%M:%SZ"), + "loser_updated_at": loser_ts.strftime("%Y-%m-%dT%H:%M:%SZ"), + }, + ), + ) + + +async def _heal_stale_claim( + svc: MemoryService, issue: LintIssue, scope: str, scope_id: Optional[str], db: Any +) -> HealAction: + """Strip the paragraph naming the stale path/symbol; atomic rewrite. + + The primary mutation is the atomic file rewrite. The metadata ``updated_at`` + stamp is a best-effort, non-critical follow-up via ``_upsert_metadata`` (its + own short transaction) and is intentionally NOT part of the batch + transaction — a stamp failure must never roll back a successful rewrite. + """ + stale_id = _parse_stale_identifier(issue.description) + if stale_id is None: + return HealAction( + "stale_claim_pruned", + issue.key, + status="skipped", + description="description format unrecognised; no strip", + ) + + try: + key = svc._sanitize_key(issue.key) + wiki_path = svc.get_wiki_path(scope, scope_id, key) + except ValueError: + return HealAction( + "stale_claim_pruned", + issue.key, + status="skipped", + description="key failed sanitisation / traversal guard", + ) + + if not wiki_path.exists(): + return HealAction( + "stale_claim_pruned", key, status="skipped", description="article vanished before strip" + ) + + try: + content = wiki_path.read_text(encoding="utf-8") + except OSError as e: + return HealAction( + "stale_claim_pruned", + key, + status="error", + description=f"read failed: {type(e).__name__}", + ) + + new_content, pre_strip = _strip_stale_paragraph(content, stale_id) + capped_pre_strip = _cap_pre_strip(pre_strip) if pre_strip is not None else None + + if pre_strip is None: + # No paragraph matched — leave content unchanged, audit + skipped. This + # records a read-only outcome (nothing mutated), so it is safe to emit + # regardless of the batch commit; buffer it for uniformity. + return HealAction( + "stale_claim_pruned", + key, + status="skipped", + description=f"stale id {stale_id} not found in article", + pre_strip_paragraph=None, + audit=( + "stale_claim_pruned", + f"no paragraph found for {stale_id} in {key}", + { + "key": key, + "scope": scope, + "scope_id": scope_id or "", + "stale_identifier": stale_id, + "pre_strip_paragraph": "", + }, + ), + ) + + tmp_path = wiki_path.parent / f".{wiki_path.stem}.strip.tmp" + try: + tmp_path.write_text(new_content, encoding="utf-8") + os.replace(str(tmp_path), str(wiki_path)) + except OSError as e: + try: + if tmp_path.exists(): + tmp_path.unlink() + except OSError: + pass + return HealAction( + "stale_claim_pruned", + key, + status="error", + description=f"atomic rewrite failed: {type(e).__name__}", + ) + + # Stamp the metadata updated_at so the article is seen as freshly changed. + try: + svc._upsert_metadata( + key=key, + memory_type=svc._memory_type_from_file(new_content), + scope=scope, + scope_id=scope_id, + file_path=str(wiki_path), + tags=svc._tags_from_file(new_content), + source_provider=None, + source_terminal_id=None, + token_estimate=len(new_content) // 4, + preserve_provenance=True, + ) + except Exception as e: + logger.debug("stale_claim metadata stamp failed key=%s: %s", key, type(e).__name__) + + return HealAction( + "stale_claim_pruned", + key, + status="applied", + description=f"stripped paragraph naming {stale_id}", + pre_strip_paragraph=capped_pre_strip, + audit=( + "stale_claim_pruned", + f"stripped stale reference to {stale_id} in article {key}", + { + "key": key, + "scope": scope, + "scope_id": scope_id or "", + "stale_identifier": stale_id, + # Already byte-capped by _cap_pre_strip(); audit_log re-caps defensively. + "pre_strip_paragraph": capped_pre_strip or "", + }, + ), + ) + + +async def _heal_poison( + svc: MemoryService, issue: LintIssue, scope: str, scope_id: Optional[str], db: Any +) -> HealAction: + """Zero access_count + last_accessed_at for one poison_frequency row. + + SQL-row authoritative: re-read and mutate the row on the SHARED batch + session (no per-issue commit; the caller's batch transaction owns the + commit/rollback boundary). + """ + from cli_agent_orchestrator.clients.database import MemoryMetadataModel + + key = issue.key + try: + row = ( + db.query(MemoryMetadataModel) + .filter( + MemoryMetadataModel.key == key, + MemoryMetadataModel.scope == scope, + ( + MemoryMetadataModel.scope_id == scope_id + if scope_id is not None + else MemoryMetadataModel.scope_id.is_(None) + ), + ) + .first() + ) + if row is None: + return HealAction( + "poison_access_zeroed", + key, + status="skipped", + description="row missing; nothing to zero", + ) + old_count = int(row.access_count or 0) + row.access_count = 0 + row.last_accessed_at = None + except Exception as e: + logger.warning("poison_access_zeroed failed key=%s: %s", key, type(e).__name__) + return HealAction( + "poison_access_zeroed", + key, + status="error", + description=f"zero failed: {type(e).__name__}", + ) + + return HealAction( + "poison_access_zeroed", + key, + status="applied", + description=f"reset access_count {old_count} → 0", + audit=( + "poison_access_zeroed", + f"reset access_count from {old_count} to 0", + { + "key": key, + "scope": scope, + "scope_id": scope_id or "", + "access_count_was": str(old_count), + }, + ), + ) + + +_HEALERS = { + "orphan_page": _heal_orphan, + "contradiction": _heal_contradiction, + "stale_claim": _heal_stale_claim, + "poison_frequency": _heal_poison, +} + + +# ----------------------------------------------------------------------------- +# Public entry point +# ----------------------------------------------------------------------------- + + +def _plan_description(issue: LintIssue) -> str: + """Human-readable 'Would ...' line for a dry-run action.""" + t = issue.issue_type + if t == "orphan_page": + return f"Would delete orphan wiki file: {issue.key}" + if t == "contradiction": + return f"Would resolve contradiction {issue.key} vs {issue.related_key} (keep newer)" + if t == "stale_claim": + sid = _parse_stale_identifier(issue.description) + return f"Would strip paragraph naming {sid or ''} in {issue.key}" + if t == "poison_frequency": + return f"Would zero access_count of {issue.key}" + return f"Would heal {t}: {issue.key}" + + +async def heal( + issues: list, + *, + scope: str, + scope_id: Optional[str], + apply: bool = False, + aggressive: bool = False, + actor: str = "cli", + svc: Optional[MemoryService] = None, +) -> HealReport: + """Heal a wiki from lint findings. + + Dry-run by default (``apply=False`` mutates nothing). ``poison_frequency`` + is gated behind BOTH ``apply`` AND ``aggressive``. ``graph_density`` and + ``lint_error`` bookkeeping rows are never healed. + """ + # Memory-disabled kill switch: refuse to mutate (or even plan) when memory + # is turned off. Raised early so the critical security signal propagates to + # the caller instead of being swallowed by a per-issue try/except. + if not _is_memory_enabled(): + raise MemoryDisabledError( + "memory is disabled (memory.enabled=false); refusing to heal wiki" + ) + + if svc is None: + svc = MemoryService() + + # 1. Filter: drop lint_error bookkeeping; drop graph_density (flag-only); + # drop poison_frequency unless dual gate satisfied. + poison_allowed = apply and aggressive + by_type: dict = {t: [] for t in _MUTATING_TYPES} + for issue in issues: + if not isinstance(issue, LintIssue): + continue + t = issue.issue_type + if t == "lint_error" or t == "graph_density": + continue + if t == "poison_frequency" and not poison_allowed: + continue + if t not in by_type: + continue + by_type[t].append(issue) + + actions: list = [] + truncated_by_type: dict = {} + truncated_run_level = 0 + actions_applied = 0 + + # Dry-run: no lock, no mutations. Build the plan (respecting caps so the + # plan matches what an apply would do). + if not apply: + for t in _MUTATING_TYPES: + cap = ISSUE_CAPS.get(t) # contradiction has no per-type cap + for issue in by_type[t]: + if actions_applied >= MAX_HEAL_ACTIONS: + truncated_run_level += 1 + continue + if cap is not None and ( + sum( + 1 + for a in actions + if a.status != "skipped" and _action_src_type(a.issue_type) == t + ) + >= cap + ): + truncated_by_type[t] = truncated_by_type.get(t, 0) + 1 + continue + # A stale_claim whose description the healer cannot parse + # (e.g. the "related_keys references missing key:" sub-type + # emitted by the graph detector) is a no-op at apply time. Mark + # it skipped in the plan too, so the dry-run never advertises a + # strip that apply would silently decline. + if t == "stale_claim" and _parse_stale_identifier(issue.description) is None: + actions.append( + HealAction( + _ACTION_TYPE[t], + issue.key, + related_key=issue.related_key, + description="description format unrecognised; would skip", + status="skipped", + ) + ) + continue + actions.append( + HealAction( + _ACTION_TYPE[t], + issue.key, + related_key=issue.related_key, + description=_plan_description(issue), + status="planned", + ) + ) + actions_applied += 1 + total_suppressed = sum(truncated_by_type.values()) + truncated_run_level + n_applicable = sum(len(v) for v in by_type.values()) + n_planned = sum(1 for a in actions if a.status == "planned") + summary = ( + f"Would apply {n_planned} of {n_applicable} actionable findings" + f" ({total_suppressed} suppressed by caps)." + ) + if not poison_allowed and any( + i.issue_type == "poison_frequency" for i in issues if isinstance(i, LintIssue) + ): + summary += " Poison issues gated behind --apply --aggressive." + report = HealReport( + scope=scope, + scope_id=scope_id, + apply=False, + aggressive=aggressive, + actor=actor, + actions=actions, + dry_run_summary=summary, + truncated_by_type=truncated_by_type, + truncated_run_level=truncated_run_level, + total_suppressed=total_suppressed, + ) + await _emit_run_completed(report) + return report + + # 2. Apply path: acquire the non-blocking heal lock for the whole run. + # Each issue-type group's DB writes run in ONE transaction on a shared + # session (invariant #3): a commit failure rolls THAT group back and + # marks its actions error; other groups proceed independently. + lock_fd = _acquire_lock(_heal_lock_path(svc, scope, scope_id)) + try: + for t in _MUTATING_TYPES: + cap = ISSUE_CAPS.get(t) + healer = _HEALERS[t] + n_this_type = 0 + group_start = len(actions) + db = svc._get_db_session() + try: + for issue in by_type[t]: + if actions_applied >= MAX_HEAL_ACTIONS: + truncated_run_level += 1 + continue + if cap is not None and n_this_type >= cap: + truncated_by_type[t] = truncated_by_type.get(t, 0) + 1 + continue + action = await healer(svc, issue, scope, scope_id, db) + actions.append(action) + # Only actionable work consumes the blast-radius budget; a + # skipped no-op (unparseable description, missing row, …) + # mutated nothing and must not crowd out a real fix. + if action.status != "skipped": + n_this_type += 1 + actions_applied += 1 + db.commit() + except Exception as e: + # Roll the whole group back at the DB level; filesystem side effects (if any) + # may already have occurred, so surface as errored actions and avoid emitting + # mutation audits for this group. + logger.warning("heal batch group commit failed type=%s: %s", t, type(e).__name__) + try: + db.rollback() + except Exception: + pass + rolled = actions[group_start:] + del actions[group_start:] + for a in rolled: + actions.append( + HealAction( + a.issue_type, + a.key, + related_key=a.related_key, + description=( + f"commit failed ({type(e).__name__}); DB rolled back; " + f"filesystem changes may be partial: {a.description}" + ), + status="error", + pre_strip_paragraph=a.pre_strip_paragraph, + ) + ) + continue + finally: + try: + db.close() + except Exception: + pass + # Commit succeeded — NOW flush the group's buffered audit events. + # Emitting only post-commit guarantees the audit log never records a + # mutation that was rolled back. + for a in actions[group_start:]: + if a.audit is not None: + event_type, summary, fields = a.audit + await write_audit(event_type, summary, **fields) + finally: + _release_lock(lock_fd) + + total_suppressed = sum(truncated_by_type.values()) + truncated_run_level + report = HealReport( + scope=scope, + scope_id=scope_id, + apply=True, + aggressive=aggressive, + actor=actor, + actions=actions, + dry_run_summary=None, + truncated_by_type=truncated_by_type, + truncated_run_level=truncated_run_level, + total_suppressed=total_suppressed, + ) + await _emit_run_completed(report) + return report + + +def _action_src_type(action_type: str) -> str: + """Reverse-map an action issue_type back to its lint source type.""" + for src, act in _ACTION_TYPE.items(): + if act == action_type: + return src + return action_type + + +async def _emit_run_completed(report: HealReport) -> None: + breakdown = ",".join(f"{k}:{v}" for k, v in sorted(report.truncated_by_type.items())) or "none" + n_applied = sum(1 for a in report.actions if a.status == "applied") + await write_audit( + "heal_run_completed", + f"heal completed: {n_applied} actions applied, {report.total_suppressed} suppressed", + scope=report.scope, + scope_id=report.scope_id or "", + actor=report.actor, + apply="true" if report.apply else "false", + aggressive="true" if report.aggressive else "false", + n_actions=str(len(report.actions)), + n_truncated=str(report.total_suppressed), + truncation_breakdown=breakdown, + ) diff --git a/test/cli/commands/test_memory.py b/test/cli/commands/test_memory.py index 1de02d65..302eb5e3 100644 --- a/test/cli/commands/test_memory.py +++ b/test/cli/commands/test_memory.py @@ -11,6 +11,7 @@ from cli_agent_orchestrator.cli.commands.memory import ( clear, delete, + heal_cmd, lint_cmd, list_memories, show, @@ -400,3 +401,167 @@ def test_completion_summary_excluded_from_json_payload(self, mock_get_svc, mock_ assert len(payload) == 1 assert payload[0]["key"] == "lonely" assert not any("lint_run_completed" in p["description"] for p in payload) + + +class TestMemoryHeal: + """cao memory heal — dry-run default, --apply gate, poison dual-gate.""" + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_dry_run_default(self, mock_get_svc, mock_run_lint, mock_heal): + from cli_agent_orchestrator.services.wiki_healer import HealAction, HealReport + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [] + mock_heal.return_value = HealReport( + scope="project", + scope_id="proj-abc", + apply=False, + aggressive=False, + actions=[ + HealAction( + "orphan_pruned", "lonely", description="Would delete orphan", status="planned" + ) + ], + dry_run_summary="Would apply 1 of 1 actionable findings (0 suppressed by caps).", + ) + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project"]) + + assert result.exit_code == 0 + # heal() was called with apply=False by default. + _, kwargs = mock_heal.call_args + assert kwargs["apply"] is False + assert kwargs["aggressive"] is False + assert "Would apply 1" in result.stdout + assert "planned" in result.stdout + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_apply_flag_passed_through(self, mock_get_svc, mock_run_lint, mock_heal): + from cli_agent_orchestrator.services.wiki_healer import HealAction, HealReport + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [] + mock_heal.return_value = HealReport( + scope="project", + scope_id="proj-abc", + apply=True, + aggressive=False, + actions=[ + HealAction("orphan_pruned", "lonely", description="deleted", status="applied") + ], + ) + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project", "--apply"]) + + assert result.exit_code == 0 + _, kwargs = mock_heal.call_args + assert kwargs["apply"] is True + assert "applied" in result.stdout + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_aggressive_flag_passed_through(self, mock_get_svc, mock_run_lint, mock_heal): + from cli_agent_orchestrator.services.wiki_healer import HealReport + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [] + mock_heal.return_value = HealReport( + scope="project", scope_id="proj-abc", apply=True, aggressive=True, actions=[] + ) + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project", "--apply", "--aggressive"]) + + assert result.exit_code == 0 + _, kwargs = mock_heal.call_args + assert kwargs["apply"] is True + assert kwargs["aggressive"] is True + assert "Nothing to heal." in result.stdout + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_issue_type_filter(self, mock_get_svc, mock_run_lint, mock_heal): + from cli_agent_orchestrator.services.wiki_healer import HealReport + from cli_agent_orchestrator.services.wiki_lint import LintIssue + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [ + LintIssue(issue_type="orphan_page", key="a"), + LintIssue(issue_type="stale_claim", key="b", description="file not found: x.py"), + ] + mock_heal.return_value = HealReport( + scope="project", scope_id="proj-abc", apply=False, aggressive=False, actions=[] + ) + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project", "--issue-type", "orphan_page"]) + + assert result.exit_code == 0 + passed_issues = mock_heal.call_args.args[0] + assert all(i.issue_type == "orphan_page" for i in passed_issues) + assert len(passed_issues) == 1 + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_json_output(self, mock_get_svc, mock_run_lint, mock_heal): + import json + + from cli_agent_orchestrator.services.wiki_healer import HealAction, HealReport + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [] + mock_heal.return_value = HealReport( + scope="project", + scope_id="proj-abc", + apply=True, + aggressive=False, + actions=[ + HealAction("orphan_pruned", "lonely", description="deleted", status="applied") + ], + total_suppressed=0, + ) + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project", "--apply", "--format", "json"]) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["apply"] is True + assert payload["actions"][0]["key"] == "lonely" + + @patch("cli_agent_orchestrator.services.wiki_healer.heal", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.services.wiki_lint.run_lint", new_callable=AsyncMock) + @patch("cli_agent_orchestrator.cli.commands.memory._get_memory_service") + def test_lock_conflict_surfaces_clean_error(self, mock_get_svc, mock_run_lint, mock_heal): + from cli_agent_orchestrator.services.wiki_healer import HealConflictError + + mock_svc = MagicMock() + mock_svc.resolve_scope_id = MagicMock(return_value="proj-abc") + mock_get_svc.return_value = mock_svc + mock_run_lint.return_value = [] + mock_heal.side_effect = HealConflictError("another heal is running") + + runner = CliRunner() + result = runner.invoke(heal_cmd, ["--scope", "project", "--apply"]) + + assert result.exit_code != 0 + assert "another heal is running" in result.output diff --git a/test/services/test_audit_log.py b/test/services/test_audit_log.py index 8c7dd02e..71cf774a 100644 --- a/test/services/test_audit_log.py +++ b/test/services/test_audit_log.py @@ -393,6 +393,12 @@ def test_sync_set_contents(self): assert SYNC_AUDIT_EVENTS == frozenset( { "lint_run_completed", + # Phase 4 U1 — wiki self-healing mutations. + "orphan_pruned", + "contradiction_resolved", + "stale_claim_pruned", + "poison_access_zeroed", + "heal_run_completed", } ) diff --git a/test/services/test_wiki_healer.py b/test/services/test_wiki_healer.py new file mode 100644 index 00000000..51cdd734 --- /dev/null +++ b/test/services/test_wiki_healer.py @@ -0,0 +1,631 @@ +"""Wiki Healer tests (Phase 4 U1). + +Covers the locked decisions and design invariants: + +- Dry-run default mutates NOTHING (no file, no DB). +- Each of the 3 fixes (orphan_page, contradiction, stale_claim) applied. +- poison_frequency gated behind --apply AND --aggressive (dual gate). +- graph_density never mutates / never reported. +- lint_error bookkeeping rows ignored. +- caps / truncation reported, no silent drops. +- audit event emitted per applied mutation + one heal_run_completed. +- SQL-row-authoritative behaviour for contradiction / poison. +- concurrency lock (.heal.lock) raises HealConflictError. +""" + +from __future__ import annotations + +import asyncio +import fcntl +import os +from datetime import datetime, timezone +from pathlib import Path + +import pytest +from sqlalchemy import create_engine + +from cli_agent_orchestrator.clients.database import Base, MemoryMetadataModel +from cli_agent_orchestrator.services import audit_log, wiki_healer +from cli_agent_orchestrator.services.memory_service import MemoryService +from cli_agent_orchestrator.services.wiki_healer import ( + MAX_HEAL_ACTIONS, + STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES, + HealConflictError, + _parse_stale_identifier, + _strip_stale_paragraph, + heal, +) +from cli_agent_orchestrator.services.wiki_lint import _make_issue + + +def _run(coro): + return asyncio.run(coro) + + +@pytest.fixture +def db_engine(tmp_path): + db_path = tmp_path / "test.db" + engine = create_engine(f"sqlite:///{db_path}", connect_args={"check_same_thread": False}) + Base.metadata.create_all(bind=engine) + return engine + + +@pytest.fixture +def svc(tmp_path, db_engine): + return MemoryService(base_dir=tmp_path, db_engine=db_engine) + + +@pytest.fixture +def audit_base(tmp_path, monkeypatch): + """Redirect audit log writes to a tmp dir so we can assert on emitted events.""" + base = tmp_path / "audit-base" + base.mkdir() + monkeypatch.setattr(audit_log, "MEMORY_BASE_DIR", base) + return base + + +def _read_audit(base: Path) -> str: + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + p = base / "logs" / "memory" / f"{date_str}.md" + return p.read_text(encoding="utf-8") if p.exists() else "" + + +def _store(svc, key, content="body content here", *, scope="global", tags="t"): + return _run( + svc.store(content=content, scope=scope, memory_type="reference", key=key, tags=tags) + ) + + +def _row(svc, key, scope="global", scope_id=None): + with svc._get_db_session() as db: + q = db.query(MemoryMetadataModel).filter( + MemoryMetadataModel.key == key, + MemoryMetadataModel.scope == scope, + ( + MemoryMetadataModel.scope_id == scope_id + if scope_id is not None + else MemoryMetadataModel.scope_id.is_(None) + ), + ) + return q.first() + + +def _make_orphan(svc, key, content="orphan body", *, scope="global", scope_id=None): + """Create a TRUE orphan: a wiki file on disk with NO SQLite row and NO + index entry. This is the only input the orphan healer should ever delete — + a key that still has a row/index line in this scope is a live memory and is + refused by the defensive re-check (cross-project collision safety).""" + wiki_path = svc.get_wiki_path(scope, scope_id, key) + wiki_path.parent.mkdir(parents=True, exist_ok=True) + wiki_path.write_text(content, encoding="utf-8") + return wiki_path + + +# =========================================================================== +# Strip algorithm unit tests +# =========================================================================== + + +class TestStripAlgorithm: + def test_parse_file_identifier(self): + assert _parse_stale_identifier("file not found: src/config.py") == "src/config.py" + + def test_parse_symbol_identifier(self): + assert _parse_stale_identifier("symbol not found in source: MY_FUNC") == "MY_FUNC" + + def test_parse_unknown_returns_none(self): + assert _parse_stale_identifier("something else entirely") is None + + def test_strip_first_matching_paragraph(self): + content = ( + "First paragraph fine.\n" + "Still fine.\n" + "\n" + "This one mentions src/config.py and is stale.\n" + "\n" + "Last paragraph fine.\n" + ) + new, pre = _strip_stale_paragraph(content, "src/config.py") + assert pre is not None + assert "src/config.py" in pre + assert "src/config.py" not in new + assert "First paragraph fine." in new + assert "Last paragraph fine." in new + + def test_strip_stops_at_first_match(self): + content = "para one foo\n\npara two foo\n" + new, pre = _strip_stale_paragraph(content, "foo") + # First paragraph stripped, second remains. + assert "para one" not in new + assert "para two foo" in new + + def test_word_boundary_no_partial_match(self): + content = "we reconfigured the system here\n" + new, pre = _strip_stale_paragraph(content, "config") + assert pre is None # "config" inside "reconfigured" must NOT match + assert new == content + + def test_no_match_returns_unchanged(self): + content = "nothing relevant here\n" + new, pre = _strip_stale_paragraph(content, "absent.py") + assert pre is None + assert new == content + + +# =========================================================================== +# Dry-run: mutates nothing +# =========================================================================== + + +class TestDryRun: + def test_dry_run_no_file_or_db_mutation(self, svc, audit_base): + _store(svc, "orphan-one") + wiki_path = svc.get_wiki_path("global", None, "orphan-one") + assert wiki_path.exists() + + issues = [_make_issue(issue_type="orphan_page", key="orphan-one", description="orphan")] + report = _run(heal(issues, scope="global", scope_id=None, apply=False, svc=svc)) + + assert report.apply is False + assert report.dry_run_summary is not None + assert all(a.status == "planned" for a in report.actions) + # Nothing deleted. + assert wiki_path.exists() + assert _row(svc, "orphan-one") is not None + + def test_dry_run_emits_only_run_completed(self, svc, audit_base): + _store(svc, "orphan-one") + issues = [_make_issue(issue_type="orphan_page", key="orphan-one")] + _run(heal(issues, scope="global", scope_id=None, apply=False, svc=svc)) + log = _read_audit(audit_base) + assert "[heal_run_completed]" in log + assert "[orphan_pruned]" not in log + assert "apply=false" in log + + +# =========================================================================== +# orphan_page fix +# =========================================================================== + + +class TestOrphanFix: + def test_orphan_applied_deletes_everything(self, svc, audit_base): + wiki_path = _make_orphan(svc, "stale-orphan") + assert wiki_path.exists() + assert _row(svc, "stale-orphan") is None # true orphan — no SQLite row + + issues = [_make_issue(issue_type="orphan_page", key="stale-orphan")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.apply is True + assert len(report.actions) == 1 + assert report.actions[0].status == "applied" + assert report.actions[0].issue_type == "orphan_pruned" + assert not wiki_path.exists() + assert _row(svc, "stale-orphan") is None + + log = _read_audit(audit_base) + assert "[orphan_pruned]" in log + assert "key=stale-orphan" in log + + def test_orphan_with_live_row_is_refused(self, svc, audit_base): + # Cross-project collision safety: run_lint(scope="project") returns + # orphans across ALL containers, but LintIssue carries no scope_id, so + # the healer reconstructs the path in the CURRENT container. A key that + # is a LIVE memory here (SQLite row present) must NOT be deleted even + # though some other project flagged the same key as an orphan. + _store(svc, "shared-key") + wiki_path = svc.get_wiki_path("global", None, "shared-key") + assert wiki_path.exists() + assert _row(svc, "shared-key") is not None + + issues = [_make_issue(issue_type="orphan_page", key="shared-key")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.actions[0].status == "skipped" + assert "not orphaned" in report.actions[0].description + # Live memory untouched. + assert wiki_path.exists() + assert _row(svc, "shared-key") is not None + assert "[orphan_pruned]" not in _read_audit(audit_base) + + def test_orphan_missing_file_still_cleans(self, svc, audit_base): + # No file at all — healer should not crash, should emit audit. + issues = [_make_issue(issue_type="orphan_page", key="ghost")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert report.actions[0].status == "applied" + assert "[orphan_pruned]" in _read_audit(audit_base) + + +# =========================================================================== +# contradiction fix (SQL-row authoritative) +# =========================================================================== + + +class TestContradictionFix: + def test_keeps_newer_forgets_older(self, svc, audit_base): + _store(svc, "older") + _store(svc, "newer") + # Force the timestamps so "newer" wins regardless of clock resolution. + with svc._get_db_session() as db: + old_row = db.query(MemoryMetadataModel).filter_by(key="older").first() + new_row = db.query(MemoryMetadataModel).filter_by(key="newer").first() + old_row.updated_at = datetime(2020, 1, 1, tzinfo=timezone.utc) + new_row.updated_at = datetime(2026, 1, 1, tzinfo=timezone.utc) + db.commit() + + older_path = svc.get_wiki_path("global", None, "older") + newer_path = svc.get_wiki_path("global", None, "newer") + + issues = [ + _make_issue( + issue_type="contradiction", + key="older", + related_key="newer", + description="they disagree", + ) + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.actions[0].status == "applied" + # Loser (older) gone; winner (newer) kept. + assert not older_path.exists() + assert newer_path.exists() + assert _row(svc, "older") is None + assert _row(svc, "newer") is not None + + log = _read_audit(audit_base) + assert "[contradiction_resolved]" in log + assert "winner_key=newer" in log + assert "loser_key=older" in log + + def test_sql_authoritative_skip_when_row_missing(self, svc, audit_base): + # Only one of the two rows exists in DB → SKIP (trust DB, not payload). + _store(svc, "only-one") + issues = [_make_issue(issue_type="contradiction", key="only-one", related_key="phantom")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert report.actions[0].status == "skipped" + # Nothing forgotten. + assert _row(svc, "only-one") is not None + + def test_same_second_tie_is_deterministic_keep_smaller_key(self, svc, audit_base): + # Equal updated_at → deterministic tiebreak keeps the lexicographically + # smaller key ("alpha"), regardless of which side is key_a/related_key. + _store(svc, "alpha") + _store(svc, "bravo") + same = datetime(2026, 1, 1, tzinfo=timezone.utc) + with svc._get_db_session() as db: + for k in ("alpha", "bravo"): + db.query(MemoryMetadataModel).filter_by(key=k).first().updated_at = same + db.commit() + + # Present the pair "loser-order" (bravo as key_a) to prove order doesn't decide. + issues = [_make_issue(issue_type="contradiction", key="bravo", related_key="alpha")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.actions[0].status == "applied" + assert _row(svc, "alpha") is not None # smaller key survives the tie + assert _row(svc, "bravo") is None + assert "winner_key=alpha" in _read_audit(audit_base) + + +# =========================================================================== +# stale_claim fix +# =========================================================================== + + +class TestStaleClaimFix: + def test_strips_paragraph_and_rewrites(self, svc, audit_base): + body = ( + "Intro paragraph that is fine.\n" + "\n" + "This refers to src/gone.py which no longer exists.\n" + "\n" + "Closing paragraph that is fine.\n" + ) + _store(svc, "article-x", content=body) + wiki_path = svc.get_wiki_path("global", None, "article-x") + + issues = [ + _make_issue( + issue_type="stale_claim", + key="article-x", + description="file not found: src/gone.py", + severity="error", + ) + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.actions[0].status == "applied" + assert report.actions[0].pre_strip_paragraph is not None + assert "src/gone.py" in report.actions[0].pre_strip_paragraph + + new_content = wiki_path.read_text(encoding="utf-8") + assert "src/gone.py" not in new_content + assert "Intro paragraph" in new_content + assert "Closing paragraph" in new_content + + log = _read_audit(audit_base) + assert "[stale_claim_pruned]" in log + assert "stale_identifier=src/gone.py" in log + + def test_unparseable_description_skipped(self, svc, audit_base): + _store(svc, "article-y", content="some body\n") + issues = [ + _make_issue(issue_type="stale_claim", key="article-y", description="weird format") + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert report.actions[0].status == "skipped" + + def test_dry_run_does_not_count_unparseable_as_actionable(self, svc, audit_base): + # The "related_keys references missing key:" stale_claim sub-type has no + # paragraph to strip; the dry-run plan must show it skipped, not planned, + # and must not advertise it in the "Would apply N" count. + issues = [ + _make_issue( + issue_type="stale_claim", + key="article-q", + description="related_keys references missing key: gone-topic", + ) + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=False, svc=svc)) + assert report.actions[0].status == "skipped" + assert "Would apply 0 of 1" in (report.dry_run_summary or "") + + def test_pre_strip_size_capped(self, svc, audit_base): + huge_para = "src/gone.py " + ("X" * (STALE_CLAIM_PRESTRIP_PARAGRAPH_MAX_BYTES + 500)) + body = f"intro\n\n{huge_para}\n\noutro\n" + _store(svc, "article-z", content=body) + issues = [ + _make_issue( + issue_type="stale_claim", + key="article-z", + description="file not found: src/gone.py", + ) + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + pre = report.actions[0].pre_strip_paragraph + assert pre is not None + assert pre.endswith("[…truncated]") + + +# =========================================================================== +# poison_frequency dual gate +# =========================================================================== + + +class TestPoisonGate: + def _seed_poison(self, svc): + _store(svc, "poisoned") + with svc._get_db_session() as db: + row = db.query(MemoryMetadataModel).filter_by(key="poisoned").first() + row.access_count = 999 + db.commit() + + def test_apply_without_aggressive_skips_poison(self, svc, audit_base): + self._seed_poison(svc) + issues = [_make_issue(issue_type="poison_frequency", key="poisoned")] + report = _run( + heal(issues, scope="global", scope_id=None, apply=True, aggressive=False, svc=svc) + ) + # Not even reported. + assert report.actions == [] + assert int(_row(svc, "poisoned").access_count) == 999 + assert "[poison_access_zeroed]" not in _read_audit(audit_base) + + def test_dry_run_aggressive_does_not_mutate(self, svc, audit_base): + self._seed_poison(svc) + issues = [_make_issue(issue_type="poison_frequency", key="poisoned")] + report = _run( + heal(issues, scope="global", scope_id=None, apply=False, aggressive=True, svc=svc) + ) + # apply=False → poison gated out entirely (dual gate needs apply too). + assert report.actions == [] + assert int(_row(svc, "poisoned").access_count) == 999 + + def test_apply_and_aggressive_zeroes(self, svc, audit_base): + self._seed_poison(svc) + issues = [_make_issue(issue_type="poison_frequency", key="poisoned")] + report = _run( + heal(issues, scope="global", scope_id=None, apply=True, aggressive=True, svc=svc) + ) + assert report.actions[0].status == "applied" + assert int(_row(svc, "poisoned").access_count) == 0 + log = _read_audit(audit_base) + assert "[poison_access_zeroed]" in log + assert "access_count_was=999" in log + + +# =========================================================================== +# Audit emitted only AFTER a successful commit (rollback drops the audit) +# =========================================================================== + + +class TestAuditAfterCommit: + def _force_commit_failure(self, svc, monkeypatch): + """Wrap _get_db_session so the returned session's commit() raises.""" + real_get = svc._get_db_session + + def _patched(): + db = real_get() + monkeypatch.setattr(db, "commit", _raise_commit) + return db + + def _raise_commit(): + raise RuntimeError("simulated commit failure") + + monkeypatch.setattr(svc, "_get_db_session", _patched) + + def test_poison_rollback_emits_no_audit_and_keeps_count(self, svc, audit_base, monkeypatch): + # poison_frequency's ONLY side effect is the in-session DB write. A + # commit failure must roll it back AND leave no false audit record. + _store(svc, "poisoned") + with svc._get_db_session() as db: + db.query(MemoryMetadataModel).filter_by(key="poisoned").first().access_count = 999 + db.commit() + + self._force_commit_failure(svc, monkeypatch) + issues = [_make_issue(issue_type="poison_frequency", key="poisoned")] + report = _run( + heal(issues, scope="global", scope_id=None, apply=True, aggressive=True, svc=svc) + ) + + assert report.actions[0].status == "error" + # Row reverted by rollback — count never zeroed. + assert int(_row(svc, "poisoned").access_count) == 999 + # And crucially: NO mutation audit recorded for the rolled-back write. + log = _read_audit(audit_base) + assert "[poison_access_zeroed]" not in log + + def test_orphan_rollback_emits_no_mutation_audit(self, svc, audit_base, monkeypatch): + _make_orphan(svc, "orphan-rb") + self._force_commit_failure(svc, monkeypatch) + issues = [_make_issue(issue_type="orphan_page", key="orphan-rb")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + + assert report.actions[0].status == "error" + assert "[orphan_pruned]" not in _read_audit(audit_base) + + def test_successful_commit_still_emits_audit(self, svc, audit_base): + # Control: the happy path still emits the mutation audit (post-commit). + _make_orphan(svc, "orphan-ok") + issues = [_make_issue(issue_type="orphan_page", key="orphan-ok")] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert report.actions[0].status == "applied" + assert "[orphan_pruned]" in _read_audit(audit_base) + + +# =========================================================================== +# Skipped no-ops must not consume the cap budget +# =========================================================================== + + +class TestSkippedDoesNotConsumeCap: + def test_skipped_stale_claim_does_not_eat_cap(self, svc, audit_base, monkeypatch): + # Cap stale_claim at 1. An unparseable (skipped) issue ordered first must + # NOT consume that single slot — the real fix after it should still apply. + monkeypatch.setitem(wiki_healer.ISSUE_CAPS, "stale_claim", 1) + _store(svc, "real-article", content="intro\n\nrefers to src/gone.py here\n\noutro\n") + issues = [ + _make_issue(issue_type="stale_claim", key="bogus", description="weird format"), + _make_issue( + issue_type="stale_claim", + key="real-article", + description="file not found: src/gone.py", + ), + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + statuses = {a.key: a.status for a in report.actions} + assert statuses["bogus"] == "skipped" + assert statuses["real-article"] == "applied" # not crowded out by the skip + assert report.truncated_by_type.get("stale_claim", 0) == 0 + + def test_dry_run_skipped_does_not_eat_cap(self, svc, audit_base, monkeypatch): + monkeypatch.setitem(wiki_healer.ISSUE_CAPS, "stale_claim", 1) + _store(svc, "real-article", content="intro\n\nrefers to src/gone.py here\n\noutro\n") + issues = [ + _make_issue(issue_type="stale_claim", key="bogus", description="weird format"), + _make_issue( + issue_type="stale_claim", + key="real-article", + description="file not found: src/gone.py", + ), + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=False, svc=svc)) + statuses = {a.key: a.status for a in report.actions} + assert statuses["bogus"] == "skipped" + assert statuses["real-article"] == "planned" + assert report.truncated_by_type.get("stale_claim", 0) == 0 + + +# =========================================================================== +# graph_density flag-only + lint_error ignored +# =========================================================================== + + +class TestFilterRules: + def test_graph_density_never_mutates(self, svc, audit_base): + _store(svc, "popular") + issues = [_make_issue(issue_type="graph_density", key="popular", description="hot")] + report = _run( + heal(issues, scope="global", scope_id=None, apply=True, aggressive=True, svc=svc) + ) + assert report.actions == [] + assert _row(svc, "popular") is not None + + def test_lint_error_rows_ignored(self, svc, audit_base): + issues = [ + _make_issue( + issue_type="lint_error", key="run_lint", description="lint_run_completed: 5/5" + ), + _make_issue(issue_type="lint_error", key="orphan_page", description="truncated"), + ] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert report.actions == [] + + +# =========================================================================== +# Caps / truncation reporting +# =========================================================================== + + +class TestCaps: + def test_per_type_cap_reported(self, svc, audit_base, monkeypatch): + # Shrink the orphan cap so we can exercise truncation cheaply. + monkeypatch.setitem(wiki_healer.ISSUE_CAPS, "orphan_page", 2) + for i in range(5): + _make_orphan(svc, f"orphan-{i}") + issues = [_make_issue(issue_type="orphan_page", key=f"orphan-{i}") for i in range(5)] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert len([a for a in report.actions if a.status == "applied"]) == 2 + assert report.truncated_by_type.get("orphan_page") == 3 + assert report.total_suppressed == 3 + log = _read_audit(audit_base) + assert "truncation_breakdown=orphan_page:3" in log + + def test_run_level_cap_reported(self, svc, audit_base, monkeypatch): + monkeypatch.setattr(wiki_healer, "MAX_HEAL_ACTIONS", 2) + for i in range(4): + _make_orphan(svc, f"orphan-{i}") + issues = [_make_issue(issue_type="orphan_page", key=f"orphan-{i}") for i in range(4)] + report = _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + assert len(report.actions) == 2 + assert report.truncated_run_level == 2 + assert report.total_suppressed == 2 + + +# =========================================================================== +# Concurrency lock +# =========================================================================== + + +class TestLock: + def test_conflict_when_lock_held(self, svc, audit_base): + lock_path = wiki_healer._heal_lock_path(svc, "global", None) + lock_path.parent.mkdir(parents=True, exist_ok=True) + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o600) + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + try: + _store(svc, "orphan-one") + issues = [_make_issue(issue_type="orphan_page", key="orphan-one")] + with pytest.raises(HealConflictError): + _run(heal(issues, scope="global", scope_id=None, apply=True, svc=svc)) + finally: + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + def test_no_lock_for_dry_run(self, svc, audit_base): + # Even with the lock held, a dry-run must not require it. + lock_path = wiki_healer._heal_lock_path(svc, "global", None) + lock_path.parent.mkdir(parents=True, exist_ok=True) + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o600) + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + try: + _store(svc, "orphan-one") + issues = [_make_issue(issue_type="orphan_page", key="orphan-one")] + report = _run(heal(issues, scope="global", scope_id=None, apply=False, svc=svc)) + assert report.apply is False + assert report.actions[0].status == "planned" + finally: + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd)