Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `-o` / `--output` to `mcts doctor` and surface scan subcommands for CI artifact paths (#156, #157).
- Accept `--no-progress` on `readiness`, `fuzz`, `scan-mcp`, and surface scan subcommands for shared CI scripts (#158).
- Explain when `mcts doctor --deep` import checks are skipped (no MCP config or no `-m` module in launch args).
- Scope OAuth HTTP findings to OAuth config keys and skip fixture/data JSON during repo scans (#164).
- Classify SQL database tools separately from filesystem tools so names like `read_query` are not flagged for path traversal (#165).
- Exclude design prompt markdown under `docs/prompts/` from default instruction discovery (#162).

Expand Down
52 changes: 32 additions & 20 deletions src/mcts/analyzers/oauth_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,18 @@
"impersonate",
)

# Matches an http:// or https:// URL up to the next whitespace or quote character.
URL_PATTERN = re.compile(r"https?://[^\s\"']+", re.I)

# Lower-cased copy of OAUTH_URL_KEYS, built once so key comparisons can be
# done case-insensitively without rebuilding the set on every lookup.
_OAUTH_KEY_NAMES = {name.lower() for name in OAUTH_URL_KEYS}

# Directory names that exclude a JSON file from the scan when they appear
# as any component of the file's path.
JSON_SCAN_SKIP_DIRS = frozenset(
    (
        "data",
        "fixtures",
        "test_data",
        "processed",
        "__fixtures__",
        "tests",
    )
)


class OAuthConfigAnalyzer(BaseAnalyzer):
Expand Down Expand Up @@ -147,7 +158,7 @@ def _analyze_url(self, url: str, key_path: str, source: str, seen: set[str]) ->
return findings
seen.add(finding_key)

if parsed.scheme == "http" and host:
if parsed.scheme == "http" and host and _is_oauth_endpoint_key_path(key_path):
findings.append(
_oauth_finding(
finding_id=f"oauth-http-{abs(hash(finding_key))}",
Expand Down Expand Up @@ -496,18 +507,25 @@ def _json_files_under(root: Path) -> list[Path]:
for path in root.rglob("*.json"):
if any(part.startswith(".") for part in path.parts):
continue
if any(part in JSON_SCAN_SKIP_DIRS for part in path.parts):
continue
files.append(path)
return files[:50]


def _is_oauth_endpoint_key_path(key_path: str) -> bool:
if not key_path or key_path == "$text":
return False
leaf = key_path.rsplit(".", 1)[-1].lower()
if leaf in _OAUTH_KEY_NAMES:
return True
return "oauth" in leaf


def _extract_oauth_urls(source: str) -> list[tuple[str, str]]:
payload = _load_json(source)
if payload is None:
try:
text = Path(source).read_text(encoding="utf-8", errors="ignore")
except OSError:
return []
return _urls_from_text(text)
return []

urls: list[tuple[str, str]] = []
_walk_json(payload, "$", urls)
Expand All @@ -518,25 +536,19 @@ def _walk_json(node: object, prefix: str, out: list[tuple[str, str]]) -> None:
if isinstance(node, dict):
for key, value in node.items():
key_path = f"{prefix}.{key}"
if isinstance(value, str):
if key.lower() in {k.lower() for k in OAUTH_URL_KEYS} or "oauth" in key.lower():
if value.startswith("http"):
out.append((value, key_path))
elif value.startswith("http") and any(
marker in value.lower() for marker in ("oauth", "authorize", "token")
):
out.append((value, key_path))
else:
if (
isinstance(value, str)
and (key.lower() in _OAUTH_KEY_NAMES or "oauth" in key.lower())
and value.startswith("http")
):
out.append((value, key_path))
elif not isinstance(value, str):
_walk_json(value, key_path, out)
elif isinstance(node, list):
for index, item in enumerate(node):
_walk_json(item, f"{prefix}[{index}]", out)


def _urls_from_text(text: str) -> list[tuple[str, str]]:
    """Return ``(url, "$text")`` for every ``URL_PATTERN`` match in *text*.

    The constant ``"$text"`` stands in for a key path on each result.
    """
    return [(match.group(0), "$text") for match in URL_PATTERN.finditer(text)]


def _iter_oauth_blocks(node: object, prefix: str = "$") -> list[tuple[str, dict]]:
blocks: list[tuple[str, dict]] = []
if isinstance(node, dict):
Expand Down
81 changes: 81 additions & 0 deletions tests/test_oauth_json_scope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Tests for OAuth URL extraction scope in repo JSON scans."""

from __future__ import annotations

import json
from pathlib import Path

from mcts.analyzers.oauth_config import OAuthConfigAnalyzer
from mcts.mcp.models import MCPServerInfo


def _server() -> MCPServerInfo:
    """Build an ``MCPServerInfo`` named ``"test"`` with no tools and no source files."""
    return MCPServerInfo(name="test", tools=[], source_files={})


def test_scraped_json_http_urls_are_not_flagged_as_oauth(tmp_path: Path) -> None:
    """HTTP URLs in a JSON file under ``data/`` yield no plaintext-HTTP finding."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    # Neither key is an OAuth config key, although one URL contains "oauth".
    payload = {
        "records": [
            {
                "homepage": "http://example.com/docs",
                "callback": "http://example.com/api/oauth/callback",
            }
        ]
    }
    (data_dir / "scraped.json").write_text(json.dumps(payload), encoding="utf-8")

    findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server())
    assert not any(f.title == "OAuth endpoint uses plaintext HTTP" for f in findings)


def test_oauth_authorization_endpoint_http_still_flagged(tmp_path: Path) -> None:
    """An ``http://`` ``authorization_endpoint`` still yields a high-severity finding."""
    config = tmp_path / "oauth-settings.json"
    config.write_text(
        json.dumps(
            {
                "oauth": {
                    "authorization_endpoint": "http://auth.example.com/oauth2/authorize",
                }
            }
        ),
        encoding="utf-8",
    )

    findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server())
    # The finding must come from this analyzer, with the exact title and severity.
    assert any(
        f.analyzer == "oauth_config"
        and f.title == "OAuth endpoint uses plaintext HTTP"
        and f.severity.value == "high"
        for f in findings
    )


def test_non_oauth_http_field_is_ignored(tmp_path: Path) -> None:
    """An HTTP URL under a non-OAuth key yields no ``oauth_config`` finding."""
    config = tmp_path / "metadata.json"
    config.write_text(
        json.dumps({"documentation": "http://example.com/guide", "version": "1.0"}),
        encoding="utf-8",
    )

    findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server())
    assert not any(f.analyzer == "oauth_config" for f in findings)


def test_fixtures_directory_json_is_skipped(tmp_path: Path) -> None:
    """An OAuth config with an HTTP endpoint under ``fixtures/`` yields no findings."""
    fixtures = tmp_path / "fixtures"
    fixtures.mkdir()
    # Same payload that is flagged when it sits at the scan root.
    (fixtures / "oauth.json").write_text(
        json.dumps(
            {
                "oauth": {
                    "authorization_endpoint": "http://auth.example.com/oauth2/authorize",
                }
            }
        ),
        encoding="utf-8",
    )

    findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server())
    assert not findings
Loading