diff --git a/src/cli_agent_orchestrator/cli/commands/memory.py b/src/cli_agent_orchestrator/cli/commands/memory.py index eaf6c603..4c7147bb 100644 --- a/src/cli_agent_orchestrator/cli/commands/memory.py +++ b/src/cli_agent_orchestrator/cli/commands/memory.py @@ -53,7 +53,7 @@ def memory(): "--scope", type=click.Choice([s.value for s in MemoryScope], case_sensitive=False), default=None, - help="Filter by scope (global, project, session, agent).", + help="Filter by scope (global, project, session, agent, federated).", ) @click.option( "--type", @@ -179,7 +179,7 @@ def delete(key, scope, yes): "--scope", type=click.Choice([s.value for s in MemoryScope], case_sensitive=False), required=True, - help="Scope to clear (required). One of: global, project, session, agent.", + help="Scope to clear (required). One of: global, project, session, agent, federated.", ) @click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompt.") def clear(scope, yes): diff --git a/src/cli_agent_orchestrator/mcp_server/server.py b/src/cli_agent_orchestrator/mcp_server/server.py index cb130a04..e2c5e42f 100644 --- a/src/cli_agent_orchestrator/mcp_server/server.py +++ b/src/cli_agent_orchestrator/mcp_server/server.py @@ -1093,7 +1093,10 @@ async def memory_store( content: str = Field(description="Memory content to store (markdown supported)"), scope: str = Field( default="project", - description='Memory scope: "global", "project", "session", or "agent"', + description=( + 'Memory scope: "global", "project", "session", "agent", or ' + '"federated" (machine-wide shared tier; rejects credentials)' + ), ), memory_type: str = Field( default="project", @@ -1153,7 +1156,10 @@ async def memory_recall( ), scope: Optional[str] = Field( default=None, - description='Filter by scope: "global", "project", "session", "agent". Omit to search all.', + description=( + 'Filter by scope: "global", "project", "session", "agent", ' + '"federated". Omit to search all.' 
+ ), ), memory_type: Optional[str] = Field( default=None, @@ -1237,7 +1243,10 @@ async def memory_forget( key: str = Field(description="Key of the memory to remove (e.g. 'prefer-pytest')"), scope: str = Field( default="project", - description='Scope of the memory to remove: "global", "project", "session", or "agent"', + description=( + 'Scope of the memory to remove: "global", "project", "session", ' + '"agent", or "federated"' + ), ), ) -> Dict[str, Any]: """Remove a memory by key and scope. diff --git a/src/cli_agent_orchestrator/models/memory.py b/src/cli_agent_orchestrator/models/memory.py index f9bba137..13799cd3 100644 --- a/src/cli_agent_orchestrator/models/memory.py +++ b/src/cli_agent_orchestrator/models/memory.py @@ -54,6 +54,7 @@ class MemoryScope(str, Enum): PROJECT = "project" SESSION = "session" AGENT = "agent" + FEDERATED = "federated" class MemoryType(str, Enum): @@ -71,7 +72,7 @@ class Memory(BaseModel): id: str = Field(..., description="Unique memory identifier") key: str = Field(..., description="Slug identifier, e.g. 
'prefer-pytest'") memory_type: str = Field(..., description="One of: user, feedback, project, reference") - scope: str = Field(..., description="One of: global, project, session, agent") + scope: str = Field(..., description="One of: global, project, session, agent, federated") scope_id: Optional[str] = Field(None, description="Auto-resolved scope identifier") file_path: str = Field(..., description="Path to wiki topic file") tags: str = Field(default="", description="Comma-separated tags") diff --git a/src/cli_agent_orchestrator/services/cleanup_service.py b/src/cli_agent_orchestrator/services/cleanup_service.py index 1be98dd6..98ed6032 100644 --- a/src/cli_agent_orchestrator/services/cleanup_service.py +++ b/src/cli_agent_orchestrator/services/cleanup_service.py @@ -85,6 +85,7 @@ def cleanup_old_data(): "agent": None, "project": 90, "session": 14, + "federated": None, } PERMANENT_MEMORY_TYPES: frozenset[str] = frozenset({"user", "feedback"}) @@ -124,9 +125,10 @@ async def cleanup_expired_memories() -> None: continue # Extract scope_id from path: .../memory/{scope_id}/wiki/index.md - # "global" dir → scope_id=None, project hash dirs → scope_id=hash + # "global"/"federated" dirs → scope_id=None (flat, machine-wide), + # project hash dirs → scope_id=hash project_dir_name = index_path.parent.parent.name - scope_id = None if project_dir_name == "global" else project_dir_name + scope_id = None if project_dir_name in ("global", "federated") else project_dir_name for entry in expired_entries: try: @@ -190,7 +192,7 @@ def _find_expired_entries(index_path: Path, now: datetime) -> list[dict]: # Detect scope section headers: ## global, ## session, etc. 
if line.startswith("## "): section = line[3:].strip() - if section in ("global", "project", "session", "agent"): + if section in ("global", "project", "session", "agent", "federated"): current_scope = section continue diff --git a/src/cli_agent_orchestrator/services/memory_scoring.py b/src/cli_agent_orchestrator/services/memory_scoring.py index 7f68cfee..71fb56e6 100644 --- a/src/cli_agent_orchestrator/services/memory_scoring.py +++ b/src/cli_agent_orchestrator/services/memory_scoring.py @@ -41,16 +41,25 @@ "project": 1, "global": 2, "agent": 3, + "federated": 4, } # Cross-scope write authorisation table. Caller may write a target -# scope iff ``SCOPE_RANK[caller] >= SCOPE_RANK[target]``. ``agent`` and -# ``project`` share rank 1 — siblings; cross-sibling writes are rejected. +# scope iff ``caller == target`` OR ``SCOPE_RANK[caller] > SCOPE_RANK[target]`` +# (strict). ``agent`` and ``project`` share rank 1 — siblings; cross-sibling +# writes are rejected. +# +# ``federated`` is intentionally asymmetric: it has the LOWEST recall +# precedence (4 in SCOPE_PRECEDENCE — last on recall) yet write-rank 0, +# so it is writable by every caller except ``session`` (rank 0 can only +# write its own scope). This mirrors how ``session`` is write-rank 0 but +# has the HIGHEST recall precedence (0) — the two tables are independent. SCOPE_RANK: dict = { "session": 0, "project": 1, "agent": 1, "global": 2, + "federated": 0, } @@ -118,8 +127,12 @@ def scope_write_allowed(caller: str, target: str) -> bool: """Store-time scope guard. Returns True iff a caller running at ``caller`` scope is permitted to - write a memory at ``target`` scope. ``agent`` and ``project`` are - siblings at rank 1; cross-sibling writes are rejected. + write a memory at ``target`` scope. The rule is: ``caller == target`` OR + ``SCOPE_RANK[caller] > SCOPE_RANK[target]`` (strict). ``agent`` and + ``project`` are siblings at rank 1; cross-sibling writes are rejected. 
+ Strictness matters for ``federated`` (rank 0): ``session`` (also rank 0) + cannot write it because ``0 > 0`` is False, while any higher-rank caller + can — i.e. writable by every caller except session. Unknown scopes (not in ``SCOPE_RANK``) deny by default — fail closed. """ diff --git a/src/cli_agent_orchestrator/services/memory_service.py b/src/cli_agent_orchestrator/services/memory_service.py index d4000b8f..94ee1603 100644 --- a/src/cli_agent_orchestrator/services/memory_service.py +++ b/src/cli_agent_orchestrator/services/memory_service.py @@ -396,6 +396,7 @@ def resolve_caller_scope(terminal_context: Optional[dict]) -> str: "project", "agent", "global", + "federated", }: return explicit return "global" @@ -415,6 +416,11 @@ def resolve_scope_id( if scope == MemoryScope.GLOBAL.value: return None + # ``federated`` is a single machine-wide tier with no per-id + # isolation — like ``global``, its scope_id is always None. + if scope == MemoryScope.FEDERATED.value: + return None + ctx = terminal_context or {} if scope == MemoryScope.PROJECT.value: @@ -538,6 +544,10 @@ def _get_project_dir(self, scope: str, scope_id: Optional[str]) -> Path: # container directory. if scope == MemoryScope.PROJECT.value and scope_id: return self.base_dir / scope_id + # ``federated`` is a machine-wide shared tier living in its own + # top-level container, a sibling of ``global``. + if scope == MemoryScope.FEDERATED.value: + return self.base_dir / "federated" return self.base_dir / "global" def get_wiki_path(self, scope: str, scope_id: Optional[str], key: str) -> Path: @@ -598,6 +608,18 @@ async def store( MemoryScope(scope) MemoryType(memory_type) + # Federated writes are credential-gated. The machine-wide shared + # tier rejects content matching common secret patterns. The log is a + # constant marker; the error names the pattern — never content bytes. 
+ if scope == MemoryScope.FEDERATED.value: + from cli_agent_orchestrator.services.secret_gate import scan_for_secrets + + hit = scan_for_secrets(content) + if hit: + # Do not log detector output; emit only a constant event marker. + logger.warning("federated_secret_rejected") + raise ValueError(f"federated write rejected: matched credential pattern {hit!r}") + # Store-time cross-scope write guard. A caller may # only write a scope it is authorised for (SCOPE_RANK). The caller # scope defaults to "global" (operator) unless terminal_context sets @@ -1897,6 +1919,7 @@ async def _metadata_recall( MemoryScope.PROJECT.value: 1, MemoryScope.GLOBAL.value: 2, MemoryScope.AGENT.value: 3, + MemoryScope.FEDERATED.value: 4, } results.sort(key=lambda m: (precedence.get(m.scope, 99), -m.updated_at.timestamp())) @@ -2134,11 +2157,24 @@ def _get_search_dirs( if global_dir.exists(): dirs.append(global_dir) + # Include the machine-wide federated tier when present. The + # ``.exists()`` guard preserves the byte-identical search-dir + # invariant: with no federated memories on disk, the dir list is + # unchanged from pre-federation behaviour. 
+ federated_dir = self.base_dir / "federated" + if federated_dir.exists() and federated_dir not in dirs: + dirs.append(federated_dir) + if scan_all: # Enumerate all project-hash dirs (for CLI use where user owns the filesystem) if self.base_dir.exists(): for child in sorted(self.base_dir.iterdir()): - if child.is_dir() and child.name != "global" and child not in dirs: + if ( + child.is_dir() + and child.name != "global" + and child.name != "federated" + and child not in dirs + ): dirs.append(child) elif terminal_context: # Include the specific project dir for this terminal's cwd diff --git a/src/cli_agent_orchestrator/services/secret_gate.py b/src/cli_agent_orchestrator/services/secret_gate.py new file mode 100644 index 00000000..db1cfdb1 --- /dev/null +++ b/src/cli_agent_orchestrator/services/secret_gate.py @@ -0,0 +1,55 @@ +"""Credential pattern gate for federated memory writes. + +Pure module — no I/O, no logging, no state. ``scan_for_secrets`` matches +the supplied content against a fixed, ordered list of named regexes and +returns the NAME of the first matching pattern (or ``None`` if clean). + +Used ONLY to reject credentials on ``scope="federated"`` writes — the +machine-wide shared tier. This is a heuristic deny-list, not entropy +scoring; it errs toward catching common credential shapes. +""" + +import re +from typing import List, Optional, Pattern, Tuple + +# Ordered (name, compiled_regex) pairs. First match wins, so ordering is +# stable and reproducible across calls. No entropy scoring. +_SECRET_PATTERNS: List[Tuple[str, Pattern[str]]] = [ + # AWS access key IDs — long-lived (AKIA) and temporary/STS (ASIA). + ("aws_access_key", re.compile(r"(?:AKIA|ASIA)[0-9A-Z]{16}")), + # PEM-encoded private keys (RSA / EC / OpenSSH / generic). + ( + "pem_private_key", + re.compile(r"-----BEGIN (?:RSA |EC |OPENSSH |)?PRIVATE KEY-----"), + ), + # Bearer / api-key / token assignments with a long value. 
The separator + # may be ':'/'=' OR whitespace, so the canonical HTTP header form + # 'Authorization: Bearer ' (Bearer followed by a space) is caught. + ( + "bearer_token", + re.compile(r"(?i)(?:bearer|api[_-]?key|token)[\s:=]+\S{16,}"), + ), + # Generic secret/password assignments. + ( + "secret_assignment", + re.compile(r"(?i)(?:password|passwd|secret|pwd)\s*[:=]\s*\S{6,}"), + ), + # GitHub tokens: ghp_ (personal access) and ghs_ (app installation) only. + ("github_pat", re.compile(r"gh[ps]_[A-Za-z0-9]{36,}")), + # GitLab personal access tokens. + ("gitlab_pat", re.compile(r"glpat-[A-Za-z0-9_-]{20,}")), +] + + +def scan_for_secrets(content: str) -> Optional[str]: + """Return the NAME of the first credential pattern that matches. + + Returns ``None`` when no pattern matches. The caller must not echo the + matched bytes — only the returned pattern name is safe to log. + """ + if not content: + return None + for name, pattern in _SECRET_PATTERNS: + if pattern.search(content): + return name + return None diff --git a/src/cli_agent_orchestrator/skills/cao-memory/SKILL.md b/src/cli_agent_orchestrator/skills/cao-memory/SKILL.md index 2a4327d0..7c62bd5a 100644 --- a/src/cli_agent_orchestrator/skills/cao-memory/SKILL.md +++ b/src/cli_agent_orchestrator/skills/cao-memory/SKILL.md @@ -24,6 +24,7 @@ Every memory has a **scope** (where it applies) and a **type** (what kind of fac |-------|-----------|---------| | `project` (default) | This repo / working directory | Conventions, architecture, build rules | | `global` | Every project | User identity, durable cross-project preferences | +| `federated` | Every project on this machine | Reusable, repo-independent lessons worth sharing across all your work (rejects credentials) | | `session` | This run only | Short-lived task context | | `agent` | This agent role | Role-specific working notes | @@ -39,7 +40,7 @@ already told you, search memory first. 
memory_recall(query="database widgets endpoint testing") ``` -Omit `scope` to search all scopes (results follow precedence session → project → global → agent). +Omit `scope` to search all scopes (results follow precedence session → project → global → agent → federated). Filter with `scope=` or `memory_type=` when you know where to look. Recall is for searching *beyond* what was auto-injected (see below) — don't re-recall what's already in front of you. @@ -66,6 +67,29 @@ memory_store( Same `key` + `scope` upserts (updates in place) rather than duplicating. +### Share across all your projects — `scope="federated"` + +When a lesson is durable **and not specific to this repo** — a reusable library gotcha, a +debugging trick, a tooling preference that holds everywhere — store it with +`scope="federated"` so it follows you into every project on this machine, not just this one. + +``` +memory_store( + content="tmux paste-buffer needs `-p` or multi-line input loses bracketed-paste framing.", + scope="federated", + memory_type="reference", +) +``` + +Federated memories sit at the **lowest recall precedence** — a project-local fact with the +same key always wins — so federating is safe: it only adds a fallback, never overrides what's +true here. To un-share, `memory_forget(key=..., scope="federated")`. + +- **Never federate secrets.** Tokens, keys, and passwords are **rejected automatically** on a + federated write — and they'd be exposed to every project anyway. Keep credentials out of + memory entirely. +- **When in doubt, use `project`.** Federate only what you're confident is reusable everywhere. 
+ ## Forget — remove what's wrong or superseded ``` diff --git a/test/services/test_memory_service.py b/test/services/test_memory_service.py index fd336fc7..774a8078 100644 --- a/test/services/test_memory_service.py +++ b/test/services/test_memory_service.py @@ -241,15 +241,27 @@ def test_recall_scope_precedence(self, tmp_path: Path): terminal_context=ctx, ) ) + _run( + svc.store( + content="federated fact", + scope="federated", + memory_type="project", + key="fact-federated", + terminal_context=ctx, + ) + ) results = _run(svc.recall(query="fact", terminal_context=ctx)) - # Session should come before project, project before global + # Session should come before project, project before global, + # global before federated (federated has lowest precedence). scopes = [m.scope for m in results] if "session" in scopes and "project" in scopes: assert scopes.index("session") < scopes.index("project") if "project" in scopes and "global" in scopes: assert scopes.index("project") < scopes.index("global") + if "global" in scopes and "federated" in scopes: + assert scopes.index("global") < scopes.index("federated") class TestRecallQueryMatching: @@ -1166,3 +1178,225 @@ async def test_recall_isolates_by_session_scope_id(self, tmp_path): # CLI scan_all path still sees both. 
results_all = await svc.recall(scope="session", terminal_context=None, scan_all=True) assert {m.key for m in results_all} == {"a-key", "b-key"} + + +# =========================================================================== +# FEDERATED scope (issue #313) — machine-wide shared tier +# =========================================================================== + + +def _federated_engine(tmp_path: Path): + """Build an isolated SQLite engine so DB-row assertions are deterministic.""" + from sqlalchemy import create_engine + + from cli_agent_orchestrator.clients.database import Base + + engine = create_engine( + f"sqlite:///{tmp_path / 'fed.db'}", + connect_args={"check_same_thread": False}, + ) + Base.metadata.create_all(bind=engine) + return engine + + +def _fed_row(engine, key: str): + from sqlalchemy.orm import sessionmaker + + from cli_agent_orchestrator.clients.database import MemoryMetadataModel + + Session = sessionmaker(bind=engine) + with Session() as db: + return ( + db.query(MemoryMetadataModel) + .filter_by(key=key, scope="federated", scope_id=None) + .first() + ) + + +class TestFederatedScope: + """Federated tier: store/recall roundtrip, precedence, layout, forget, + scope_id resolution, search-dir invariant, and the secret gate. + """ + + def test_federated_store_recall_roundtrip(self, tmp_path: Path): + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + _run( + svc.store( + content="federated shared note about widgets", + scope="federated", + memory_type="reference", + key="fed-note", + terminal_context=ctx, + ) + ) + results = _run(svc.recall(query="widgets", terminal_context=ctx)) + assert any(m.key == "fed-note" and m.scope == "federated" for m in results) + + def test_federated_recall_ranks_last(self, tmp_path: Path): + """With one memory per scope sharing the query term, federated ranks + after global (lowest precedence). 
+ """ + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + for scope, key in ( + ("session", "rank-session"), + ("project", "rank-project"), + ("global", "rank-global"), + ("federated", "rank-federated"), + ): + _run( + svc.store( + content="shared rankterm content", + scope=scope, + memory_type="reference", + key=key, + terminal_context=ctx, + ) + ) + results = _run(svc.recall(query="rankterm", terminal_context=ctx)) + scopes = [m.scope for m in results] + assert "global" in scopes and "federated" in scopes + assert scopes.index("global") < scopes.index("federated") + + def test_federated_file_location(self, tmp_path: Path): + """Wiki file lands at base/federated/wiki/federated/{key}.md.""" + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + mem = _run( + svc.store( + content="federated layout content", + scope="federated", + memory_type="reference", + key="fed-layout", + terminal_context=ctx, + ) + ) + expected = tmp_path / "federated" / "wiki" / "federated" / "fed-layout.md" + assert expected.exists() + assert Path(mem.file_path).resolve() == expected.resolve() + + def test_federated_forget_removes_file_index_and_row(self, tmp_path: Path): + engine = _federated_engine(tmp_path) + svc = MemoryService(base_dir=tmp_path, db_engine=engine) + ctx = _make_terminal_context() + _run( + svc.store( + content="federated forgettable content", + scope="federated", + memory_type="reference", + key="fed-forget", + terminal_context=ctx, + ) + ) + wiki = tmp_path / "federated" / "wiki" / "federated" / "fed-forget.md" + assert wiki.exists() + assert _fed_row(engine, "fed-forget") is not None + index_path = svc.get_index_path("federated", None) + assert "[fed-forget]" in index_path.read_text(encoding="utf-8") + + ok = _run(svc.forget(key="fed-forget", scope="federated", terminal_context=ctx)) + assert ok is True + assert not wiki.exists() + assert _fed_row(engine, "fed-forget") is None + assert "[fed-forget]" not in 
index_path.read_text(encoding="utf-8") + + def test_federated_resolve_scope_id_is_none(self, tmp_path: Path): + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + assert svc.resolve_scope_id("federated", ctx) is None + + def test_empty_federated_search_dirs_byte_identical(self, tmp_path: Path): + """When no federated dir exists, the search-dir list is unchanged from + pre-federation behaviour; an empty federated dir is only appended once + it actually exists (guards the recency invariant). + """ + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + # Seed a global memory so the global dir exists. + _run( + svc.store( + content="global seed", + scope="global", + memory_type="reference", + key="seed", + terminal_context=ctx, + ) + ) + fed_dir = tmp_path / "federated" + assert not fed_dir.exists() + dirs_absent = svc._get_search_dirs(None, ctx) + assert fed_dir not in dirs_absent + + # Creating an empty federated dir adds it; removing it reverts. + fed_dir.mkdir(parents=True) + dirs_present = svc._get_search_dirs(None, ctx) + assert fed_dir in dirs_present + + fed_dir.rmdir() + dirs_again = svc._get_search_dirs(None, ctx) + assert dirs_again == dirs_absent + + def test_scan_all_does_not_list_federated_twice(self, tmp_path: Path): + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + _run( + svc.store( + content="federated scanall content", + scope="federated", + memory_type="reference", + key="fed-scanall", + terminal_context=ctx, + ) + ) + dirs = svc._get_search_dirs(None, ctx, scan_all=True) + fed_dir = (tmp_path / "federated").resolve() + matches = [d for d in dirs if d.resolve() == fed_dir] + assert len(matches) == 1 + + def test_federated_secret_rejected_nothing_written(self, tmp_path: Path, caplog): + """An AKIA key on a federated write raises ValueError and writes + nothing (no wiki file, no db row); the error/log carries no secrets. 
+ """ + engine = _federated_engine(tmp_path) + svc = MemoryService(base_dir=tmp_path, db_engine=engine) + ctx = _make_terminal_context() + secret = "deploy creds AKIAIOSFODNN7EXAMPLE here" + with caplog.at_level("WARNING", logger="cli_agent_orchestrator.services.memory_service"): + with pytest.raises(ValueError) as exc_info: + _run( + svc.store( + content=secret, + scope="federated", + memory_type="reference", + key="fed-secret", + terminal_context=ctx, + ) + ) + # No secret bytes in the raised message. + assert "AKIAIOSFODNN7EXAMPLE" not in str(exc_info.value) + # No secret bytes in any log record. + for rec in caplog.records: + assert "AKIAIOSFODNN7EXAMPLE" not in rec.getMessage() + # Nothing persisted. + wiki = tmp_path / "federated" / "wiki" / "federated" / "fed-secret.md" + assert not wiki.exists() + assert _fed_row(engine, "fed-secret") is None + + def test_same_secret_content_allowed_at_global_scope(self, tmp_path: Path): + """The gate is federated-only: identical AKIA-bearing content stores + fine at scope=global. 
+ """ + svc = MemoryService(base_dir=tmp_path) + ctx = _make_terminal_context() + secret = "deploy creds AKIAIOSFODNN7EXAMPLE here" + mem = _run( + svc.store( + content=secret, + scope="global", + memory_type="reference", + key="glob-secret-ok", + terminal_context=ctx, + ) + ) + assert Path(mem.file_path).exists() diff --git a/test/services/test_scoring.py b/test/services/test_scoring.py index b9352de5..beeaad8c 100644 --- a/test/services/test_scoring.py +++ b/test/services/test_scoring.py @@ -324,6 +324,25 @@ def test_scope_write_allowed_table(self): for s in ("session", "project", "agent", "global"): assert scope_write_allowed(s, s) is True + def test_scope_write_allowed_federated_table(self): + """Federated is write-rank 0 but writable by every caller except + session: any caller with rank > 0 (project/agent/global) may write it, + federated may write itself, but session (rank 0, names differ) cannot, + and federated cannot write the higher-ranked global tier. + """ + assert scope_write_allowed("global", "federated") is True + assert scope_write_allowed("project", "federated") is True + assert scope_write_allowed("agent", "federated") is True + assert scope_write_allowed("session", "federated") is False + assert scope_write_allowed("federated", "federated") is True + assert scope_write_allowed("federated", "global") is False + + def test_sort_by_recency_invariant_still_present(self): + """Confirm the load-bearing byte-identical recency test still exists + (do NOT modify it). 
+ """ + assert hasattr(TestU73PlanCases, "test_sort_by_recency_reproduces_phase1_order") + # =========================================================================== # T2 — Mass-recall score gaming / increment rate-limit diff --git a/test/services/test_secret_gate.py b/test/services/test_secret_gate.py new file mode 100644 index 00000000..90144d6d --- /dev/null +++ b/test/services/test_secret_gate.py @@ -0,0 +1,68 @@ +"""Tests for the federated-write credential gate (``scan_for_secrets``). + +The gate is a pure deny-list heuristic: it returns the NAME of the first +matching credential pattern, or ``None`` when the content looks clean. It is +used ONLY on ``scope="federated"`` writes. +""" + +import pytest + +from cli_agent_orchestrator.services.secret_gate import scan_for_secrets + +# --------------------------------------------------------------------------- +# Positive cases — each must return a non-None pattern name. +# --------------------------------------------------------------------------- + +_POSITIVE = [ + ("aws_access_key", "creds: AKIAIOSFODNN7EXAMPLE in config"), + ( + "pem_private_key", + "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA\n-----END RSA PRIVATE KEY-----", + ), + ("secret_assignment", "password=hunter2longenough"), + # Canonical HTTP header form: 'Authorization: Bearer '. + # The separator after the keyword may be whitespace, ':' or '='. + ("bearer_token", "Authorization: Bearer abcdef0123456789ABCDEF"), + ("github_pat", "ghp_" + "a" * 36), + ("gitlab_pat", "glpat-" + "x" * 20), +] + + +@pytest.mark.parametrize("label,content", _POSITIVE, ids=[p[0] for p in _POSITIVE]) +def test_scan_for_secrets_positive(label, content): + """Credential-shaped content returns a non-None pattern name.""" + result = scan_for_secrets(content) + assert result is not None + assert isinstance(result, str) + + +# --------------------------------------------------------------------------- +# Negative cases — each must return None. 
+# --------------------------------------------------------------------------- + +_NEGATIVE = [ + ("plain_prose", "This is a normal note about how pytest fixtures work."), + ("bare_uuid", "session id 550e8400-e29b-41d4-a716-446655440000"), + ("short_token", "token=abc"), + ("git_sha", "fixed in commit a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0"), + ( + "normal_markdown", + "# Title\n\n- bullet one\n- bullet two\n\nSome **bold** text and a [link](http://x).", + ), +] + + +@pytest.mark.parametrize("label,content", _NEGATIVE, ids=[n[0] for n in _NEGATIVE]) +def test_scan_for_secrets_negative(label, content): + """Benign content returns None.""" + assert scan_for_secrets(content) is None + + +def test_scan_for_secrets_empty(): + """Empty content is clean.""" + assert scan_for_secrets("") is None + + +def test_bearer_space_form_is_caught(): + """The canonical space-separated Bearer header is caught by the gate.""" + assert scan_for_secrets("Authorization: Bearer abcdef0123456789ABCDEF") == "bearer_token"