From 63b654d6f596ba185d4e1ae3f77c1ad5ebcb9b48 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Thu, 7 May 2026 23:33:49 +0000 Subject: [PATCH 1/8] feat(claude-code): auto-discover plugin marketplace agents MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agents enabled via Claude Code plugin marketplaces (~/.claude/settings.json → extraKnownMarketplaces) are now discoverable by CAO without requiring a manual agent_dirs.claude_code entry in ~/.aws/cli-agent-orchestrator/settings.json. Changes: - utils/agent_profiles.py: add _discover_claude_plugin_agent_dirs() that walks enabled marketplaces, validates marketplace.json, and collects each plugin's agents/ directory. Integrated into list_agent_profiles() and _read_agent_profile_source() scan order. - services/settings_service.py: change default agent_dirs.claude_code from ~/.aws/cli-agent-orchestrator/agent-store to ~/.claude/agents/. Users with a saved value are unaffected (saved-over-default merge semantics). - docs/settings.md: document the new discovery behavior. - test/utils/test_claude_plugin_discovery.py: unit tests covering happy path, empty enabledPlugins, orphan plugin entries, file-vs-dir source validation, cross-marketplace name collision, symlink escape, and _read_agent_profile_source() plugin integration. - CHANGELOG.md: Unreleased entry noting discovery + default path change. --- CHANGELOG.md | 6 + docs/settings.md | 37 ++ .../services/settings_service.py | 2 +- .../utils/agent_profiles.py | 67 ++- test/utils/test_claude_plugin_discovery.py | 475 ++++++++++++++++++ 5 files changed, 585 insertions(+), 2 deletions(-) create mode 100644 test/utils/test_claude_plugin_discovery.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4999e68d5..97d67277d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Auto-discover agents from enabled Claude Code plugin marketplaces (`~/.claude/settings.json → extraKnownMarketplaces`). Default `agent_dirs.claude_code` path changed from `~/.aws/cli-agent-orchestrator/agent-store` to `~/.claude/agents/`; users with a saved `agent_dirs.claude_code` are unaffected. + ## [2.1.1] - 2026-04-28 ### Added diff --git a/docs/settings.md b/docs/settings.md index 360ee6ae3..b8d24ba62 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -74,6 +74,43 @@ Add additional directories that are scanned for agent profiles across all provid } ``` +## Claude Code Plugin Marketplace Auto-Discovery + +CAO automatically discovers agent profiles from Claude Code plugin marketplaces. When plugins are installed via AIM (or any tool that registers marketplaces in Claude Code's settings), their agents appear in CAO without manual configuration. + +### How It Works + +On each profile scan, CAO reads `~/.claude/settings.json` and: + +1. Iterates `extraKnownMarketplaces` entries with `source.source == "directory"`. +2. Reads each marketplace's `.claude-plugin/marketplace.json` for the plugin list. +3. For each plugin, checks `enabledPlugins["@"]` — only enabled plugins are scanned. +4. If the plugin has an `agents/` subdirectory, its `.md` profiles are included. + +Discovered profiles appear with `source: "claude_plugin"` in `GET /agents/profiles`. + +### Precedence + +Plugin agents are scanned **after** the local store and provider directories but **before** extra custom directories. If a local agent has the same name as a plugin agent, the local one wins (first-match dedup). + +### Disabling a Plugin's CAO Visibility + +Toggle the plugin off in `~/.claude/settings.json`: + +```json +{ + "enabledPlugins": { + "MyPlugin@aim": false + } +} +``` + +CAO will stop listing that plugin's agents on the next request. + +### Security + +Plugin paths are validated to stay within their marketplace root directory. Any plugin whose resolved path escapes the marketplace root is skipped with a warning. + ## API Endpoints | Method | Endpoint | Description | diff --git a/src/cli_agent_orchestrator/services/settings_service.py b/src/cli_agent_orchestrator/services/settings_service.py index 61c01a3e6..3fabc4be0 100644 --- a/src/cli_agent_orchestrator/services/settings_service.py +++ b/src/cli_agent_orchestrator/services/settings_service.py @@ -15,7 +15,7 @@ _DEFAULTS = { "kiro_cli": str(Path.home() / ".kiro" / "agents"), "q_cli": str(Path.home() / ".aws" / "amazonq" / "cli-agents"), - "claude_code": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-store"), + "claude_code": str(Path.home() / ".claude" / "agents"), "codex": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-store"), "cao_installed": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-context"), } diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index 834a1aae1..2eae1f94a 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -1,5 +1,6 @@ """Agent profile utilities.""" +import json import logging from importlib import resources from pathlib import Path @@ -81,6 +82,61 @@ def _scan_directory(directory: Path, source_label: str, profiles: Dict[str, Dict } +def _discover_claude_plugin_agent_dirs() -> List[Path]: + """Walk Claude Code marketplaces and return agent dirs from enabled plugins.""" + settings_path = Path.home() / ".claude" / "settings.json" + try: + data = json.loads(settings_path.read_text()) + except (json.JSONDecodeError, OSError): + if settings_path.exists(): + logger.warning("Failed to read %s", settings_path) + return [] + + marketplaces = data.get("extraKnownMarketplaces", {}) + enabled_plugins = data.get("enabledPlugins", {}) + if not marketplaces: + return [] + + agent_dirs: List[Path] = [] + for mkt_name, mkt_config in marketplaces.items(): + source = mkt_config.get("source", {}) + if source.get("source") != "directory": + continue + mkt_path = Path(source.get("path", "")) + if not mkt_path.is_dir(): + continue + + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + try: + mkt_data = json.loads(marketplace_json.read_text()) + except (json.JSONDecodeError, OSError): + logger.warning("Failed to read %s", marketplace_json) + continue + + resolved_mkt = mkt_path.resolve() + for plugin in mkt_data.get("plugins", []): + plugin_name = plugin.get("name", "") + if not enabled_plugins.get(f"{plugin_name}@{mkt_name}", False): + continue + plugin_source = plugin.get("source", "") + plugin_dir = (mkt_path / plugin_source).resolve() + # Path containment check + try: + plugin_dir.relative_to(resolved_mkt) + except ValueError: + logger.warning( + "Plugin path %s escapes marketplace root %s — skipping", + plugin_dir, + resolved_mkt, + ) + continue + agents_dir = plugin_dir / "agents" + if agents_dir.is_dir(): + agent_dirs.append(agents_dir) + + return agent_dirs + + def list_agent_profiles() -> List[Dict]: """Discover all available agent profiles from all configured directories. @@ -137,7 +193,11 @@ def list_agent_profiles() -> List[Dict]: continue _scan_directory(path, label, profiles) - # 4. Extra user-added directories + # 4. Claude Code plugin marketplace directories + for plugin_agents_dir in _discover_claude_plugin_agent_dirs(): + _scan_directory(plugin_agents_dir, "claude_plugin", profiles) + + # 5. Extra user-added directories for extra_dir in get_extra_agent_dirs(): _scan_directory(Path(extra_dir), "custom", profiles) @@ -202,6 +262,11 @@ def _lookup_in_directory(directory: Path) -> str | None: if found is not None: return found + for plugin_agents_dir in _discover_claude_plugin_agent_dirs(): + found = _lookup_in_directory(plugin_agents_dir) + if found is not None: + return found + for extra_dir in get_extra_agent_dirs(): found = _lookup_in_directory(Path(extra_dir)) if found is not None: diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py new file mode 100644 index 000000000..ebfb8badb --- /dev/null +++ b/test/utils/test_claude_plugin_discovery.py @@ -0,0 +1,475 @@ +"""Tests for Claude Code plugin marketplace auto-discovery.""" + +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from cli_agent_orchestrator.utils.agent_profiles import ( + _discover_claude_plugin_agent_dirs, + list_agent_profiles, +) + + +def _setup_marketplace(home: Path, plugins: list, enabled: dict, mkt_name="aim", + mkt_source_type="directory", extra_settings=None): + """Helper to create a fake marketplace structure under a fake home.""" + claude_dir = home / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = home / ".aim" / "cc-plugins" + mkt_path.mkdir(parents=True) + + settings = {"extraKnownMarketplaces": {mkt_name: {"source": { + "path": str(mkt_path), "source": mkt_source_type + }}}, "enabledPlugins": enabled} + if extra_settings: + settings.update(extra_settings) + (claude_dir / "settings.json").write_text(json.dumps(settings)) + + # Create marketplace.json + mkt_meta_dir = mkt_path / ".claude-plugin" + mkt_meta_dir.mkdir(parents=True) + mkt_json_plugins = [] + for p in plugins: + name = p["name"] + source = p.get("source", f"./{name}") + mkt_json_plugins.append({"name": name, "source": source, "version": "1.0"}) + # Create plugin agents dir if requested + if p.get("has_agents", False): + agents_dir = mkt_path / source.lstrip("./") / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / f"{name}-agent.md").write_text( + f"---\nname: {name}-agent\ndescription: Agent from {name}\n---\nPrompt" + ) + (mkt_meta_dir / "marketplace.json").write_text(json.dumps({ + "name": mkt_name, "plugins": mkt_json_plugins + })) + return mkt_path + + +class TestDiscoverClaudePluginAgentDirs: + """Unit tests for _discover_claude_plugin_agent_dirs.""" + + def test_settings_json_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_no_extra_known_marketplaces(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text(json.dumps({"someOtherKey": True})) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_malformed_settings_json(self, tmp_path, monkeypatch, caplog): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text("not valid json{{{") + import logging + with caplog.at_level(logging.WARNING): + result = _discover_claude_plugin_agent_dirs() + assert result == [] + assert "Failed to read" in caplog.text + + def test_non_directory_source_skipped(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], + {"p1@aim": True}, mkt_source_type="git") + assert _discover_claude_plugin_agent_dirs() == [] + + def test_marketplace_path_not_exists(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": {"aim": {"source": { + "path": "/nonexistent/path", "source": "directory" + }}}, + "enabledPlugins": {} + })) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_marketplace_json_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / ".aim" / "cc-plugins" + mkt_path.mkdir(parents=True) + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": {"aim": {"source": { + "path": str(mkt_path), "source": "directory" + }}}, + "enabledPlugins": {"p1@aim": True} + })) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_not_enabled_skipped(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], + {"p1@aim": False}) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_enabled_agents_dir_exists(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "myplugin", "has_agents": True}], + {"myplugin@aim": True} + ) + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 1 + assert result[0] == (mkt_path / "myplugin" / "agents").resolve() + + def test_plugin_enabled_agents_dir_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + # Plugin exists but no agents/ subdir + _setup_marketplace(tmp_path, [{"name": "noagents", "has_agents": False}], + {"noagents@aim": True}) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_multiple_marketplaces_multiple_plugins(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + + # Marketplace 1 + mkt1 = tmp_path / "mkt1" + mkt1.mkdir() + (mkt1 / ".claude-plugin").mkdir() + (mkt1 / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "m1", "plugins": [ + {"name": "a", "source": "./a", "version": "1"}, + {"name": "b", "source": "./b", "version": "1"}, + ] + })) + (mkt1 / "a" / "agents").mkdir(parents=True) + (mkt1 / "a" / "agents" / "a-agent.md").write_text("---\nname: a-agent\n---\n") + (mkt1 / "b" / "agents").mkdir(parents=True) + (mkt1 / "b" / "agents" / "b-agent.md").write_text("---\nname: b-agent\n---\n") + + # Marketplace 2 + mkt2 = tmp_path / "mkt2" + mkt2.mkdir() + (mkt2 / ".claude-plugin").mkdir() + (mkt2 / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "m2", "plugins": [ + {"name": "c", "source": "./c", "version": "1"}, + ] + })) + (mkt2 / "c" / "agents").mkdir(parents=True) + (mkt2 / "c" / "agents" / "c-agent.md").write_text("---\nname: c-agent\n---\n") + + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": { + "m1": {"source": {"path": str(mkt1), "source": "directory"}}, + "m2": {"source": {"path": str(mkt2), "source": "directory"}}, + }, + "enabledPlugins": {"a@m1": True, "b@m1": True, "c@m2": True} + })) + + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 3 + + def test_path_traversal_blocked(self, tmp_path, monkeypatch, caplog): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "evil", "plugins": [ + {"name": "bad", "source": "../../../../etc", "version": "1"}, + ] + })) + # Create agents dir at the traversal target to ensure it would match + # if containment check didn't exist + (tmp_path / "etc" / "agents").mkdir(parents=True, exist_ok=True) + + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": {"evil": {"source": { + "path": str(mkt_path), "source": "directory" + }}}, + "enabledPlugins": {"bad@evil": True} + })) + + import logging + with caplog.at_level(logging.WARNING): + result = _discover_claude_plugin_agent_dirs() + assert result == [] + assert "escapes marketplace root" in caplog.text + + +class TestListAgentProfilesPluginIntegration: + """Integration tests: plugin agents appear in list_agent_profiles.""" + + def test_plugin_agent_appears_with_claude_plugin_source(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "testpkg", "has_agents": True}], + {"testpkg@aim": True} + ) + # Patch out other sources + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = list_agent_profiles() + names = {p["name"] for p in result} + assert "testpkg-agent" in names + plugin_profile = next(p for p in result if p["name"] == "testpkg-agent") + assert plugin_profile["source"] == "claude_plugin" + + def test_local_agent_beats_plugin_agent_in_dedup(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace( + tmp_path, [{"name": "mypkg", "has_agents": True}], + {"mypkg@aim": True} + ) + # Create a local agent with the same name as the plugin agent + local_store = tmp_path / "local-store" + local_store.mkdir() + (local_store / "mypkg-agent.md").write_text( + "---\nname: mypkg-agent\ndescription: Local override\n---\nLocal prompt" + ) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", local_store + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = list_agent_profiles() + plugin_profile = next(p for p in result if p["name"] == "mypkg-agent") + assert plugin_profile["source"] == "local" + + +class TestDiscoverEdgeCases: + """Additional edge-case tests for _discover_claude_plugin_agent_dirs.""" + + def test_empty_enabled_plugins_map(self, tmp_path, monkeypatch): + """enabledPlugins present but empty → no dirs returned.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], + enabled={}) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_orphan_enabled_plugin_no_matching_marketplace(self, tmp_path, monkeypatch): + """enabledPlugins has plugin@X but no marketplace named X.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], + enabled={"p1@nonexistent": True}, mkt_name="aim") + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_source_is_file_not_directory(self, tmp_path, monkeypatch): + """Plugin source resolves to a regular file — agents/ can't exist.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + # Create a file (not dir) at the plugin source path + (mkt_path / "plugin-file").write_text("I am a file") + (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "aim", "plugins": [{"name": "fp", "source": "./plugin-file", "version": "1"}] + })) + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": {"aim": {"source": {"path": str(mkt_path), "source": "directory"}}}, + "enabledPlugins": {"fp@aim": True} + })) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch): + """Two marketplaces each have a plugin with same agent name — first scanned wins.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + + # Marketplace A (listed first in dict) + mkt_a = tmp_path / "mkt_a" + mkt_a.mkdir() + (mkt_a / ".claude-plugin").mkdir() + (mkt_a / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "alpha", "plugins": [{"name": "shared", "source": "./shared", "version": "1"}] + })) + (mkt_a / "shared" / "agents").mkdir(parents=True) + (mkt_a / "shared" / "agents" / "dup.md").write_text( + "---\nname: dup\ndescription: from alpha\n---\nAlpha prompt") + + # Marketplace B + mkt_b = tmp_path / "mkt_b" + mkt_b.mkdir() + (mkt_b / ".claude-plugin").mkdir() + (mkt_b / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "beta", "plugins": [{"name": "shared", "source": "./shared", "version": "1"}] + })) + (mkt_b / "shared" / "agents").mkdir(parents=True) + (mkt_b / "shared" / "agents" / "dup.md").write_text( + "---\nname: dup\ndescription: from beta\n---\nBeta prompt") + + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": { + "alpha": {"source": {"path": str(mkt_a), "source": "directory"}}, + "beta": {"source": {"path": str(mkt_b), "source": "directory"}}, + }, + "enabledPlugins": {"shared@alpha": True, "shared@beta": True} + })) + + # Both dirs returned — dedup happens at list_agent_profiles level + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 2 + + # Now test that list_agent_profiles dedup picks first + from unittest.mock import MagicMock + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + profiles = list_agent_profiles() + dup_profile = next(p for p in profiles if p["name"] == "dup") + assert dup_profile["description"] == "from alpha" + + def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): + """Symlink inside agents/ pointing outside plugin root — file is still served. + + Note: containment check is on the plugin dir, not individual agent files. + This test documents current behavior (no per-file containment). + """ + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ + "name": "aim", "plugins": [{"name": "sym", "source": "./sym", "version": "1"}] + })) + (mkt_path / "sym" / "agents").mkdir(parents=True) + + # Create an external file and symlink to it from agents/ + external = tmp_path / "external" + external.mkdir() + (external / "evil.md").write_text("---\nname: evil\ndescription: escaped\n---\nEvil") + (mkt_path / "sym" / "agents" / "evil.md").symlink_to(external / "evil.md") + + (claude_dir / "settings.json").write_text(json.dumps({ + "extraKnownMarketplaces": {"aim": {"source": {"path": str(mkt_path), "source": "directory"}}}, + "enabledPlugins": {"sym@aim": True} + })) + + # The agents dir IS returned (plugin dir containment passes) + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 1 + # The symlinked file IS scanned (no per-file containment check) + from unittest.mock import MagicMock + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + assert "evil" in names # DEFECT: symlink escape not blocked at agent-file level + + +class TestReadAgentProfileSourcePluginIntegration: + """Integration: _read_agent_profile_source falls through plugin dirs.""" + + def test_agent_found_in_plugin_dir(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], + {"pkg@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + result = _read_agent_profile_source("pkg-agent") + assert "Agent from pkg" in result + + def test_agent_not_found_raises(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], + {"pkg@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + with pytest.raises(FileNotFoundError): + _read_agent_profile_source("nonexistent-agent") From 5d8cee4c4fa035117085180ef41fec65a0ccb79d Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Fri, 8 May 2026 17:43:38 +0000 Subject: [PATCH 2/8] style: apply black formatting to test_claude_plugin_discovery.py --- test/utils/test_claude_plugin_discovery.py | 282 +++++++++++++-------- 1 file changed, 182 insertions(+), 100 deletions(-) diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py index ebfb8badb..17bd15438 100644 --- a/test/utils/test_claude_plugin_discovery.py +++ b/test/utils/test_claude_plugin_discovery.py @@ -12,17 +12,26 @@ ) -def _setup_marketplace(home: Path, plugins: list, enabled: dict, mkt_name="aim", - mkt_source_type="directory", extra_settings=None): +def _setup_marketplace( + home: Path, + plugins: list, + enabled: dict, + mkt_name="aim", + mkt_source_type="directory", + extra_settings=None, +): """Helper to create a fake marketplace structure under a fake home.""" claude_dir = home / ".claude" claude_dir.mkdir(parents=True) mkt_path = home / ".aim" / "cc-plugins" mkt_path.mkdir(parents=True) - settings = {"extraKnownMarketplaces": {mkt_name: {"source": { - "path": str(mkt_path), "source": mkt_source_type - }}}, "enabledPlugins": enabled} + settings = { + "extraKnownMarketplaces": { + mkt_name: {"source": {"path": str(mkt_path), "source": mkt_source_type}} + }, + "enabledPlugins": enabled, + } if extra_settings: settings.update(extra_settings) (claude_dir / "settings.json").write_text(json.dumps(settings)) @@ -42,9 +51,9 @@ def _setup_marketplace(home: Path, plugins: list, enabled: dict, mkt_name="aim", (agents_dir / f"{name}-agent.md").write_text( f"---\nname: {name}-agent\ndescription: Agent from {name}\n---\nPrompt" ) - (mkt_meta_dir / "marketplace.json").write_text(json.dumps({ - "name": mkt_name, "plugins": mkt_json_plugins - })) + (mkt_meta_dir / "marketplace.json").write_text( + json.dumps({"name": mkt_name, "plugins": mkt_json_plugins}) + ) return mkt_path @@ -68,6 +77,7 @@ def test_malformed_settings_json(self, tmp_path, monkeypatch, caplog): claude_dir.mkdir(parents=True) (claude_dir / "settings.json").write_text("not valid json{{{") import logging + with caplog.at_level(logging.WARNING): result = _discover_claude_plugin_agent_dirs() assert result == [] @@ -75,20 +85,25 @@ def test_malformed_settings_json(self, tmp_path, monkeypatch, caplog): def test_non_directory_source_skipped(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) - _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], - {"p1@aim": True}, mkt_source_type="git") + _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}, mkt_source_type="git" + ) assert _discover_claude_plugin_agent_dirs() == [] def test_marketplace_path_not_exists(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) claude_dir = tmp_path / ".claude" claude_dir.mkdir(parents=True) - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": {"aim": {"source": { - "path": "/nonexistent/path", "source": "directory" - }}}, - "enabledPlugins": {} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": "/nonexistent/path", "source": "directory"}} + }, + "enabledPlugins": {}, + } + ) + ) assert _discover_claude_plugin_agent_dirs() == [] def test_marketplace_json_missing(self, tmp_path, monkeypatch): @@ -97,25 +112,27 @@ def test_marketplace_json_missing(self, tmp_path, monkeypatch): claude_dir.mkdir(parents=True) mkt_path = tmp_path / ".aim" / "cc-plugins" mkt_path.mkdir(parents=True) - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": {"aim": {"source": { - "path": str(mkt_path), "source": "directory" - }}}, - "enabledPlugins": {"p1@aim": True} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) assert _discover_claude_plugin_agent_dirs() == [] def test_plugin_not_enabled_skipped(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) - _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], - {"p1@aim": False}) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": False}) assert _discover_claude_plugin_agent_dirs() == [] def test_plugin_enabled_agents_dir_exists(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) mkt_path = _setup_marketplace( - tmp_path, [{"name": "myplugin", "has_agents": True}], - {"myplugin@aim": True} + tmp_path, [{"name": "myplugin", "has_agents": True}], {"myplugin@aim": True} ) result = _discover_claude_plugin_agent_dirs() assert len(result) == 1 @@ -124,8 +141,9 @@ def test_plugin_enabled_agents_dir_exists(self, tmp_path, monkeypatch): def test_plugin_enabled_agents_dir_missing(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) # Plugin exists but no agents/ subdir - _setup_marketplace(tmp_path, [{"name": "noagents", "has_agents": False}], - {"noagents@aim": True}) + _setup_marketplace( + tmp_path, [{"name": "noagents", "has_agents": False}], {"noagents@aim": True} + ) assert _discover_claude_plugin_agent_dirs() == [] def test_multiple_marketplaces_multiple_plugins(self, tmp_path, monkeypatch): @@ -137,12 +155,17 @@ def test_multiple_marketplaces_multiple_plugins(self, tmp_path, monkeypatch): mkt1 = tmp_path / "mkt1" mkt1.mkdir() (mkt1 / ".claude-plugin").mkdir() - (mkt1 / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "m1", "plugins": [ - {"name": "a", "source": "./a", "version": "1"}, - {"name": "b", "source": "./b", "version": "1"}, - ] - })) + (mkt1 / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "m1", + "plugins": [ + {"name": "a", "source": "./a", "version": "1"}, + {"name": "b", "source": "./b", "version": "1"}, + ], + } + ) + ) (mkt1 / "a" / "agents").mkdir(parents=True) (mkt1 / "a" / "agents" / "a-agent.md").write_text("---\nname: a-agent\n---\n") (mkt1 / "b" / "agents").mkdir(parents=True) @@ -152,21 +175,30 @@ def test_multiple_marketplaces_multiple_plugins(self, tmp_path, monkeypatch): mkt2 = tmp_path / "mkt2" mkt2.mkdir() (mkt2 / ".claude-plugin").mkdir() - (mkt2 / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "m2", "plugins": [ - {"name": "c", "source": "./c", "version": "1"}, - ] - })) + (mkt2 / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "m2", + "plugins": [ + {"name": "c", "source": "./c", "version": "1"}, + ], + } + ) + ) (mkt2 / "c" / "agents").mkdir(parents=True) (mkt2 / "c" / "agents" / "c-agent.md").write_text("---\nname: c-agent\n---\n") - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": { - "m1": {"source": {"path": str(mkt1), "source": "directory"}}, - "m2": {"source": {"path": str(mkt2), "source": "directory"}}, - }, - "enabledPlugins": {"a@m1": True, "b@m1": True, "c@m2": True} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "m1": {"source": {"path": str(mkt1), "source": "directory"}}, + "m2": {"source": {"path": str(mkt2), "source": "directory"}}, + }, + "enabledPlugins": {"a@m1": True, "b@m1": True, "c@m2": True}, + } + ) + ) result = _discover_claude_plugin_agent_dirs() assert len(result) == 3 @@ -178,23 +210,33 @@ def test_path_traversal_blocked(self, tmp_path, monkeypatch, caplog): mkt_path = tmp_path / "mkt" mkt_path.mkdir() (mkt_path / ".claude-plugin").mkdir() - (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "evil", "plugins": [ - {"name": "bad", "source": "../../../../etc", "version": "1"}, - ] - })) + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "evil", + "plugins": [ + {"name": "bad", "source": "../../../../etc", "version": "1"}, + ], + } + ) + ) # Create agents dir at the traversal target to ensure it would match # if containment check didn't exist (tmp_path / "etc" / "agents").mkdir(parents=True, exist_ok=True) - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": {"evil": {"source": { - "path": str(mkt_path), "source": "directory" - }}}, - "enabledPlugins": {"bad@evil": True} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "evil": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"bad@evil": True}, + } + ) + ) import logging + with caplog.at_level(logging.WARNING): result = _discover_claude_plugin_agent_dirs() assert result == [] @@ -207,8 +249,7 @@ class TestListAgentProfilesPluginIntegration: def test_plugin_agent_appears_with_claude_plugin_source(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) mkt_path = _setup_marketplace( - tmp_path, [{"name": "testpkg", "has_agents": True}], - {"testpkg@aim": True} + tmp_path, [{"name": "testpkg", "has_agents": True}], {"testpkg@aim": True} ) # Patch out other sources monkeypatch.setattr( @@ -222,6 +263,7 @@ def test_plugin_agent_appears_with_claude_plugin_source(self, tmp_path, monkeypa "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] ) from unittest.mock import MagicMock + mock_store = MagicMock() mock_store.iterdir.return_value = [] monkeypatch.setattr( @@ -237,10 +279,7 @@ def test_plugin_agent_appears_with_claude_plugin_source(self, tmp_path, monkeypa def test_local_agent_beats_plugin_agent_in_dedup(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) - _setup_marketplace( - tmp_path, [{"name": "mypkg", "has_agents": True}], - {"mypkg@aim": True} - ) + _setup_marketplace(tmp_path, [{"name": "mypkg", "has_agents": True}], {"mypkg@aim": True}) # Create a local agent with the same name as the plugin agent local_store = tmp_path / "local-store" local_store.mkdir() @@ -257,6 +296,7 @@ def test_local_agent_beats_plugin_agent_in_dedup(self, tmp_path, monkeypatch): "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] ) from unittest.mock import MagicMock + mock_store = MagicMock() mock_store.iterdir.return_value = [] monkeypatch.setattr( @@ -275,15 +315,18 @@ class TestDiscoverEdgeCases: def test_empty_enabled_plugins_map(self, tmp_path, monkeypatch): """enabledPlugins present but empty → no dirs returned.""" monkeypatch.setattr(Path, "home", lambda: tmp_path) - _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], - enabled={}) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], enabled={}) assert _discover_claude_plugin_agent_dirs() == [] def test_orphan_enabled_plugin_no_matching_marketplace(self, tmp_path, monkeypatch): """enabledPlugins has plugin@X but no marketplace named X.""" monkeypatch.setattr(Path, "home", lambda: tmp_path) - _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], - enabled={"p1@nonexistent": True}, mkt_name="aim") + _setup_marketplace( + tmp_path, + [{"name": "p1", "has_agents": True}], + enabled={"p1@nonexistent": True}, + mkt_name="aim", + ) assert _discover_claude_plugin_agent_dirs() == [] def test_plugin_source_is_file_not_directory(self, tmp_path, monkeypatch): @@ -296,13 +339,24 @@ def test_plugin_source_is_file_not_directory(self, tmp_path, monkeypatch): (mkt_path / ".claude-plugin").mkdir() # Create a file (not dir) at the plugin source path (mkt_path / "plugin-file").write_text("I am a file") - (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "aim", "plugins": [{"name": "fp", "source": "./plugin-file", "version": "1"}] - })) - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": {"aim": {"source": {"path": str(mkt_path), "source": "directory"}}}, - "enabledPlugins": {"fp@aim": True} - })) + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "aim", + "plugins": [{"name": "fp", "source": "./plugin-file", "version": "1"}], + } + ) + ) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"fp@aim": True}, + } + ) + ) assert _discover_claude_plugin_agent_dirs() == [] def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch): @@ -315,31 +369,47 @@ def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch mkt_a = tmp_path / "mkt_a" mkt_a.mkdir() (mkt_a / ".claude-plugin").mkdir() - (mkt_a / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "alpha", "plugins": [{"name": "shared", "source": "./shared", "version": "1"}] - })) + (mkt_a / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "alpha", + "plugins": [{"name": "shared", "source": "./shared", "version": "1"}], + } + ) + ) (mkt_a / "shared" / "agents").mkdir(parents=True) (mkt_a / "shared" / "agents" / "dup.md").write_text( - "---\nname: dup\ndescription: from alpha\n---\nAlpha prompt") + "---\nname: dup\ndescription: from alpha\n---\nAlpha prompt" + ) # Marketplace B mkt_b = tmp_path / "mkt_b" mkt_b.mkdir() (mkt_b / ".claude-plugin").mkdir() - (mkt_b / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "beta", "plugins": [{"name": "shared", "source": "./shared", "version": "1"}] - })) + (mkt_b / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "beta", + "plugins": [{"name": "shared", "source": "./shared", "version": "1"}], + } + ) + ) (mkt_b / "shared" / "agents").mkdir(parents=True) (mkt_b / "shared" / "agents" / "dup.md").write_text( - "---\nname: dup\ndescription: from beta\n---\nBeta prompt") + "---\nname: dup\ndescription: from beta\n---\nBeta prompt" + ) - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": { - "alpha": {"source": {"path": str(mkt_a), "source": "directory"}}, - "beta": {"source": {"path": str(mkt_b), "source": "directory"}}, - }, - "enabledPlugins": {"shared@alpha": True, "shared@beta": True} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "alpha": {"source": {"path": str(mkt_a), "source": "directory"}}, + "beta": {"source": {"path": str(mkt_b), "source": "directory"}}, + }, + "enabledPlugins": {"shared@alpha": True, "shared@beta": True}, + } + ) + ) # Both dirs returned — dedup happens at list_agent_profiles level result = _discover_claude_plugin_agent_dirs() @@ -347,6 +417,7 @@ def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch # Now test that list_agent_profiles dedup picks first from unittest.mock import MagicMock + monkeypatch.setattr( "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", tmp_path / "empty-local", @@ -379,9 +450,11 @@ def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): mkt_path = tmp_path / "mkt" mkt_path.mkdir() (mkt_path / ".claude-plugin").mkdir() - (mkt_path / ".claude-plugin" / "marketplace.json").write_text(json.dumps({ - "name": "aim", "plugins": [{"name": "sym", "source": "./sym", "version": "1"}] - })) + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "sym", "source": "./sym", "version": "1"}]} + ) + ) (mkt_path / "sym" / "agents").mkdir(parents=True) # Create an external file and symlink to it from agents/ @@ -390,16 +463,23 @@ def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): (external / "evil.md").write_text("---\nname: evil\ndescription: escaped\n---\nEvil") (mkt_path / "sym" / "agents" / "evil.md").symlink_to(external / "evil.md") - (claude_dir / "settings.json").write_text(json.dumps({ - "extraKnownMarketplaces": {"aim": {"source": {"path": str(mkt_path), "source": "directory"}}}, - "enabledPlugins": {"sym@aim": True} - })) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"sym@aim": True}, + } + ) + ) # The agents dir IS returned (plugin dir containment passes) result = _discover_claude_plugin_agent_dirs() assert len(result) == 1 # The symlinked file IS scanned (no per-file containment check) from unittest.mock import MagicMock + monkeypatch.setattr( "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", tmp_path / "empty-local", @@ -427,8 +507,8 @@ class TestReadAgentProfileSourcePluginIntegration: def test_agent_found_in_plugin_dir(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source - _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], - {"pkg@aim": True}) + + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], {"pkg@aim": True}) monkeypatch.setattr( "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", tmp_path / "empty-local", @@ -440,6 +520,7 @@ def test_agent_found_in_plugin_dir(self, tmp_path, monkeypatch): "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] ) from unittest.mock import MagicMock + mock_store = MagicMock() mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) monkeypatch.setattr( @@ -452,8 +533,8 @@ def test_agent_found_in_plugin_dir(self, tmp_path, monkeypatch): def test_agent_not_found_raises(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source - _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], - {"pkg@aim": True}) + + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], {"pkg@aim": True}) monkeypatch.setattr( "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", tmp_path / "empty-local", @@ -465,6 +546,7 @@ def test_agent_not_found_raises(self, tmp_path, monkeypatch): "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] ) from unittest.mock import MagicMock + mock_store = MagicMock() mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) monkeypatch.setattr( From 03e6d3545896eca5c32ead84aec6df59d6d73cf1 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Fri, 8 May 2026 17:48:57 +0000 Subject: [PATCH 3/8] fix(agent_profiles): strip leading HTML comments before frontmatter parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Some profile generators (e.g. AIM `plugins install --local`) prepend HTML comment blocks before the YAML frontmatter delimiter. python-frontmatter requires '---' on the first line, so these profiles silently parse with empty metadata — causing mcpServers, model, and allowedTools to be None at runtime. Add a defensive regex strip at the top of parse_agent_profile_text() so CAO handles these profiles correctly regardless of upstream generator behavior. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/cli_agent_orchestrator/utils/agent_profiles.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index 2eae1f94a..7797cea08 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -2,6 +2,7 @@ import json import logging +import re from importlib import resources from pathlib import Path from typing import Dict, List @@ -206,6 +207,10 @@ def list_agent_profiles() -> List[Dict]: def parse_agent_profile_text(resolved_text: str, profile_name: str) -> AgentProfile: """Parse an AgentProfile from already-resolved markdown text.""" + # Strip leading HTML comments before the YAML frontmatter fence. + # Some profile generators (e.g. AIM) prepend blocks that + # prevent python-frontmatter from detecting the opening '---' delimiter. + resolved_text = re.sub(r"^\s*", "", resolved_text, flags=re.DOTALL) profile_data = frontmatter.loads(resolved_text) meta = profile_data.metadata meta["system_prompt"] = profile_data.content.strip() From 140d6e9a9dd589fb52c922273f2938cd590667bc Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Fri, 8 May 2026 18:16:54 +0000 Subject: [PATCH 4/8] test(agent_profiles): cover HTML-comment strip before frontmatter parse --- test/utils/test_agent_profiles.py | 47 +++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/test/utils/test_agent_profiles.py b/test/utils/test_agent_profiles.py index 7e5b3eb89..a9e107ae0 100644 --- a/test/utils/test_agent_profiles.py +++ b/test/utils/test_agent_profiles.py @@ -622,3 +622,50 @@ def test_load_agent_profile_builtin_store_fallback_resolves_vars( assert profile.system_prompt == "Body token: builtin-secret" assert profile.mcpServers is not None assert profile.mcpServers["service"]["env"]["API_TOKEN"] == "builtin-secret" + + +class TestParseAgentProfileTextHtmlCommentStrip: + """Regression tests for HTML-comment stripping before frontmatter parsing (03e6d35).""" + + def test_leading_html_comment_stripped_before_frontmatter(self): + """Profile with a leading comment parses correctly.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "\n---\nname: my-agent\ndescription: Test\n---\nPrompt" + profile = parse_agent_profile_text(text, "my-agent") + + assert profile.name == "my-agent" + assert profile.description == "Test" + assert profile.system_prompt == "Prompt" + + def test_no_comment_passthrough(self): + """Profile without a leading comment parses identically (strip is a no-op).""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "---\nname: plain\ndescription: Plain\n---\nPlain prompt" + profile = parse_agent_profile_text(text, "plain") + + assert profile.name == "plain" + assert profile.description == "Plain" + assert profile.system_prompt == "Plain prompt" + + def test_multiline_html_comment_stripped(self): + """Multiline comment is stripped (re.DOTALL behavior).""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "\n---\nname: multi\ndescription: Multi\n---\nBody" + profile = parse_agent_profile_text(text, "multi") + + assert profile.name == "multi" + assert profile.description == "Multi" + assert profile.system_prompt == "Body" + + def test_non_leading_comment_not_stripped(self): + """Comment that is NOT at the start of text should not be stripped.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + # The text starts with frontmatter, comment appears in the body — should remain. + text = "---\nname: keep\ndescription: Keep\n---\n\nBody" + profile = parse_agent_profile_text(text, "keep") + + assert "" in profile.system_prompt From 6a93db32225257f3c5f922f110300c66498704da Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Mon, 11 May 2026 23:15:15 +0000 Subject: [PATCH 5/8] fix(agent_profiles): strip multiple leading HTML comment blocks The ^-anchored regex introduced in 03e6d35 only matched the first leading HTML comment block. Profiles with multiple consecutive blocks (e.g. AIM-generated profiles with a boilerplate header + per-agent header) would have only the first stripped, causing frontmatter detection to fail. Replace the single-match pattern with a non-capturing repeat group `^(?:\s*)+` so all leading comment blocks are stripped in a single regex call. --- .../utils/agent_profiles.py | 2 +- test/utils/test_agent_profiles.py | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index 7797cea08..915656dde 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -210,7 +210,7 @@ def parse_agent_profile_text(resolved_text: str, profile_name: str) -> AgentProf # Strip leading HTML comments before the YAML frontmatter fence. # Some profile generators (e.g. AIM) prepend blocks that # prevent python-frontmatter from detecting the opening '---' delimiter. - resolved_text = re.sub(r"^\s*", "", resolved_text, flags=re.DOTALL) + resolved_text = re.sub(r"^(?:\s*)+", "", resolved_text, flags=re.DOTALL) profile_data = frontmatter.loads(resolved_text) meta = profile_data.metadata meta["system_prompt"] = profile_data.content.strip() diff --git a/test/utils/test_agent_profiles.py b/test/utils/test_agent_profiles.py index a9e107ae0..836276dd8 100644 --- a/test/utils/test_agent_profiles.py +++ b/test/utils/test_agent_profiles.py @@ -669,3 +669,22 @@ def test_non_leading_comment_not_stripped(self): profile = parse_agent_profile_text(text, "keep") assert "" in profile.system_prompt + + def test_multiple_leading_html_comments_stripped(self): + """Multiple consecutive leading blocks are all stripped.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = ( + "\n" + "\n" + "---\n" + "name: test-agent\n" + "description: Multi-comment\n" + "---\n" + "body" + ) + profile = parse_agent_profile_text(text, "test-agent") + + assert profile.name == "test-agent" + assert profile.description == "Multi-comment" + assert profile.system_prompt == "body" From b43a1d2bf62c06d1c9b37674b71b75910793c7a2 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Mon, 11 May 2026 23:18:04 +0000 Subject: [PATCH 6/8] fix(agent_profiles): validate per-file containment for plugin agents Individual files inside a plugin's agents/ directory (e.g. symlinks pointing outside the marketplace root) were enumerated without per-file validation. Add _scan_plugin_directory() that wraps _scan_directory with resolve()+is_relative_to() checks against the marketplace root for each file entry. Scope is narrow: only the claude_plugin discovery path uses the new containment check. Other _scan_directory callers (local store, provider dirs, extra dirs) are unchanged. _discover_claude_plugin_agent_dirs() now returns (agents_dir, plugin_root) tuples so callers can pass the root for containment validation. Addresses reviewer comment about individual files escaping containment. --- .../utils/agent_profiles.py | 92 ++++++++++- test/utils/test_agent_profiles.py | 6 +- test/utils/test_claude_plugin_discovery.py | 147 +++++++++++++++++- 3 files changed, 227 insertions(+), 18 deletions(-) diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index 915656dde..acebdb2f2 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -83,8 +83,73 @@ def _scan_directory(directory: Path, source_label: str, profiles: Dict[str, Dict } -def _discover_claude_plugin_agent_dirs() -> List[Path]: - """Walk Claude Code marketplaces and return agent dirs from enabled plugins.""" +def _is_path_contained(path: Path, root: Path) -> bool: + """Return True if *path* resolves to a location inside *root*.""" + try: + path.resolve().relative_to(root.resolve()) + return True + except ValueError: + return False + + +def _scan_plugin_directory( + directory: Path, source_label: str, profiles: Dict[str, Dict], plugin_root: Path +) -> None: + """Scan a plugin directory with per-file path containment validation. + + Like ``_scan_directory`` but skips any entry whose resolved path escapes + *plugin_root*. This prevents symlinks inside a plugin's agents/ directory + from exposing files outside the plugin tree. + """ + if not directory.exists(): + return + resolved_root = plugin_root.resolve() + for item in directory.iterdir(): + if not _is_path_contained(item, resolved_root): + logger.warning( + "Plugin agent file %s resolves outside plugin root %s — skipping", + item, + resolved_root, + ) + continue + if item.is_dir(): + profile_name = item.name + desc = "" + agent_md = item / "agent.md" + if agent_md.exists() and _is_path_contained(agent_md, resolved_root): + try: + data = frontmatter.loads(agent_md.read_text()) + desc = data.metadata.get("description", "") + except Exception: + pass + if profile_name not in profiles: + profiles[profile_name] = { + "name": profile_name, + "description": desc, + "source": source_label, + } + elif item.suffix == ".md" and item.is_file(): + profile_name = item.stem + desc = "" + try: + data = frontmatter.loads(item.read_text()) + desc = data.metadata.get("description", "") + except Exception: + pass + if profile_name not in profiles: + profiles[profile_name] = { + "name": profile_name, + "description": desc, + "source": source_label, + } + + +def _discover_claude_plugin_agent_dirs() -> List[tuple[Path, Path]]: + """Walk Claude Code marketplaces and return agent dirs from enabled plugins. + + Returns a list of ``(agents_dir, marketplace_root)`` tuples so callers can + perform per-file path containment against the marketplace root. + """ settings_path = Path.home() / ".claude" / "settings.json" try: data = json.loads(settings_path.read_text()) @@ -98,7 +163,7 @@ def _discover_claude_plugin_agent_dirs() -> List[Path]: if not marketplaces: return [] - agent_dirs: List[Path] = [] + agent_dirs: List[tuple[Path, Path]] = [] for mkt_name, mkt_config in marketplaces.items(): source = mkt_config.get("source", {}) if source.get("source") != "directory": @@ -133,7 +198,7 @@ def _discover_claude_plugin_agent_dirs() -> List[Path]: continue agents_dir = plugin_dir / "agents" if agents_dir.is_dir(): - agent_dirs.append(agents_dir) + agent_dirs.append((agents_dir, resolved_mkt)) return agent_dirs @@ -195,8 +260,8 @@ def list_agent_profiles() -> List[Dict]: _scan_directory(path, label, profiles) # 4. Claude Code plugin marketplace directories - for plugin_agents_dir in _discover_claude_plugin_agent_dirs(): - _scan_directory(plugin_agents_dir, "claude_plugin", profiles) + for plugin_agents_dir, plugin_root in _discover_claude_plugin_agent_dirs(): + _scan_plugin_directory(plugin_agents_dir, "claude_plugin", profiles, plugin_root) # 5. Extra user-added directories for extra_dir in get_extra_agent_dirs(): @@ -267,10 +332,21 @@ def _lookup_in_directory(directory: Path) -> str | None: if found is not None: return found - for plugin_agents_dir in _discover_claude_plugin_agent_dirs(): + for plugin_agents_dir, plugin_root in _discover_claude_plugin_agent_dirs(): found = _lookup_in_directory(plugin_agents_dir) if found is not None: - return found + # Verify the resolved file stays inside the plugin root + flat = _safe_join(plugin_agents_dir, f"{agent_name}.md") + nested = _safe_join(plugin_agents_dir, agent_name, "agent.md") + candidate = flat if (flat is not None and flat.exists()) else nested + if candidate is not None and _is_path_contained(candidate, plugin_root): + return found + logger.warning( + "Plugin agent file for '%s' resolves outside plugin root %s — skipping", + agent_name, + plugin_root, + ) + continue for extra_dir in get_extra_agent_dirs(): found = _lookup_in_directory(Path(extra_dir)) diff --git a/test/utils/test_agent_profiles.py b/test/utils/test_agent_profiles.py index 836276dd8..88c5ab88f 100644 --- a/test/utils/test_agent_profiles.py +++ b/test/utils/test_agent_profiles.py @@ -286,11 +286,12 @@ def fake_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch("cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_includes_builtin_profiles( - self, mock_resources, mock_local_dir, mock_scan, mock_get_agent_dirs, mock_get_extra_dirs + self, mock_resources, mock_local_dir, mock_scan, mock_discover, mock_get_agent_dirs, mock_get_extra_dirs ): """Test that built-in profiles are included and marked with source 'built-in'.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -406,11 +407,12 @@ def track_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch("cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_returns_sorted_by_name( - self, mock_resources, mock_local_dir, mock_scan, mock_get_agent_dirs, mock_get_extra_dirs + self, mock_resources, mock_local_dir, mock_scan, mock_discover, mock_get_agent_dirs, mock_get_extra_dirs ): """Test that returned profiles are sorted alphabetically by name.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py index 17bd15438..06b7a3f87 100644 --- a/test/utils/test_claude_plugin_discovery.py +++ b/test/utils/test_claude_plugin_discovery.py @@ -1,6 +1,7 @@ """Tests for Claude Code plugin marketplace auto-discovery.""" import json +import sys from pathlib import Path from unittest.mock import patch @@ -136,7 +137,9 @@ def test_plugin_enabled_agents_dir_exists(self, tmp_path, monkeypatch): ) result = _discover_claude_plugin_agent_dirs() assert len(result) == 1 - assert result[0] == (mkt_path / "myplugin" / "agents").resolve() + agents_dir, plugin_root = result[0] + assert agents_dir == (mkt_path / "myplugin" / "agents").resolve() + assert plugin_root == mkt_path.resolve() def test_plugin_enabled_agents_dir_missing(self, tmp_path, monkeypatch): monkeypatch.setattr(Path, "home", lambda: tmp_path) @@ -438,11 +441,11 @@ def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch dup_profile = next(p for p in profiles if p["name"] == "dup") assert dup_profile["description"] == "from alpha" - def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): - """Symlink inside agents/ pointing outside plugin root — file is still served. + def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch, caplog): + """Symlink inside agents/ pointing outside plugin root — file is now blocked. - Note: containment check is on the plugin dir, not individual agent files. - This test documents current behavior (no per-file containment). + Per-file containment check rejects files that resolve outside the + marketplace root. """ monkeypatch.setattr(Path, "home", lambda: tmp_path) claude_dir = tmp_path / ".claude" @@ -477,7 +480,7 @@ def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): # The agents dir IS returned (plugin dir containment passes) result = _discover_claude_plugin_agent_dirs() assert len(result) == 1 - # The symlinked file IS scanned (no per-file containment check) + # The symlinked file is now blocked by per-file containment from unittest.mock import MagicMock monkeypatch.setattr( @@ -496,9 +499,14 @@ def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch): "cli_agent_orchestrator.utils.agent_profiles.resources.files", lambda _pkg: mock_store, ) - profiles = list_agent_profiles() + + import logging + + with caplog.at_level(logging.WARNING): + profiles = list_agent_profiles() names = {p["name"] for p in profiles} - assert "evil" in names # DEFECT: symlink escape not blocked at agent-file level + assert "evil" not in names + assert "resolves outside plugin root" in caplog.text class TestReadAgentProfileSourcePluginIntegration: @@ -555,3 +563,126 @@ def test_agent_not_found_raises(self, tmp_path, monkeypatch): ) with pytest.raises(FileNotFoundError): _read_agent_profile_source("nonexistent-agent") + + +@pytest.mark.skipif(sys.platform == "win32", reason="symlinks require elevated privileges on Windows") +class TestPluginPerFileContainment: + """Tests for per-file path containment in plugin agent scanning.""" + + def test_symlink_outside_plugin_root_rejected(self, tmp_path, monkeypatch, caplog): + """Symlink inside agents/ pointing outside marketplace root is skipped with warning.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Real file inside plugin root + (agents_dir / "safe.md").write_text("---\nname: safe\ndescription: Safe\n---\nOK") + # Symlink escaping to outside + external = tmp_path / "outside" + external.mkdir() + (external / "escape.md").write_text("---\nname: escape\ndescription: Bad\n---\nEvil") + (agents_dir / "escape.md").symlink_to(external / "escape.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + import logging + + with caplog.at_level(logging.WARNING): + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + assert "safe" in names + assert "escape" not in names + assert "resolves outside plugin root" in caplog.text + + def test_symlink_within_plugin_root_accepted(self, tmp_path, monkeypatch): + """Symlink inside agents/ pointing within marketplace root is accepted.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Real file + (agents_dir / "real.md").write_text("---\nname: real\ndescription: Real\n---\nBody") + # Symlink within the marketplace root (points to sibling) + (agents_dir / "alias.md").symlink_to(agents_dir / "real.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + # Both real and alias are within the root, both accepted + assert "real" in names + assert "alias" in names From 878f315c82da003592c833283a04af75e54324d0 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Mon, 11 May 2026 23:19:42 +0000 Subject: [PATCH 7/8] perf(agent_profiles): cache claude plugin discovery with mtime-based invalidation _discover_claude_plugin_agent_dirs() is called on every list_agent_profiles() invocation from the long-running CAO server. Add a module-level cache keyed on the mtime of settings.json and each marketplace.json so repeated calls with unchanged filesystem state return instantly. Cache invalidation is automatic: any mtime change on settings.json triggers a full re-discovery, and mtime changes on individual marketplace.json files are also detected. Expose _reset_plugin_discovery_cache() (underscore-prefixed, test-only) so tests can isolate from each other. An autouse fixture in the test file ensures no cross-test cache pollution. Addresses reviewer comment about repeated filesystem reads on every invocation. --- .../utils/agent_profiles.py | 59 ++++++++- test/utils/test_agent_profiles.py | 26 +++- test/utils/test_claude_plugin_discovery.py | 120 +++++++++++++++++- 3 files changed, 194 insertions(+), 11 deletions(-) diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index acebdb2f2..82f1d771d 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -2,10 +2,11 @@ import json import logging +import os import re from importlib import resources from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional, Tuple import frontmatter @@ -144,26 +145,71 @@ def _scan_plugin_directory( } -def _discover_claude_plugin_agent_dirs() -> List[tuple[Path, Path]]: +_PLUGIN_DIR_CACHE: Optional[Dict] = None + + +def _reset_plugin_discovery_cache() -> None: + """Clear the plugin discovery cache. Intended for test use only.""" + global _PLUGIN_DIR_CACHE + _PLUGIN_DIR_CACHE = None + + +def _get_mtime(path: Path) -> Optional[float]: + """Return mtime of *path*, or None if it cannot be stat'd.""" + try: + return os.stat(path).st_mtime + except OSError: + return None + + +def _discover_claude_plugin_agent_dirs() -> List[Tuple[Path, Path]]: """Walk Claude Code marketplaces and return agent dirs from enabled plugins. Returns a list of ``(agents_dir, marketplace_root)`` tuples so callers can perform per-file path containment against the marketplace root. + + Results are cached at module level and invalidated when the mtime of + settings.json or any marketplace.json changes. """ + global _PLUGIN_DIR_CACHE + settings_path = Path.home() / ".claude" / "settings.json" + settings_mtime = _get_mtime(settings_path) + + # Fast path: if settings.json mtime hasn't changed, check full cache key + if _PLUGIN_DIR_CACHE is not None and _PLUGIN_DIR_CACHE["settings_mtime"] == settings_mtime: + # Verify marketplace.json mtimes haven't changed either + if all(_get_mtime(p) == m for p, m in _PLUGIN_DIR_CACHE["marketplace_mtimes"]): + return _PLUGIN_DIR_CACHE["value"] + + # Cache miss — perform full discovery + result, marketplace_mtimes = _compute_plugin_discovery(settings_path) + _PLUGIN_DIR_CACHE = { + "settings_mtime": settings_mtime, + "marketplace_mtimes": marketplace_mtimes, + "value": result, + } + return result + + +def _compute_plugin_discovery( + settings_path: Path, +) -> Tuple[List[Tuple[Path, Path]], List[Tuple[Path, Optional[float]]]]: + """Perform the actual plugin discovery and return (result, marketplace_mtimes).""" try: data = json.loads(settings_path.read_text()) except (json.JSONDecodeError, OSError): if settings_path.exists(): logger.warning("Failed to read %s", settings_path) - return [] + return [], [] marketplaces = data.get("extraKnownMarketplaces", {}) enabled_plugins = data.get("enabledPlugins", {}) if not marketplaces: - return [] + return [], [] - agent_dirs: List[tuple[Path, Path]] = [] + agent_dirs: List[Tuple[Path, Path]] = [] + marketplace_mtimes: List[Tuple[Path, Optional[float]]] = [] for mkt_name, mkt_config in marketplaces.items(): source = mkt_config.get("source", {}) if source.get("source") != "directory": @@ -173,6 +219,7 @@ def _discover_claude_plugin_agent_dirs() -> List[tuple[Path, Path]]: continue marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + marketplace_mtimes.append((marketplace_json, _get_mtime(marketplace_json))) try: mkt_data = json.loads(marketplace_json.read_text()) except (json.JSONDecodeError, OSError): @@ -200,7 +247,7 @@ def _discover_claude_plugin_agent_dirs() -> List[tuple[Path, Path]]: if agents_dir.is_dir(): agent_dirs.append((agents_dir, resolved_mkt)) - return agent_dirs + return agent_dirs, marketplace_mtimes def list_agent_profiles() -> List[Dict]: diff --git a/test/utils/test_agent_profiles.py b/test/utils/test_agent_profiles.py index 88c5ab88f..af9f6ac13 100644 --- a/test/utils/test_agent_profiles.py +++ b/test/utils/test_agent_profiles.py @@ -286,12 +286,21 @@ def fake_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) - @patch("cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", return_value=[]) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_includes_builtin_profiles( - self, mock_resources, mock_local_dir, mock_scan, mock_discover, mock_get_agent_dirs, mock_get_extra_dirs + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_agent_dirs, + mock_get_extra_dirs, ): """Test that built-in profiles are included and marked with source 'built-in'.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -407,12 +416,21 @@ def track_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) - @patch("cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", return_value=[]) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_returns_sorted_by_name( - self, mock_resources, mock_local_dir, mock_scan, mock_discover, mock_get_agent_dirs, mock_get_extra_dirs + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_agent_dirs, + mock_get_extra_dirs, ): """Test that returned profiles are sorted alphabetically by name.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py index 06b7a3f87..ac6a03097 100644 --- a/test/utils/test_claude_plugin_discovery.py +++ b/test/utils/test_claude_plugin_discovery.py @@ -1,6 +1,7 @@ """Tests for Claude Code plugin marketplace auto-discovery.""" import json +import os import sys from pathlib import Path from unittest.mock import patch @@ -9,10 +10,19 @@ from cli_agent_orchestrator.utils.agent_profiles import ( _discover_claude_plugin_agent_dirs, + _reset_plugin_discovery_cache, list_agent_profiles, ) +@pytest.fixture(autouse=True) +def _reset_cache(): + """Reset the plugin discovery cache before each test.""" + _reset_plugin_discovery_cache() + yield + _reset_plugin_discovery_cache() + + def _setup_marketplace( home: Path, plugins: list, @@ -565,7 +575,9 @@ def test_agent_not_found_raises(self, tmp_path, monkeypatch): _read_agent_profile_source("nonexistent-agent") -@pytest.mark.skipif(sys.platform == "win32", reason="symlinks require elevated privileges on Windows") +@pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" +) class TestPluginPerFileContainment: """Tests for per-file path containment in plugin agent scanning.""" @@ -686,3 +698,109 @@ def test_symlink_within_plugin_root_accepted(self, tmp_path, monkeypatch): # Both real and alias are within the root, both accepted assert "real" in names assert "alias" in names + + +class TestPluginDiscoveryCache: + """Tests for mtime-based caching of _discover_claude_plugin_agent_dirs.""" + + def test_cache_hit_avoids_recomputation(self, tmp_path, monkeypatch): + """Second call with unchanged filesystem returns cached result without recomputing.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call — populates cache + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should hit cache + result2 = _discover_claude_plugin_agent_dirs() + assert result2 == result1 + assert call_count["n"] == 0 + + def test_cache_invalidated_on_settings_mtime_change(self, tmp_path, monkeypatch): + """Touching settings.json invalidates the cache.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Touch settings.json to bump mtime + settings_path = tmp_path / ".claude" / "settings.json" + import time + + time.sleep(0.01) + os.utime(settings_path, None) + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should recompute + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 + assert result2 == result1 + + def test_cache_invalidated_on_marketplace_json_mtime_change(self, tmp_path, monkeypatch): + """Touching marketplace.json invalidates the cache.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Touch marketplace.json + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + import time + + time.sleep(0.01) + os.utime(marketplace_json, None) + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should recompute + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 From 22e3e92ff9aa6f49583994e7f13a655413868c78 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Mon, 11 May 2026 23:45:24 +0000 Subject: [PATCH 8/8] test(agent_profiles): expand coverage for plugin discovery fixes Tester-agent pass adding 7 new tests and 2 mock additions, filling scenario-coverage gaps surfaced by the tester task for PR #231: Fix C (HTML-comment strip): - test_three_leading_html_comments_stripped: edge-case for 3+ blocks Fix A (per-file plugin containment): - test_read_agent_profile_source_rejects_symlink_escape / accepts_regular_plugin_file: cover the second call site (_read_agent_profile_source) end-to-end - test_symlink_escape_in_non_plugin_dir_not_blocked: regression guard asserting the scope stays narrow (claude_plugin only) Fix B (discovery cache): - test_reset_plugin_discovery_cache_clears_state: explicit reset-helper test - test_cache_invalidates_when_new_marketplace_added - test_cache_invalidates_when_marketplace_json_disappears Mocks (home-dir leakage in existing tests): - test_builtin_store_exception_handled: mock _discover_claude_plugin_agent_dirs - test_non_md_builtin_files_skipped: same Coverage on agent_profiles.py: 85%% -> 92%% Impacted test count: 73 -> 80 (with the 2 mock-added tests now stable) Full suite: 1642 / 1642 non-infra tests pass. --- test/utils/test_agent_profiles_coverage.py | 24 +- test/utils/test_claude_plugin_discovery.py | 279 +++++++++++++++++++++ 2 files changed, 301 insertions(+), 2 deletions(-) diff --git a/test/utils/test_agent_profiles_coverage.py b/test/utils/test_agent_profiles_coverage.py index 989170137..c7f45f6b1 100644 --- a/test/utils/test_agent_profiles_coverage.py +++ b/test/utils/test_agent_profiles_coverage.py @@ -300,11 +300,21 @@ def test_builtin_profile_with_parse_error_still_added( @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_builtin_store_exception_handled( - self, mock_resources, mock_local_dir, mock_scan, mock_get_dirs, mock_get_extra + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_dirs, + mock_get_extra, ): """Exception scanning built-in store should be caught, not crash.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -368,11 +378,21 @@ def test_extra_dirs_scanned( @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_non_md_builtin_files_skipped( - self, mock_resources, mock_local_dir, mock_scan, mock_get_dirs, mock_get_extra + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_dirs, + mock_get_extra, ): """Non-md files in built-in store should be ignored.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py index ac6a03097..bd89a2bfc 100644 --- a/test/utils/test_claude_plugin_discovery.py +++ b/test/utils/test_claude_plugin_discovery.py @@ -804,3 +804,282 @@ def counting_compute(*args, **kwargs): # Second call — should recompute result2 = _discover_claude_plugin_agent_dirs() assert call_count["n"] == 1 + assert result2 == result1 + + def test_reset_plugin_discovery_cache_clears_state(self, tmp_path, monkeypatch): + """Explicit test that _reset_plugin_discovery_cache() forces recomputation.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call populates cache + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Explicitly clear the cache + _reset_plugin_discovery_cache() + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call should recompute because cache was reset + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 + assert result2 == result1 + + def test_cache_invalidates_when_new_marketplace_added(self, tmp_path, monkeypatch): + """Adding a new marketplace (new settings.json entry) triggers re-discovery.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call — one marketplace visible + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Add a second marketplace to settings.json (bumps settings.json mtime) + mkt2_path = tmp_path / ".aim" / "cc-plugins-b" + mkt2_path.mkdir(parents=True) + (mkt2_path / ".claude-plugin").mkdir() + (mkt2_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "bim", "plugins": [{"name": "p2", "source": "./p2", "version": "1.0"}]} + ) + ) + (mkt2_path / "p2" / "agents").mkdir(parents=True) + (mkt2_path / "p2" / "agents" / "p2-agent.md").write_text( + "---\nname: p2-agent\ndescription: From bim\n---\nPrompt" + ) + # Rewrite settings.json with both marketplaces + import time + + time.sleep(0.01) + settings_path = tmp_path / ".claude" / "settings.json" + settings_path.write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}}, + "bim": {"source": {"path": str(mkt2_path), "source": "directory"}}, + }, + "enabledPlugins": {"p1@aim": True, "p2@bim": True}, + } + ) + ) + + # Second call — cache invalidated by settings.json mtime change + result2 = _discover_claude_plugin_agent_dirs() + assert len(result2) == 2 + # Both agent dirs present, by plugin_root + roots = {str(pr) for _, pr in result2} + assert str(mkt_path.resolve()) in roots + assert str(mkt2_path.resolve()) in roots + + def test_cache_invalidates_when_marketplace_json_disappears(self, tmp_path, monkeypatch): + """A marketplace.json that disappears between calls triggers re-discovery.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call — populates cache with one marketplace.json mtime + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Delete the marketplace.json (settings.json unchanged) + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + marketplace_json.unlink() + + # Second call — the cached mtime was a float; _get_mtime now returns None. + # Cache key no longer matches → re-discover. Result shrinks to []. + result2 = _discover_claude_plugin_agent_dirs() + assert result2 == [] + + +class TestReadAgentProfileSourcePerFileContainment: + """Fix A: _read_agent_profile_source per-file containment check for plugin agents.""" + + @pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" + ) + def test_read_agent_profile_source_rejects_symlink_escape(self, tmp_path, monkeypatch, caplog): + """load_agent_profile('escape') where escape.md is a symlink outside plugin root. + + NOTE: The per-file check added in Fix A to _read_agent_profile_source + (the logger.warning + continue branch) is effectively defense-in-depth + behind _safe_join, which already filters symlink escapes during the + _lookup_in_directory call. So the user-visible effect is the same + (FileNotFoundError) regardless of which layer rejects — test asserts + the correct end behavior. + """ + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Target outside the marketplace root + external = tmp_path / "outside" + external.mkdir() + (external / "escape.md").write_text("---\nname: escape\n---\nEvil") + (agents_dir / "escape.md").symlink_to(external / "escape.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + with pytest.raises(FileNotFoundError): + _read_agent_profile_source("escape") + + def test_read_agent_profile_source_accepts_regular_plugin_file(self, tmp_path, monkeypatch): + """Sanity check: a regular (non-symlinked) plugin agent file is returned.""" + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = _read_agent_profile_source("p1-agent") + assert "Agent from p1" in result + + +@pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" +) +class TestFixAScopeIsNarrow: + """Fix A stays narrow: only claude_plugin path gets per-file containment. + + A symlink escape inside a non-plugin agent dir (e.g. ~/.kiro/agents/) must + NOT be rejected, because _scan_directory (the non-plugin path) has no + containment check. If a future refactor accidentally broadens Fix A to all + callers, this test catches the regression. + """ + + def test_symlink_escape_in_non_plugin_dir_not_blocked(self, tmp_path, monkeypatch): + """Symlink in a provider dir (not a plugin) is still picked up by _scan_directory.""" + from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles + + # Provider agents dir containing a symlink that escapes + kiro_agents = tmp_path / "kiro-agents" + kiro_agents.mkdir() + external = tmp_path / "outside-kiro" + external.mkdir() + (external / "evil.md").write_text("---\nname: evil\ndescription: Escaped\n---\nOK") + (kiro_agents / "evil.md").symlink_to(external / "evil.md") + + # No plugin discovery (we want to isolate the _scan_directory path) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + lambda: [], + ) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", + lambda: {"kiro_cli": str(kiro_agents)}, + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + # 'evil' IS present — the fix deliberately did not broaden the check to + # non-plugin paths. This asserts the scope stays narrow. + assert "evil" in names + + +class TestParseAgentProfileTextMultipleComments: + """Fix C extra coverage: three+ consecutive HTML comments stripped.""" + + def test_three_leading_html_comments_stripped(self): + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = ( + "\n" + "\n" + "\n" + "---\n" + "name: triple\n" + "description: Triple\n" + "---\n" + "body" + ) + profile = parse_agent_profile_text(text, "triple") + assert profile.name == "triple" + assert profile.description == "Triple" + assert profile.system_prompt == "body"