# Patch summary (explanatory preamble; everything from the first "diff --git" header onward is the original patch text, unchanged).
#
# Purpose: stop the data-leakage analyzer from reporting "Internal URL" findings on source lines that are logging/print statements.
#
# src/mcts/analyzers/data_leakage.py
#   * Adds module-level LOGGING_CALL_PATTERN, a re.VERBOSE regex anchored with ^\s*, so it only matches a call that
#     starts the line (after optional leading whitespace). Recognised callees, each followed by optional whitespace
#     and "(":
#       - print
#       - console.log / info / warn / warning / error / debug
#       - logger. / logging. / log. followed by log, info, warn, warning, error, debug, exception or critical
#       - logger. with an optional "self." prefix, same method names
#   * Adds _is_logging_statement(line), which returns bool(LOGGING_CALL_PATTERN.search(line)).
#   * In _scan_source_files, inside the loop over SECRET_PATTERNS, a matching line is now skipped when the pattern's
#     label is exactly "Internal URL" and _is_logging_statement(line) is true. Other labels are unaffected.
#
# tests/test_analyzers.py
#   * Imports DataLeakageAnalyzer and MCPServerInfo.
#   * Adds test_data_leakage_ignores_loopback_urls_in_log_messages: builds an MCPServerInfo whose "mcp.py" source has
#     two print(...) lines, one logger.info(...) line and one callback_url assignment, all containing 127.0.0.1 or
#     localhost URLs, and asserts that exactly one finding is produced and that its location is line 4 (the assignment).
#
# NOTE(review): the skip is keyed only on the "Internal URL" label plus the logging check; no loopback-specific test
#   is visible in this patch, although the new test's name says "loopback". If the "Internal URL" pattern (defined
#   outside this patch) also matches non-loopback hosts, those would be suppressed on logging lines too - confirm
#   that this is intended.
# NOTE(review): the fourth regex alternative overlaps the third for a bare "logger." prefix; only its "self.logger."
#   case adds coverage.
# NOTE(review): because of the ^\s* anchor, a logging call that follows other code on the same line is not treated
#   as a logging statement.
# NOTE(review): the patch text below appears to have had its newlines and indentation collapsed into single spaces,
#   so it is presumably not applicable as-is - regenerate it from the repository before applying.
diff --git a/src/mcts/analyzers/data_leakage.py b/src/mcts/analyzers/data_leakage.py index 96b2fc3..73bc44a 100644 --- a/src/mcts/analyzers/data_leakage.py +++ b/src/mcts/analyzers/data_leakage.py @@ -40,6 +40,24 @@ HIDDEN_CHAR_PATTERN = re.compile(r"[\u200b-\u200f\ufeff\u202a-\u202e]") +LOGGING_CALL_PATTERN = re.compile( + r""" + ^\s* + (?: + print + |console\.(?:log|info|warn|warning|error|debug) + |(?:logger|logging|log)\.(?:log|info|warn|warning|error|debug|exception|critical) + |(?:self\.)?logger\.(?:log|info|warn|warning|error|debug|exception|critical) + ) + \s*\( + """, + re.VERBOSE, +) + + +def _is_logging_statement(line: str) -> bool: + return bool(LOGGING_CALL_PATTERN.search(line)) + class DataLeakageAnalyzer(BaseAnalyzer): """Scans tool metadata and source files for exposed secrets.""" @@ -101,6 +119,8 @@ def _scan_source_files(self, server: MCPServerInfo) -> list[Finding]: for label, pattern, severity in SECRET_PATTERNS: if not pattern.search(line): continue + if label == "Internal URL" and _is_logging_statement(line): + continue finding_id = f"leak-src-{file_path}-{line_no}-{label.lower().replace(' ', '-')}" if finding_id in seen: continue diff --git a/tests/test_analyzers.py b/tests/test_analyzers.py index c550a76..cb7e787 100644 --- a/tests/test_analyzers.py +++ b/tests/test_analyzers.py @@ -3,8 +3,10 @@ from pathlib import Path from mcts.analyzers.command_execution import CommandExecutionAnalyzer +from mcts.analyzers.data_leakage import DataLeakageAnalyzer from mcts.core.config import ScanConfig from mcts.discovery.static import StaticDiscovery +from mcts.mcp.models import MCPServerInfo from mcts.reporting.models import Severity @@ -24,3 +26,25 @@ def test_data_leakage_scans_source_files(example_server_path: Path) -> None: report = Scanner(ScanConfig(target=example_server_path)).run() source_findings = [f for f in report.findings if f.analyzer == "data_leakage" and f.location] assert source_findings or any(f.analyzer == "data_leakage" for f in 
report.findings) + + +def test_data_leakage_ignores_loopback_urls_in_log_messages() -> None: + server = MCPServerInfo( + name="perseus", + source_files={ + "mcp.py": "\n".join( + [ + "print(f'Perseus MCP SSE server listening on http://127.0.0.1:{port}')", + "print(f' SSE endpoint: http://127.0.0.1:{port}/sse')", + "logger.info('Server card: http://localhost:9000/.well-known/mcp/server-card.json')", + "callback_url = 'http://127.0.0.1:9000/message'", + ] + ) + }, + ) + + findings = DataLeakageAnalyzer().analyze(server) + + assert len(findings) == 1 + assert findings[0].location + assert findings[0].location.line == 4