diff --git a/CHANGELOG.md b/CHANGELOG.md index 4999e68d..97d67277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Auto-discover agents from enabled Claude Code plugin marketplaces (`~/.claude/settings.json → extraKnownMarketplaces`). Default `agent_dirs.claude_code` path changed from `~/.aws/cli-agent-orchestrator/agent-store` to `~/.claude/agents/`; users with a saved `agent_dirs.claude_code` are unaffected. + ## [2.1.1] - 2026-04-28 ### Added diff --git a/docs/settings.md b/docs/settings.md index 360ee6ae..b8d24ba6 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -74,6 +74,43 @@ Add additional directories that are scanned for agent profiles across all provid } ``` +## Claude Code Plugin Marketplace Auto-Discovery + +CAO automatically discovers agent profiles from Claude Code plugin marketplaces. When plugins are installed via AIM (or any tool that registers marketplaces in Claude Code's settings), their agents appear in CAO without manual configuration. + +### How It Works + +On each profile scan, CAO reads `~/.claude/settings.json` and: + +1. Iterates `extraKnownMarketplaces` entries with `source.source == "directory"`. +2. Reads each marketplace's `.claude-plugin/marketplace.json` for the plugin list. +3. For each plugin, checks `enabledPlugins["@"]` — only enabled plugins are scanned. +4. If the plugin has an `agents/` subdirectory, its `.md` profiles are included. + +Discovered profiles appear with `source: "claude_plugin"` in `GET /agents/profiles`. + +### Precedence + +Plugin agents are scanned **after** the local store and provider directories but **before** extra custom directories. If a local agent has the same name as a plugin agent, the local one wins (first-match dedup). + +### Disabling a Plugin's CAO Visibility + +Toggle the plugin off in `~/.claude/settings.json`: + +```json +{ + "enabledPlugins": { + "MyPlugin@aim": false + } +} +``` + +CAO will stop listing that plugin's agents on the next request. + +### Security + +Plugin paths are validated to stay within their marketplace root directory. Any plugin whose resolved path escapes the marketplace root is skipped with a warning. + ## API Endpoints | Method | Endpoint | Description | diff --git a/src/cli_agent_orchestrator/services/settings_service.py b/src/cli_agent_orchestrator/services/settings_service.py index 61c01a3e..3fabc4be 100644 --- a/src/cli_agent_orchestrator/services/settings_service.py +++ b/src/cli_agent_orchestrator/services/settings_service.py @@ -15,7 +15,7 @@ _DEFAULTS = { "kiro_cli": str(Path.home() / ".kiro" / "agents"), "q_cli": str(Path.home() / ".aws" / "amazonq" / "cli-agents"), - "claude_code": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-store"), + "claude_code": str(Path.home() / ".claude" / "agents"), "codex": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-store"), "cao_installed": str(Path.home() / ".aws" / "cli-agent-orchestrator" / "agent-context"), } diff --git a/src/cli_agent_orchestrator/utils/agent_profiles.py b/src/cli_agent_orchestrator/utils/agent_profiles.py index 834a1aae..82f1d771 100644 --- a/src/cli_agent_orchestrator/utils/agent_profiles.py +++ b/src/cli_agent_orchestrator/utils/agent_profiles.py @@ -1,9 +1,12 @@ """Agent profile utilities.""" +import json import logging +import os +import re from importlib import resources from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional, Tuple import frontmatter @@ -81,6 +84,172 @@ def _scan_directory(directory: Path, source_label: str, profiles: Dict[str, Dict } +def _is_path_contained(path: Path, root: Path) -> bool: + """Return True if *path* resolves to a location inside *root*.""" + try: + path.resolve().relative_to(root.resolve()) + return True + except ValueError: + return False + + +def _scan_plugin_directory( + directory: Path, source_label: str, profiles: Dict[str, Dict], plugin_root: Path +) -> None: + """Scan a plugin directory with per-file path containment validation. + + Like ``_scan_directory`` but skips any entry whose resolved path escapes + *plugin_root*. This prevents symlinks inside a plugin's agents/ directory + from exposing files outside the plugin tree. + """ + if not directory.exists(): + return + resolved_root = plugin_root.resolve() + for item in directory.iterdir(): + if not _is_path_contained(item, resolved_root): + logger.warning( + "Plugin agent file %s resolves outside plugin root %s — skipping", + item, + resolved_root, + ) + continue + if item.is_dir(): + profile_name = item.name + desc = "" + agent_md = item / "agent.md" + if agent_md.exists() and _is_path_contained(agent_md, resolved_root): + try: + data = frontmatter.loads(agent_md.read_text()) + desc = data.metadata.get("description", "") + except Exception: + pass + if profile_name not in profiles: + profiles[profile_name] = { + "name": profile_name, + "description": desc, + "source": source_label, + } + elif item.suffix == ".md" and item.is_file(): + profile_name = item.stem + desc = "" + try: + data = frontmatter.loads(item.read_text()) + desc = data.metadata.get("description", "") + except Exception: + pass + if profile_name not in profiles: + profiles[profile_name] = { + "name": profile_name, + "description": desc, + "source": source_label, + } + + +_PLUGIN_DIR_CACHE: Optional[Dict] = None + + +def _reset_plugin_discovery_cache() -> None: + """Clear the plugin discovery cache. Intended for test use only.""" + global _PLUGIN_DIR_CACHE + _PLUGIN_DIR_CACHE = None + + +def _get_mtime(path: Path) -> Optional[float]: + """Return mtime of *path*, or None if it cannot be stat'd.""" + try: + return os.stat(path).st_mtime + except OSError: + return None + + +def _discover_claude_plugin_agent_dirs() -> List[Tuple[Path, Path]]: + """Walk Claude Code marketplaces and return agent dirs from enabled plugins. + + Returns a list of ``(agents_dir, marketplace_root)`` tuples so callers can + perform per-file path containment against the marketplace root. + + Results are cached at module level and invalidated when the mtime of + settings.json or any marketplace.json changes. + """ + global _PLUGIN_DIR_CACHE + + settings_path = Path.home() / ".claude" / "settings.json" + settings_mtime = _get_mtime(settings_path) + + # Fast path: if settings.json mtime hasn't changed, check full cache key + if _PLUGIN_DIR_CACHE is not None and _PLUGIN_DIR_CACHE["settings_mtime"] == settings_mtime: + # Verify marketplace.json mtimes haven't changed either + if all(_get_mtime(p) == m for p, m in _PLUGIN_DIR_CACHE["marketplace_mtimes"]): + return _PLUGIN_DIR_CACHE["value"] + + # Cache miss — perform full discovery + result, marketplace_mtimes = _compute_plugin_discovery(settings_path) + _PLUGIN_DIR_CACHE = { + "settings_mtime": settings_mtime, + "marketplace_mtimes": marketplace_mtimes, + "value": result, + } + return result + + +def _compute_plugin_discovery( + settings_path: Path, +) -> Tuple[List[Tuple[Path, Path]], List[Tuple[Path, Optional[float]]]]: + """Perform the actual plugin discovery and return (result, marketplace_mtimes).""" + try: + data = json.loads(settings_path.read_text()) + except (json.JSONDecodeError, OSError): + if settings_path.exists(): + logger.warning("Failed to read %s", settings_path) + return [], [] + + marketplaces = data.get("extraKnownMarketplaces", {}) + enabled_plugins = data.get("enabledPlugins", {}) + if not marketplaces: + return [], [] + + agent_dirs: List[Tuple[Path, Path]] = [] + marketplace_mtimes: List[Tuple[Path, Optional[float]]] = [] + for mkt_name, mkt_config in marketplaces.items(): + source = mkt_config.get("source", {}) + if source.get("source") != "directory": + continue + mkt_path = Path(source.get("path", "")) + if not mkt_path.is_dir(): + continue + + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + marketplace_mtimes.append((marketplace_json, _get_mtime(marketplace_json))) + try: + mkt_data = json.loads(marketplace_json.read_text()) + except (json.JSONDecodeError, OSError): + logger.warning("Failed to read %s", marketplace_json) + continue + + resolved_mkt = mkt_path.resolve() + for plugin in mkt_data.get("plugins", []): + plugin_name = plugin.get("name", "") + if not enabled_plugins.get(f"{plugin_name}@{mkt_name}", False): + continue + plugin_source = plugin.get("source", "") + plugin_dir = (mkt_path / plugin_source).resolve() + # Path containment check + try: + plugin_dir.relative_to(resolved_mkt) + except ValueError: + logger.warning( + "Plugin path %s escapes marketplace root %s — skipping", + plugin_dir, + resolved_mkt, + ) + continue + agents_dir = plugin_dir / "agents" + if agents_dir.is_dir(): + agent_dirs.append((agents_dir, resolved_mkt)) + + return agent_dirs, marketplace_mtimes + + def list_agent_profiles() -> List[Dict]: """Discover all available agent profiles from all configured directories. @@ -137,7 +306,11 @@ def list_agent_profiles() -> List[Dict]: continue _scan_directory(path, label, profiles) - # 4. Extra user-added directories + # 4. Claude Code plugin marketplace directories + for plugin_agents_dir, plugin_root in _discover_claude_plugin_agent_dirs(): + _scan_plugin_directory(plugin_agents_dir, "claude_plugin", profiles, plugin_root) + + # 5. Extra user-added directories for extra_dir in get_extra_agent_dirs(): _scan_directory(Path(extra_dir), "custom", profiles) @@ -146,6 +319,10 @@ def list_agent_profiles() -> List[Dict]: def parse_agent_profile_text(resolved_text: str, profile_name: str) -> AgentProfile: """Parse an AgentProfile from already-resolved markdown text.""" + # Strip leading HTML comments before the YAML frontmatter fence. + # Some profile generators (e.g. AIM) prepend blocks that + # prevent python-frontmatter from detecting the opening '---' delimiter. + resolved_text = re.sub(r"^(?:\s*)+", "", resolved_text, flags=re.DOTALL) profile_data = frontmatter.loads(resolved_text) meta = profile_data.metadata meta["system_prompt"] = profile_data.content.strip() @@ -202,6 +379,22 @@ def _lookup_in_directory(directory: Path) -> str | None: if found is not None: return found + for plugin_agents_dir, plugin_root in _discover_claude_plugin_agent_dirs(): + found = _lookup_in_directory(plugin_agents_dir) + if found is not None: + # Verify the resolved file stays inside the plugin root + flat = _safe_join(plugin_agents_dir, f"{agent_name}.md") + nested = _safe_join(plugin_agents_dir, agent_name, "agent.md") + candidate = flat if (flat is not None and flat.exists()) else nested + if candidate is not None and _is_path_contained(candidate, plugin_root): + return found + logger.warning( + "Plugin agent file for '%s' resolves outside plugin root %s — skipping", + agent_name, + plugin_root, + ) + continue + for extra_dir in get_extra_agent_dirs(): found = _lookup_in_directory(Path(extra_dir)) if found is not None: diff --git a/test/utils/test_agent_profiles.py b/test/utils/test_agent_profiles.py index 7e5b3eb8..af9f6ac1 100644 --- a/test/utils/test_agent_profiles.py +++ b/test/utils/test_agent_profiles.py @@ -286,11 +286,21 @@ def fake_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_includes_builtin_profiles( - self, mock_resources, mock_local_dir, mock_scan, mock_get_agent_dirs, mock_get_extra_dirs + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_agent_dirs, + mock_get_extra_dirs, ): """Test that built-in profiles are included and marked with source 'built-in'.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -406,11 +416,21 @@ def track_scan(directory, source_label, profiles): @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_list_agent_profiles_returns_sorted_by_name( - self, mock_resources, mock_local_dir, mock_scan, mock_get_agent_dirs, mock_get_extra_dirs + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_agent_dirs, + mock_get_extra_dirs, ): """Test that returned profiles are sorted alphabetically by name.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -622,3 +642,69 @@ def test_load_agent_profile_builtin_store_fallback_resolves_vars( assert profile.system_prompt == "Body token: builtin-secret" assert profile.mcpServers is not None assert profile.mcpServers["service"]["env"]["API_TOKEN"] == "builtin-secret" + + +class TestParseAgentProfileTextHtmlCommentStrip: + """Regression tests for HTML-comment stripping before frontmatter parsing (03e6d35).""" + + def test_leading_html_comment_stripped_before_frontmatter(self): + """Profile with a leading comment parses correctly.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "\n---\nname: my-agent\ndescription: Test\n---\nPrompt" + profile = parse_agent_profile_text(text, "my-agent") + + assert profile.name == "my-agent" + assert profile.description == "Test" + assert profile.system_prompt == "Prompt" + + def test_no_comment_passthrough(self): + """Profile without a leading comment parses identically (strip is a no-op).""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "---\nname: plain\ndescription: Plain\n---\nPlain prompt" + profile = parse_agent_profile_text(text, "plain") + + assert profile.name == "plain" + assert profile.description == "Plain" + assert profile.system_prompt == "Plain prompt" + + def test_multiline_html_comment_stripped(self): + """Multiline comment is stripped (re.DOTALL behavior).""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = "\n---\nname: multi\ndescription: Multi\n---\nBody" + profile = parse_agent_profile_text(text, "multi") + + assert profile.name == "multi" + assert profile.description == "Multi" + assert profile.system_prompt == "Body" + + def test_non_leading_comment_not_stripped(self): + """Comment that is NOT at the start of text should not be stripped.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + # The text starts with frontmatter, comment appears in the body — should remain. + text = "---\nname: keep\ndescription: Keep\n---\n\nBody" + profile = parse_agent_profile_text(text, "keep") + + assert "" in profile.system_prompt + + def test_multiple_leading_html_comments_stripped(self): + """Multiple consecutive leading blocks are all stripped.""" + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = ( + "\n" + "\n" + "---\n" + "name: test-agent\n" + "description: Multi-comment\n" + "---\n" + "body" + ) + profile = parse_agent_profile_text(text, "test-agent") + + assert profile.name == "test-agent" + assert profile.description == "Multi-comment" + assert profile.system_prompt == "body" diff --git a/test/utils/test_agent_profiles_coverage.py b/test/utils/test_agent_profiles_coverage.py index 98917013..c7f45f6b 100644 --- a/test/utils/test_agent_profiles_coverage.py +++ b/test/utils/test_agent_profiles_coverage.py @@ -300,11 +300,21 @@ def test_builtin_profile_with_parse_error_still_added( @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_builtin_store_exception_handled( - self, mock_resources, mock_local_dir, mock_scan, mock_get_dirs, mock_get_extra + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_dirs, + mock_get_extra, ): """Exception scanning built-in store should be caught, not crash.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles @@ -368,11 +378,21 @@ def test_extra_dirs_scanned( @patch("cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", return_value=[]) @patch("cli_agent_orchestrator.services.settings_service.get_agent_dirs", return_value={}) + @patch( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + return_value=[], + ) @patch("cli_agent_orchestrator.utils.agent_profiles._scan_directory") @patch("cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR") @patch("cli_agent_orchestrator.utils.agent_profiles.resources") def test_non_md_builtin_files_skipped( - self, mock_resources, mock_local_dir, mock_scan, mock_get_dirs, mock_get_extra + self, + mock_resources, + mock_local_dir, + mock_scan, + mock_discover, + mock_get_dirs, + mock_get_extra, ): """Non-md files in built-in store should be ignored.""" from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles diff --git a/test/utils/test_claude_plugin_discovery.py b/test/utils/test_claude_plugin_discovery.py new file mode 100644 index 00000000..bd89a2bf --- /dev/null +++ b/test/utils/test_claude_plugin_discovery.py @@ -0,0 +1,1085 @@ +"""Tests for Claude Code plugin marketplace auto-discovery.""" + +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +from cli_agent_orchestrator.utils.agent_profiles import ( + _discover_claude_plugin_agent_dirs, + _reset_plugin_discovery_cache, + list_agent_profiles, +) + + +@pytest.fixture(autouse=True) +def _reset_cache(): + """Reset the plugin discovery cache before each test.""" + _reset_plugin_discovery_cache() + yield + _reset_plugin_discovery_cache() + + +def _setup_marketplace( + home: Path, + plugins: list, + enabled: dict, + mkt_name="aim", + mkt_source_type="directory", + extra_settings=None, +): + """Helper to create a fake marketplace structure under a fake home.""" + claude_dir = home / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = home / ".aim" / "cc-plugins" + mkt_path.mkdir(parents=True) + + settings = { + "extraKnownMarketplaces": { + mkt_name: {"source": {"path": str(mkt_path), "source": mkt_source_type}} + }, + "enabledPlugins": enabled, + } + if extra_settings: + settings.update(extra_settings) + (claude_dir / "settings.json").write_text(json.dumps(settings)) + + # Create marketplace.json + mkt_meta_dir = mkt_path / ".claude-plugin" + mkt_meta_dir.mkdir(parents=True) + mkt_json_plugins = [] + for p in plugins: + name = p["name"] + source = p.get("source", f"./{name}") + mkt_json_plugins.append({"name": name, "source": source, "version": "1.0"}) + # Create plugin agents dir if requested + if p.get("has_agents", False): + agents_dir = mkt_path / source.lstrip("./") / "agents" + agents_dir.mkdir(parents=True) + (agents_dir / f"{name}-agent.md").write_text( + f"---\nname: {name}-agent\ndescription: Agent from {name}\n---\nPrompt" + ) + (mkt_meta_dir / "marketplace.json").write_text( + json.dumps({"name": mkt_name, "plugins": mkt_json_plugins}) + ) + return mkt_path + + +class TestDiscoverClaudePluginAgentDirs: + """Unit tests for _discover_claude_plugin_agent_dirs.""" + + def test_settings_json_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_no_extra_known_marketplaces(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text(json.dumps({"someOtherKey": True})) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_malformed_settings_json(self, tmp_path, monkeypatch, caplog): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text("not valid json{{{") + import logging + + with caplog.at_level(logging.WARNING): + result = _discover_claude_plugin_agent_dirs() + assert result == [] + assert "Failed to read" in caplog.text + + def test_non_directory_source_skipped(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}, mkt_source_type="git" + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_marketplace_path_not_exists(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": "/nonexistent/path", "source": "directory"}} + }, + "enabledPlugins": {}, + } + ) + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_marketplace_json_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / ".aim" / "cc-plugins" + mkt_path.mkdir(parents=True) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_not_enabled_skipped(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": False}) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_enabled_agents_dir_exists(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "myplugin", "has_agents": True}], {"myplugin@aim": True} + ) + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 1 + agents_dir, plugin_root = result[0] + assert agents_dir == (mkt_path / "myplugin" / "agents").resolve() + assert plugin_root == mkt_path.resolve() + + def test_plugin_enabled_agents_dir_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + # Plugin exists but no agents/ subdir + _setup_marketplace( + tmp_path, [{"name": "noagents", "has_agents": False}], {"noagents@aim": True} + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_multiple_marketplaces_multiple_plugins(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + + # Marketplace 1 + mkt1 = tmp_path / "mkt1" + mkt1.mkdir() + (mkt1 / ".claude-plugin").mkdir() + (mkt1 / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "m1", + "plugins": [ + {"name": "a", "source": "./a", "version": "1"}, + {"name": "b", "source": "./b", "version": "1"}, + ], + } + ) + ) + (mkt1 / "a" / "agents").mkdir(parents=True) + (mkt1 / "a" / "agents" / "a-agent.md").write_text("---\nname: a-agent\n---\n") + (mkt1 / "b" / "agents").mkdir(parents=True) + (mkt1 / "b" / "agents" / "b-agent.md").write_text("---\nname: b-agent\n---\n") + + # Marketplace 2 + mkt2 = tmp_path / "mkt2" + mkt2.mkdir() + (mkt2 / ".claude-plugin").mkdir() + (mkt2 / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "m2", + "plugins": [ + {"name": "c", "source": "./c", "version": "1"}, + ], + } + ) + ) + (mkt2 / "c" / "agents").mkdir(parents=True) + (mkt2 / "c" / "agents" / "c-agent.md").write_text("---\nname: c-agent\n---\n") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "m1": {"source": {"path": str(mkt1), "source": "directory"}}, + "m2": {"source": {"path": str(mkt2), "source": "directory"}}, + }, + "enabledPlugins": {"a@m1": True, "b@m1": True, "c@m2": True}, + } + ) + ) + + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 3 + + def test_path_traversal_blocked(self, tmp_path, monkeypatch, caplog): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "evil", + "plugins": [ + {"name": "bad", "source": "../../../../etc", "version": "1"}, + ], + } + ) + ) + # Create agents dir at the traversal target to ensure it would match + # if containment check didn't exist + (tmp_path / "etc" / "agents").mkdir(parents=True, exist_ok=True) + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "evil": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"bad@evil": True}, + } + ) + ) + + import logging + + with caplog.at_level(logging.WARNING): + result = _discover_claude_plugin_agent_dirs() + assert result == [] + assert "escapes marketplace root" in caplog.text + + +class TestListAgentProfilesPluginIntegration: + """Integration tests: plugin agents appear in list_agent_profiles.""" + + def test_plugin_agent_appears_with_claude_plugin_source(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "testpkg", "has_agents": True}], {"testpkg@aim": True} + ) + # Patch out other sources + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = list_agent_profiles() + names = {p["name"] for p in result} + assert "testpkg-agent" in names + plugin_profile = next(p for p in result if p["name"] == "testpkg-agent") + assert plugin_profile["source"] == "claude_plugin" + + def test_local_agent_beats_plugin_agent_in_dedup(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "mypkg", "has_agents": True}], {"mypkg@aim": True}) + # Create a local agent with the same name as the plugin agent + local_store = tmp_path / "local-store" + local_store.mkdir() + (local_store / "mypkg-agent.md").write_text( + "---\nname: mypkg-agent\ndescription: Local override\n---\nLocal prompt" + ) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", local_store + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = list_agent_profiles() + plugin_profile = next(p for p in result if p["name"] == "mypkg-agent") + assert plugin_profile["source"] == "local" + + +class TestDiscoverEdgeCases: + """Additional edge-case tests for _discover_claude_plugin_agent_dirs.""" + + def test_empty_enabled_plugins_map(self, tmp_path, monkeypatch): + """enabledPlugins present but empty → no dirs returned.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], enabled={}) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_orphan_enabled_plugin_no_matching_marketplace(self, tmp_path, monkeypatch): + """enabledPlugins has plugin@X but no marketplace named X.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace( + tmp_path, + [{"name": "p1", "has_agents": True}], + enabled={"p1@nonexistent": True}, + mkt_name="aim", + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_plugin_source_is_file_not_directory(self, tmp_path, monkeypatch): + """Plugin source resolves to a regular file — agents/ can't exist.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + # Create a file (not dir) at the plugin source path + (mkt_path / "plugin-file").write_text("I am a file") + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "aim", + "plugins": [{"name": "fp", "source": "./plugin-file", "version": "1"}], + } + ) + ) + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"fp@aim": True}, + } + ) + ) + assert _discover_claude_plugin_agent_dirs() == [] + + def test_cross_marketplace_name_collision_first_wins(self, tmp_path, monkeypatch): + """Two marketplaces each have a plugin with same agent name — first scanned wins.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + + # Marketplace A (listed first in dict) + mkt_a = tmp_path / "mkt_a" + mkt_a.mkdir() + (mkt_a / ".claude-plugin").mkdir() + (mkt_a / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "alpha", + "plugins": [{"name": "shared", "source": "./shared", "version": "1"}], + } + ) + ) + (mkt_a / "shared" / "agents").mkdir(parents=True) + (mkt_a / "shared" / "agents" / "dup.md").write_text( + "---\nname: dup\ndescription: from alpha\n---\nAlpha prompt" + ) + + # Marketplace B + mkt_b = tmp_path / "mkt_b" + mkt_b.mkdir() + (mkt_b / ".claude-plugin").mkdir() + (mkt_b / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + { + "name": "beta", + "plugins": [{"name": "shared", "source": "./shared", "version": "1"}], + } + ) + ) + (mkt_b / "shared" / "agents").mkdir(parents=True) + (mkt_b / "shared" / "agents" / "dup.md").write_text( + "---\nname: dup\ndescription: from beta\n---\nBeta prompt" + ) + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "alpha": {"source": {"path": str(mkt_a), "source": "directory"}}, + "beta": {"source": {"path": str(mkt_b), "source": "directory"}}, + }, + "enabledPlugins": {"shared@alpha": True, "shared@beta": True}, + } + ) + ) + + # Both dirs returned — dedup happens at list_agent_profiles level + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 2 + + # Now test that list_agent_profiles dedup picks first + from unittest.mock import MagicMock + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + profiles = list_agent_profiles() + dup_profile = next(p for p in profiles if p["name"] == "dup") + assert dup_profile["description"] == "from alpha" + + def test_symlink_escape_in_agents_dir(self, tmp_path, monkeypatch, caplog): + """Symlink inside agents/ pointing outside plugin root — file is now blocked. + + Per-file containment check rejects files that resolve outside the + marketplace root. + """ + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "sym", "source": "./sym", "version": "1"}]} + ) + ) + (mkt_path / "sym" / "agents").mkdir(parents=True) + + # Create an external file and symlink to it from agents/ + external = tmp_path / "external" + external.mkdir() + (external / "evil.md").write_text("---\nname: evil\ndescription: escaped\n---\nEvil") + (mkt_path / "sym" / "agents" / "evil.md").symlink_to(external / "evil.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"sym@aim": True}, + } + ) + ) + + # The agents dir IS returned (plugin dir containment passes) + result = _discover_claude_plugin_agent_dirs() + assert len(result) == 1 + # The symlinked file is now blocked by per-file containment + from unittest.mock import MagicMock + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + import logging + + with caplog.at_level(logging.WARNING): + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + assert "evil" not in names + assert "resolves outside plugin root" in caplog.text + + +class TestReadAgentProfileSourcePluginIntegration: + """Integration: _read_agent_profile_source falls through plugin dirs.""" + + def test_agent_found_in_plugin_dir(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], {"pkg@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + result = _read_agent_profile_source("pkg-agent") + assert "Agent from pkg" in result + + def test_agent_not_found_raises(self, tmp_path, monkeypatch): + monkeypatch.setattr(Path, "home", lambda: tmp_path) + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + _setup_marketplace(tmp_path, [{"name": "pkg", "has_agents": True}], {"pkg@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + with pytest.raises(FileNotFoundError): + _read_agent_profile_source("nonexistent-agent") + + +@pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" +) +class TestPluginPerFileContainment: + """Tests for per-file path containment in plugin agent scanning.""" + + def test_symlink_outside_plugin_root_rejected(self, tmp_path, monkeypatch, caplog): + """Symlink inside agents/ pointing outside marketplace root is skipped with warning.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Real file inside plugin root + (agents_dir / "safe.md").write_text("---\nname: safe\ndescription: Safe\n---\nOK") + # Symlink escaping to outside + external = tmp_path / "outside" + external.mkdir() + (external / "escape.md").write_text("---\nname: escape\ndescription: Bad\n---\nEvil") + (agents_dir / "escape.md").symlink_to(external / "escape.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + import logging + + with caplog.at_level(logging.WARNING): + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + assert "safe" in names + assert "escape" not in names + assert "resolves outside plugin root" in caplog.text + + def test_symlink_within_plugin_root_accepted(self, tmp_path, monkeypatch): + """Symlink inside agents/ pointing within marketplace root is accepted.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Real file + (agents_dir / "real.md").write_text("---\nname: real\ndescription: Real\n---\nBody") + # Symlink within the marketplace root (points to sibling) + (agents_dir / "alias.md").symlink_to(agents_dir / "real.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + # Both real and alias are within the root, both accepted + assert "real" in names + assert "alias" in names + + +class TestPluginDiscoveryCache: + """Tests for mtime-based caching of _discover_claude_plugin_agent_dirs.""" + + def test_cache_hit_avoids_recomputation(self, tmp_path, monkeypatch): + """Second call with unchanged filesystem returns cached result without recomputing.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call — populates cache + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should hit cache + result2 = _discover_claude_plugin_agent_dirs() + assert result2 == result1 + assert call_count["n"] == 0 + + def test_cache_invalidated_on_settings_mtime_change(self, tmp_path, monkeypatch): + """Touching settings.json invalidates the cache.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Touch settings.json to bump mtime + settings_path = tmp_path / ".claude" / "settings.json" + import time + + time.sleep(0.01) + os.utime(settings_path, None) + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should recompute + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 + assert result2 == result1 + + def test_cache_invalidated_on_marketplace_json_mtime_change(self, tmp_path, monkeypatch): + """Touching marketplace.json invalidates the cache.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Touch marketplace.json + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + import time + + time.sleep(0.01) + os.utime(marketplace_json, None) + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call — should recompute + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 + assert result2 == result1 + + def test_reset_plugin_discovery_cache_clears_state(self, tmp_path, monkeypatch): + """Explicit test that _reset_plugin_discovery_cache() forces recomputation.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + + # First call populates cache + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Explicitly clear the cache + _reset_plugin_discovery_cache() + + # Spy on _compute_plugin_discovery + call_count = {"n": 0} + from cli_agent_orchestrator.utils.agent_profiles import _compute_plugin_discovery + + original_compute = _compute_plugin_discovery + + def counting_compute(*args, **kwargs): + call_count["n"] += 1 + return original_compute(*args, **kwargs) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._compute_plugin_discovery", + counting_compute, + ) + + # Second call should recompute because cache was reset + result2 = _discover_claude_plugin_agent_dirs() + assert call_count["n"] == 1 + assert result2 == result1 + + def test_cache_invalidates_when_new_marketplace_added(self, tmp_path, monkeypatch): + """Adding a new marketplace (new settings.json entry) triggers re-discovery.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call — one marketplace visible + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Add a second marketplace to settings.json (bumps settings.json mtime) + mkt2_path = tmp_path / ".aim" / "cc-plugins-b" + mkt2_path.mkdir(parents=True) + (mkt2_path / ".claude-plugin").mkdir() + (mkt2_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "bim", "plugins": [{"name": "p2", "source": "./p2", "version": "1.0"}]} + ) + ) + (mkt2_path / "p2" / "agents").mkdir(parents=True) + (mkt2_path / "p2" / "agents" / "p2-agent.md").write_text( + "---\nname: p2-agent\ndescription: From bim\n---\nPrompt" + ) + # Rewrite settings.json with both marketplaces + import time + + time.sleep(0.01) + settings_path = tmp_path / ".claude" / "settings.json" + settings_path.write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}}, + "bim": {"source": {"path": str(mkt2_path), "source": "directory"}}, + }, + "enabledPlugins": {"p1@aim": True, "p2@bim": True}, + } + ) + ) + + # Second call — cache invalidated by settings.json mtime change + result2 = _discover_claude_plugin_agent_dirs() + assert len(result2) == 2 + # Both agent dirs present, by plugin_root + roots = {str(pr) for _, pr in result2} + assert str(mkt_path.resolve()) in roots + assert str(mkt2_path.resolve()) in roots + + def test_cache_invalidates_when_marketplace_json_disappears(self, tmp_path, monkeypatch): + """A marketplace.json that disappears between calls triggers re-discovery.""" + monkeypatch.setattr(Path, "home", lambda: tmp_path) + mkt_path = _setup_marketplace( + tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True} + ) + + # First call — populates cache with one marketplace.json mtime + result1 = _discover_claude_plugin_agent_dirs() + assert len(result1) == 1 + + # Delete the marketplace.json (settings.json unchanged) + marketplace_json = mkt_path / ".claude-plugin" / "marketplace.json" + marketplace_json.unlink() + + # Second call — the cached mtime was a float; _get_mtime now returns None. + # Cache key no longer matches → re-discover. Result shrinks to []. + result2 = _discover_claude_plugin_agent_dirs() + assert result2 == [] + + +class TestReadAgentProfileSourcePerFileContainment: + """Fix A: _read_agent_profile_source per-file containment check for plugin agents.""" + + @pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" + ) + def test_read_agent_profile_source_rejects_symlink_escape(self, tmp_path, monkeypatch, caplog): + """load_agent_profile('escape') where escape.md is a symlink outside plugin root. + + NOTE: The per-file check added in Fix A to _read_agent_profile_source + (the logger.warning + continue branch) is effectively defense-in-depth + behind _safe_join, which already filters symlink escapes during the + _lookup_in_directory call. So the user-visible effect is the same + (FileNotFoundError) regardless of which layer rejects — test asserts + the correct end behavior. + """ + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + monkeypatch.setattr(Path, "home", lambda: tmp_path) + claude_dir = tmp_path / ".claude" + claude_dir.mkdir(parents=True) + mkt_path = tmp_path / "mkt" + mkt_path.mkdir() + (mkt_path / ".claude-plugin").mkdir() + (mkt_path / ".claude-plugin" / "marketplace.json").write_text( + json.dumps( + {"name": "aim", "plugins": [{"name": "p1", "source": "./p1", "version": "1"}]} + ) + ) + agents_dir = mkt_path / "p1" / "agents" + agents_dir.mkdir(parents=True) + # Target outside the marketplace root + external = tmp_path / "outside" + external.mkdir() + (external / "escape.md").write_text("---\nname: escape\n---\nEvil") + (agents_dir / "escape.md").symlink_to(external / "escape.md") + + (claude_dir / "settings.json").write_text( + json.dumps( + { + "extraKnownMarketplaces": { + "aim": {"source": {"path": str(mkt_path), "source": "directory"}} + }, + "enabledPlugins": {"p1@aim": True}, + } + ) + ) + + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + with pytest.raises(FileNotFoundError): + _read_agent_profile_source("escape") + + def test_read_agent_profile_source_accepts_regular_plugin_file(self, tmp_path, monkeypatch): + """Sanity check: a regular (non-symlinked) plugin agent file is returned.""" + from cli_agent_orchestrator.utils.agent_profiles import _read_agent_profile_source + + monkeypatch.setattr(Path, "home", lambda: tmp_path) + _setup_marketplace(tmp_path, [{"name": "p1", "has_agents": True}], {"p1@aim": True}) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", lambda: {} + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.__truediv__ = lambda self, name: MagicMock(is_file=lambda: False) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + result = _read_agent_profile_source("p1-agent") + assert "Agent from p1" in result + + +@pytest.mark.skipif( + sys.platform == "win32", reason="symlinks require elevated privileges on Windows" +) +class TestFixAScopeIsNarrow: + """Fix A stays narrow: only claude_plugin path gets per-file containment. + + A symlink escape inside a non-plugin agent dir (e.g. ~/.kiro/agents/) must + NOT be rejected, because _scan_directory (the non-plugin path) has no + containment check. If a future refactor accidentally broadens Fix A to all + callers, this test catches the regression. + """ + + def test_symlink_escape_in_non_plugin_dir_not_blocked(self, tmp_path, monkeypatch): + """Symlink in a provider dir (not a plugin) is still picked up by _scan_directory.""" + from cli_agent_orchestrator.utils.agent_profiles import list_agent_profiles + + # Provider agents dir containing a symlink that escapes + kiro_agents = tmp_path / "kiro-agents" + kiro_agents.mkdir() + external = tmp_path / "outside-kiro" + external.mkdir() + (external / "evil.md").write_text("---\nname: evil\ndescription: Escaped\n---\nOK") + (kiro_agents / "evil.md").symlink_to(external / "evil.md") + + # No plugin discovery (we want to isolate the _scan_directory path) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles._discover_claude_plugin_agent_dirs", + lambda: [], + ) + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.LOCAL_AGENT_STORE_DIR", + tmp_path / "empty-local", + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_agent_dirs", + lambda: {"kiro_cli": str(kiro_agents)}, + ) + monkeypatch.setattr( + "cli_agent_orchestrator.services.settings_service.get_extra_agent_dirs", lambda: [] + ) + from unittest.mock import MagicMock + + mock_store = MagicMock() + mock_store.iterdir.return_value = [] + monkeypatch.setattr( + "cli_agent_orchestrator.utils.agent_profiles.resources.files", + lambda _pkg: mock_store, + ) + + profiles = list_agent_profiles() + names = {p["name"] for p in profiles} + # 'evil' IS present — the fix deliberately did not broaden the check to + # non-plugin paths. This asserts the scope stays narrow. + assert "evil" in names + + +class TestParseAgentProfileTextMultipleComments: + """Fix C extra coverage: three+ consecutive HTML comments stripped.""" + + def test_three_leading_html_comments_stripped(self): + from cli_agent_orchestrator.utils.agent_profiles import parse_agent_profile_text + + text = ( + "\n" + "\n" + "\n" + "---\n" + "name: triple\n" + "description: Triple\n" + "---\n" + "body" + ) + profile = parse_agent_profile_text(text, "triple") + assert profile.name == "triple" + assert profile.description == "Triple" + assert profile.system_prompt == "body"