From 4dfe4805251f7dd76a881c7915d791cf5d746d75 Mon Sep 17 00:00:00 2001 From: hello-args Date: Thu, 11 Jun 2026 21:45:01 +0530 Subject: [PATCH 1/2] fix(oauth): scope repo JSON URL analysis to OAuth config keys (#164) Skip fixture and data JSON during repo walks, stop regex-scraping non-OAuth fields, and only flag plaintext HTTP on OAuth endpoint keys. --- CHANGELOG.md | 1 + src/mcts/analyzers/oauth_config.py | 42 ++++++++++------ tests/test_oauth_json_scope.py | 81 ++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 16 deletions(-) create mode 100644 tests/test_oauth_json_scope.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a256777..106f32e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `-o` / `--output` to `mcts doctor` and surface scan subcommands for CI artifact paths (#156, #157). - Accept `--no-progress` on `readiness`, `fuzz`, `scan-mcp`, and surface scan subcommands for shared CI scripts (#158). - Explain when `mcts doctor --deep` import checks are skipped (no MCP config or no `-m` module in launch args). +- Scope OAuth HTTP findings to OAuth config keys and skip fixture/data JSON during repo scans (#164). ### Changed diff --git a/src/mcts/analyzers/oauth_config.py b/src/mcts/analyzers/oauth_config.py index dae0bfd..5341415 100644 --- a/src/mcts/analyzers/oauth_config.py +++ b/src/mcts/analyzers/oauth_config.py @@ -94,7 +94,18 @@ "impersonate", ) -URL_PATTERN = re.compile(r"https?://[^\s\"']+", re.I) +_OAUTH_KEY_NAMES = {key.lower() for key in OAUTH_URL_KEYS} + +JSON_SCAN_SKIP_DIRS = frozenset( + { + "data", + "fixtures", + "test_data", + "processed", + "__fixtures__", + "tests", + } +) class OAuthConfigAnalyzer(BaseAnalyzer): @@ -147,7 +158,7 @@ def _analyze_url(self, url: str, key_path: str, source: str, seen: set[str]) -> return findings seen.add(finding_key) - if parsed.scheme == "http" and host: + if parsed.scheme == "http" and host and _is_oauth_endpoint_key_path(key_path): findings.append( _oauth_finding( finding_id=f"oauth-http-{abs(hash(finding_key))}", @@ -496,18 +507,25 @@ def _json_files_under(root: Path) -> list[Path]: for path in root.rglob("*.json"): if any(part.startswith(".") for part in path.parts): continue + if any(part in JSON_SCAN_SKIP_DIRS for part in path.parts): + continue files.append(path) return files[:50] +def _is_oauth_endpoint_key_path(key_path: str) -> bool: + if not key_path or key_path == "$text": + return False + leaf = key_path.rsplit(".", 1)[-1].lower() + if leaf in _OAUTH_KEY_NAMES: + return True + return "oauth" in leaf + + def _extract_oauth_urls(source: str) -> list[tuple[str, str]]: payload = _load_json(source) if payload is None: - try: - text = Path(source).read_text(encoding="utf-8", errors="ignore") - except OSError: - return [] - return _urls_from_text(text) + return [] urls: list[tuple[str, str]] = [] _walk_json(payload, "$", urls) @@ -519,13 +537,9 @@ def _walk_json(node: object, prefix: str, out: list[tuple[str, str]]) -> None: for key, value in node.items(): key_path = f"{prefix}.{key}" if isinstance(value, str): - if key.lower() in {k.lower() for k in OAUTH_URL_KEYS} or "oauth" in key.lower(): + if key.lower() in _OAUTH_KEY_NAMES or "oauth" in key.lower(): if value.startswith("http"): out.append((value, key_path)) - elif value.startswith("http") and any( - marker in value.lower() for marker in ("oauth", "authorize", "token") - ): - out.append((value, key_path)) else: _walk_json(value, key_path, out) elif isinstance(node, list): @@ -533,10 +547,6 @@ def _walk_json(node: object, prefix: str, out: list[tuple[str, str]]) -> None: _walk_json(item, f"{prefix}[{index}]", out) -def _urls_from_text(text: str) -> list[tuple[str, str]]: - return [(match.group(0), "$text") for match in URL_PATTERN.finditer(text)] - - def _iter_oauth_blocks(node: object, prefix: str = "$") -> list[tuple[str, dict]]: blocks: list[tuple[str, dict]] = [] if isinstance(node, dict): diff --git a/tests/test_oauth_json_scope.py b/tests/test_oauth_json_scope.py new file mode 100644 index 0000000..69b5b86 --- /dev/null +++ b/tests/test_oauth_json_scope.py @@ -0,0 +1,81 @@ +"""Tests for OAuth URL extraction scope in repo JSON scans.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from mcts.analyzers.oauth_config import OAuthConfigAnalyzer +from mcts.mcp.models import MCPServerInfo + + +def _server() -> MCPServerInfo: + return MCPServerInfo(name="test", tools=[], source_files={}) + + +def test_scraped_json_http_urls_are_not_flagged_as_oauth(tmp_path: Path) -> None: + data_dir = tmp_path / "data" + data_dir.mkdir() + payload = { + "records": [ + { + "homepage": "http://example.com/docs", + "callback": "http://example.com/api/oauth/callback", + } + ] + } + (data_dir / "scraped.json").write_text(json.dumps(payload), encoding="utf-8") + + findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server()) + assert not any(f.title == "OAuth endpoint uses plaintext HTTP" for f in findings) + + +def test_oauth_authorization_endpoint_http_still_flagged(tmp_path: Path) -> None: + config = tmp_path / "oauth-settings.json" + config.write_text( + json.dumps( + { + "oauth": { + "authorization_endpoint": "http://auth.example.com/oauth2/authorize", + } + } + ), + encoding="utf-8", + ) + + findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server()) + assert any( + f.analyzer == "oauth_config" + and f.title == "OAuth endpoint uses plaintext HTTP" + and f.severity.value == "high" + for f in findings + ) + + +def test_non_oauth_http_field_is_ignored(tmp_path: Path) -> None: + config = tmp_path / "metadata.json" + config.write_text( + json.dumps({"documentation": "http://example.com/guide", "version": "1.0"}), + encoding="utf-8", + ) + + findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server()) + assert not any(f.analyzer == "oauth_config" for f in findings) + + +def test_fixtures_directory_json_is_skipped(tmp_path: Path) -> None: + fixtures = tmp_path / "fixtures" + fixtures.mkdir() + (fixtures / "oauth.json").write_text( + json.dumps( + { + "oauth": { + "authorization_endpoint": "http://auth.example.com/oauth2/authorize", + } + } + ), + encoding="utf-8", + ) + + findings = OAuthConfigAnalyzer(target=tmp_path).analyze(_server()) + assert not findings From 2b183752eb2dcbf8cbca1f7e2050764296b6078a Mon Sep 17 00:00:00 2001 From: hello-args Date: Thu, 11 Jun 2026 21:48:06 +0530 Subject: [PATCH 2/2] style(oauth): satisfy ruff SIM102 in oauth JSON URL walker --- src/mcts/analyzers/oauth_config.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/mcts/analyzers/oauth_config.py b/src/mcts/analyzers/oauth_config.py index 5341415..af58bac 100644 --- a/src/mcts/analyzers/oauth_config.py +++ b/src/mcts/analyzers/oauth_config.py @@ -536,11 +536,13 @@ def _walk_json(node: object, prefix: str, out: list[tuple[str, str]]) -> None: if isinstance(node, dict): for key, value in node.items(): key_path = f"{prefix}.{key}" - if isinstance(value, str): - if key.lower() in _OAUTH_KEY_NAMES or "oauth" in key.lower(): - if value.startswith("http"): - out.append((value, key_path)) - else: + if ( + isinstance(value, str) + and (key.lower() in _OAUTH_KEY_NAMES or "oauth" in key.lower()) + and value.startswith("http") + ): + out.append((value, key_path)) + elif not isinstance(value, str): _walk_json(value, key_path, out) elif isinstance(node, list): for index, item in enumerate(node):