Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/mcts/analyzers/data_leakage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@

HIDDEN_CHAR_PATTERN = re.compile(r"[\u200b-\u200f\ufeff\u202a-\u202e]")

LOGGING_CALL_PATTERN = re.compile(
r"""
^\s*
(?:
print
|console\.(?:log|info|warn|warning|error|debug)
|(?:logger|logging|log)\.(?:log|info|warn|warning|error|debug|exception|critical)
|(?:self\.)?logger\.(?:log|info|warn|warning|error|debug|exception|critical)
)
\s*\(
""",
re.VERBOSE,
)


def _is_logging_statement(line: str) -> bool:
return bool(LOGGING_CALL_PATTERN.search(line))


class DataLeakageAnalyzer(BaseAnalyzer):
"""Scans tool metadata and source files for exposed secrets."""
Expand Down Expand Up @@ -101,6 +119,8 @@ def _scan_source_files(self, server: MCPServerInfo) -> list[Finding]:
for label, pattern, severity in SECRET_PATTERNS:
if not pattern.search(line):
continue
if label == "Internal URL" and _is_logging_statement(line):
continue
finding_id = f"leak-src-{file_path}-{line_no}-{label.lower().replace(' ', '-')}"
if finding_id in seen:
continue
Expand Down
24 changes: 24 additions & 0 deletions tests/test_analyzers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from pathlib import Path

from mcts.analyzers.command_execution import CommandExecutionAnalyzer
from mcts.analyzers.data_leakage import DataLeakageAnalyzer
from mcts.core.config import ScanConfig
from mcts.discovery.static import StaticDiscovery
from mcts.mcp.models import MCPServerInfo
from mcts.reporting.models import Severity


Expand All @@ -24,3 +26,25 @@ def test_data_leakage_scans_source_files(example_server_path: Path) -> None:
report = Scanner(ScanConfig(target=example_server_path)).run()
source_findings = [f for f in report.findings if f.analyzer == "data_leakage" and f.location]
assert source_findings or any(f.analyzer == "data_leakage" for f in report.findings)


def test_data_leakage_ignores_loopback_urls_in_log_messages() -> None:
server = MCPServerInfo(
name="perseus",
source_files={
"mcp.py": "\n".join(
[
"print(f'Perseus MCP SSE server listening on http://127.0.0.1:{port}')",
"print(f' SSE endpoint: http://127.0.0.1:{port}/sse')",
"logger.info('Server card: http://localhost:9000/.well-known/mcp/server-card.json')",
"callback_url = 'http://127.0.0.1:9000/message'",
]
)
},
)

findings = DataLeakageAnalyzer().analyze(server)

assert len(findings) == 1
assert findings[0].location
assert findings[0].location.line == 4
Loading