diff --git a/.github/workflows/action-validate.yml b/.github/workflows/action-validate.yml index 12e41c3..0417b1d 100644 --- a/.github/workflows/action-validate.yml +++ b/.github/workflows/action-validate.yml @@ -30,6 +30,8 @@ jobs: - uses: ./action with: target: examples/vulnerable-mcp-server/server.py + # ci-trust forces fail-on-critical + min-score 70; smoke test only checks outputs. + ci-trust: "false" fail-on-critical: "false" - name: Upload SARIF for GitHub Code Scanning diff --git a/.github/workflows/test-gate.yml b/.github/workflows/test-gate.yml index dd5b7c0..42b9e95 100644 --- a/.github/workflows/test-gate.yml +++ b/.github/workflows/test-gate.yml @@ -48,6 +48,25 @@ jobs: - run: uv run pytest -m integration tests/integration/ - run: uv run python scripts/validate_trust_layer.py if: matrix.python-version == '3.12' + - name: Fact coverage gate (enforce, 80%) + if: matrix.python-version == '3.12' + run: | + uv run python - <<'PY' + from mcts.core.config import ScanConfig + from mcts.core.scanner import Scanner + from mcts.reporting.evidence_provenance import fact_coverage + + report = Scanner( + ScanConfig( + target="examples/vulnerable-mcp-server/server.py", + findings_trust_mode="enforce", + ) + ).run() + fc = fact_coverage(report.findings) + pct = float(fc.get("pct", 0)) + assert pct >= 80.0, fc + print(f"fact_coverage pct={pct}") + PY - run: | uv run python - <<'PY' from mcts.testing.regression_harness import REGRESSION_THRESHOLD, REGRESSION_TECHNIQUES, evaluate_technique diff --git a/.mcts/policy.yaml.example b/.mcts/policy.yaml.example index b7586fe..2eb9814 100644 --- a/.mcts/policy.yaml.example +++ b/.mcts/policy.yaml.example @@ -23,6 +23,7 @@ findings_trust_mode: enforce # min_security_score: 50 # max_absolute_risk: 400 # max_risk_level: high +# max_worst_absolute_risk: 500 # min_category_score_v2: # injection: 80 diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a0d002..806b2de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Optional analyzer skip rows** — `npm_audit`, `yara_metadata`, `cloud_inspect`, `llm_judge`, `llm_metadata_triage`, `virustotal` emit hygiene findings when deps/keys missing - **`--ignore-policy` on pentest/readiness** — auxiliary CLI paths can opt out of policy merge - **GitHub Action** — `max-high`, `max-critical`, and `ignore-policy` inputs +- **SARIF coverage filter** — compliance `finding_kind=coverage` rows excluded from SARIF by default (`include_coverage_findings=True` to export) +- **SARIF v2 metadata** — per-finding `mcts/v2RiskContribution` for top contributors; run-level `mcts/v2TopContributors` +- **Hygiene bronze facts** — readiness, live/static discovery meta, and protocol probe rows emit bronze `evidence.facts` +- **Compliance trust validation** — compliance meta-findings pass through `validate_findings()` when trust is active +- **Fact coverage CI gate** — enforce scans must meet ≥50% structured-fact coverage (ramp toward 80%) +- **JSON truncation** — `max_json_findings` on `ScanConfig` truncates JSON export with scan note +- **MCP IDE scan params** — `scan_mcp_target` accepts `scoring_mode`, trust mode, and v2 gate thresholds +- **HTML letter grade** — dashboard grade uses v2 `security_score` when present +- **GitHub Action default** — `ci-trust` defaults to `true` (display-aligned CI gates) +- **Auxiliary v2 gates** — `build_gate_scan_report()` computes `score_v2` when v2 YAML/CLI gates are set +- **Bronze facts completion** — compliance, readiness OPA/LLM judge via `build_hygiene_finding` +- **CLI `--max-json-findings`** — truncates JSON export with scan note +- **Readiness JSON** — exports `scoring_mode`, `score_v2_note`, and v2 snapshots when scoring is v2/both +- **Vet v2 snapshot** — `scan_score_snapshot` in vet JSON from synthetic gate scoring +- **Fleet `max_worst_absolute_risk`** — machine-wide and inventory `--scan-all` gate +- **Bronze counterfactual (R17 partial)** — analyzer findings with facts get counterfactual under trust +- **fact_coverage CI gate** — raised to **80%** on enforce scans +- **v2 gauge chart** — uses `security_score` when v2 benchmark is available +- **Terminal v2-first** — when `scoring_version=both`, Absolute Risk / Security Score appear first +- **MCP IDE** — `min_category_score_v2` comma gates on `scan_mcp_target` ### Fixed diff --git a/README.md b/README.md index 54f5d7c..7c5da23 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,8 @@ mcts scan . -o report.sarif --format sarif Gate cheat sheet: [scoring guide](docs/reporting/scoring-guide.md#ci-gates--pick-one-strategy) · [CI integration](docs/platform/ci-integration.md) · [GitHub Action](action/README.md) +The GitHub Action defaults to `ci-trust: true` (display-aligned gates). Set `ci-trust: false` for legacy template-mode scans. + ### Themes ```bash diff --git a/action/README.md b/action/README.md index a06464d..a7a4ff1 100644 --- a/action/README.md +++ b/action/README.md @@ -105,7 +105,7 @@ If the action lives in your repo under `action/`: | `weights-profile` | `manual_v1` | v2 weights profile when `scoring` is `v2` or `both` | | `assets-path` | — | Optional `.mcts/assets.yaml` for v2 asset-value overrides | | `findings-trust-mode` | `off` | Trust layer: `off`, `warn`, or `enforce` | -| `ci-trust` | `false` | Shorthand for enforce + aligned gates (same as `mcts --ci-trust`) | +| `ci-trust` | `true` | Shorthand for enforce + aligned gates (same as `mcts --ci-trust`). Set `false` for template-mode scans. | | `fail-on-priority-min` | — | Fail when any finding priority_score ≥ threshold (enforce only) | | `min-evidence-strength` | — | With priority gate: minimum evidence strength | | `max-high` | — | Fail when high findings exceed count (display under enforce) | diff --git a/action/action.yml b/action/action.yml index 12a569c..97f4132 100644 --- a/action/action.yml +++ b/action/action.yml @@ -65,7 +65,7 @@ inputs: description: > Shorthand for findings-trust-mode enforce with fail-on-critical (same as mcts --ci-trust). required: false - default: "false" + default: "true" fail-on-priority-min: description: > Fail when any security finding priority_score is at or above this value (0-100). diff --git a/docs/platform/ci-integration.md b/docs/platform/ci-integration.md index 4175241..27dfecb 100644 --- a/docs/platform/ci-integration.md +++ b/docs/platform/ci-integration.md @@ -101,7 +101,7 @@ Full reference: [action/README.md](../../action/README.md) | `max-risk-level` | — | v2 band gate (`low` … `critical`) | | `min-category-score-v2` | — | Comma-separated `category:min` for v2 OWASP tiles | | `findings-trust-mode` | `off` | Trust layer: `off`, `warn`, or `enforce` (prefer `enforce` / `ci-trust` for CI) | -| `ci-trust` | `false` | Shorthand: enforce + aligned gates (same as `mcts --ci-trust`) | +| `ci-trust` | `true` | Shorthand: enforce + aligned gates (same as `mcts --ci-trust`). Set `false` for template-mode scans. | | `fail-on-priority-min` | — | Fail when priority ≥ threshold (**enforce** only) | | `min-evidence-strength` | — | Optional filter for priority gate | | `extras` | `mcp,sast` | Optional extras to install (`all` for full set) | @@ -191,7 +191,7 @@ mcts scan ./server.py \ --min-score 70 ``` -GitHub Action: +GitHub Action (default `ci-trust: true`; set `false` to opt out): ```yaml - uses: MCP-Audit/MCTS@v1 @@ -377,6 +377,19 @@ Pair MCTS gates with required CI checks on `main`. See [CONTRIBUTING.md](../../C | HTML artifacts | Self-contained; no exfiltration, but contains full scan | | Secrets in repos | MCTS may flag secrets in scanned source — rotate if leaked in CI logs | +### Fleet gates (`--machine-wide`, inventory `--scan-all`) + +Per-server gates run via `collect_gate_violations()`. Fleet-wide v2 cap: + +```bash +mcts scan --machine-wide --scoring both --max-worst-absolute-risk 500 +mcts inventory --scan-all --scoring both --max-worst-absolute-risk 500 +``` + +YAML: `max_worst_absolute_risk` in `.mcts/policy.yaml` (see `.mcts/policy.yaml.example`). + +**Dual exit heuristic:** If no explicit gate fires, machine-wide and inventory scan-all may still exit 1 when any server has critical/high display counts (or v2 `risk_level` high/critical). Prefer explicit `--max-critical` / `--max-worst-absolute-risk` for predictable CI. + --- ## Planned CI capabilities diff --git a/docs/reporting/findings-trust-phase0.md b/docs/reporting/findings-trust-phase0.md index 8757ebc..435aacc 100644 --- a/docs/reporting/findings-trust-phase0.md +++ b/docs/reporting/findings-trust-phase0.md @@ -508,7 +508,7 @@ All mature analyzers including optional/metadata-heavy paths (`npm_audit`, `vuln ### Still raw `Finding()` outside analyzers -Readiness, compliance, and probe/discovery helpers still construct raw `Finding()` rows. Compliance meta-findings set `finding_kind=coverage` and receive `rule_stability` after the trust pipeline (excluded from priority/bronze security gates). **`fuzz/classifier.py` is migrated** — fuzz findings emit bronze `evidence.facts` via `build_analyzer_finding`. Deferred paths pass through `apply_trust_layer()` but do not emit bronze facts unless migrated. The bronze gate applies only to **`experimental`** analyzers when `--enforce-bronze-facts` is set. +Compliance meta-findings use `build_hygiene_finding()` with `finding_kind=coverage` (bronze facts, excluded from security gates). Readiness heuristics, OPA, LLM judge, live/static discovery meta, and protocol probe emit bronze facts via `build_hygiene_finding`. **`fuzz/classifier.py` is migrated** — fuzz findings emit bronze `evidence.facts` via `build_analyzer_finding`. The bronze gate applies only to **`experimental`** analyzers when `--enforce-bronze-facts` is set. Vulnerable fixture under enforce: **100%** of security findings have `evidence.facts`; **3 display critical** remain (real issues, not overlap noise). @@ -554,7 +554,7 @@ When `findings_trust_mode=enforce`, v2 scoring reads **display** severity for: `finding.severity` (template) is **unchanged** — `RiskScoringEngineV2.verify()` still passes. -Corpus Spearman recalibration is **deferred** until a maintainer run confirms score drift. +Corpus Spearman gate passes at ρ=0.955 (maintainer `--write-package-stats` optional). --- @@ -565,8 +565,11 @@ Shipped in-tree: - Shared `apply_trust_layer()` for scan, fuzz, and inventory entry points - Bronze CI gate (`--enforce-bronze-facts`) for experimental analyzers without `evidence.facts` (**enforce only**) - All `src/mcts/analyzers/` paths on `FindingBuilder` / bronze facts +- SARIF excludes `finding_kind=coverage` by default (`build_sarif(..., include_coverage_findings=True)` to export) +- GitHub Action `ci-trust` defaults to `true` +- Hygiene bronze facts on readiness / live/static discovery / protocol probe paths -**Next (product / soak):** flip GitHub Action default to `--ci-trust` after opt-in period; corpus Spearman recalibration post-B2; optional bronze migration for fuzz/readiness/compliance paths. +**Next:** persona tabs; counterfactual on inferrer-only paths without bronze facts; ramp corpus QA. ### Gap fixes (pre-Phase 3) diff --git a/scripts/validate_trust_layer.py b/scripts/validate_trust_layer.py index 8a0874c..7bccf1f 100644 --- a/scripts/validate_trust_layer.py +++ b/scripts/validate_trust_layer.py @@ -321,6 +321,16 @@ def main() -> int: else: check("vulnerable v2 verify", True) + from mcts.reporting.evidence_provenance import fact_coverage + + fc = fact_coverage(vuln.findings) + check("fact_coverage pct >= 80", fc.get("pct", 0) >= 80.0, str(fc)) + vuln_sarif = build_sarif(vuln) + comp_results = [ + r for r in vuln_sarif["runs"][0]["results"] if r.get("properties", {}).get("analyzer") == "compliance" + ] + check("SARIF excludes compliance coverage rows", len(comp_results) == 0) + print(f"\n=== {len(FAILURES)} failure(s) ===") for f in FAILURES: print(f" - {f}") diff --git a/src/mcts/analyzers/embedding_secrets.py b/src/mcts/analyzers/embedding_secrets.py index c116c84..754d21d 100644 --- a/src/mcts/analyzers/embedding_secrets.py +++ b/src/mcts/analyzers/embedding_secrets.py @@ -73,9 +73,7 @@ def analyze(self, server: MCPServerInfo) -> list[Finding]: finding_id="embedding-secrets-semantic-skipped", analyzer="embedding_secrets", title="Semantic credential detection skipped", - description=( - "Semantic embedding model unavailable; only regex and phrase fallback ran." - ), + description=("Semantic embedding model unavailable; only regex and phrase fallback ran."), recommendation=( "Install sentence-transformers and model weights, or disable semantic_secrets." ), diff --git a/src/mcts/analyzers/finding_facts.py b/src/mcts/analyzers/finding_facts.py index 2fe6262..d207928 100644 --- a/src/mcts/analyzers/finding_facts.py +++ b/src/mcts/analyzers/finding_facts.py @@ -55,6 +55,48 @@ def build_analyzer_finding( return builder.build() +def build_hygiene_finding( + *, + finding_id: str, + analyzer: str, + title: str, + description: str, + severity: Severity, + recommendation: str, + rule_id: str, + match: str, + field: str, + tool: str | None = None, + technique_id: str | None = None, + confidence: float = 0.7, + extra_evidence: dict[str, Any] | None = None, + finding_kind: str | None = None, +) -> Finding: + """Hygiene/readiness/meta row with bronze facts (R6 migration path).""" + builder = FindingBuilder( + finding_id=finding_id, + analyzer=analyzer, + title=title, + description=description, + severity=severity, + recommendation=recommendation, + ).confidence(confidence) + if tool: + builder = builder.tool(tool) + if technique_id: + builder = builder.technique(technique_id) + fact_kwargs: dict[str, Any] = {"rule_id": rule_id, "match": match, "field": field} + if tool: + fact_kwargs["tool"] = tool + builder = builder.fact(**fact_kwargs) + if extra_evidence: + builder = builder.evidence(**extra_evidence) + row = builder.build() + if finding_kind: + return row.model_copy(update={"finding_kind": finding_kind}) + return row + + def build_skip_finding( *, finding_id: str, diff --git a/src/mcts/cli/main.py b/src/mcts/cli/main.py index 4bb58d6..96aca69 100644 --- a/src/mcts/cli/main.py +++ b/src/mcts/cli/main.py @@ -215,6 +215,19 @@ def _check_gates(report, config: ScanConfig) -> None: _exit_on_gate_violations(collect_gate_violations(report, config), report, config) +def _print_v2_gate_context(report) -> None: + if report.score_v2 is None: + return + v2 = report.score_v2 + console.print("[yellow]v2 score context:[/yellow]") + console.print( + f" absolute_risk={v2.absolute_risk}, security_score={v2.security_score}, risk_level={v2.risk_level}" + ) + for contrib in v2.top_contributors[:5]: + if contrib.finding_id and contrib.risk_contribution is not None: + console.print(f" • {contrib.finding_id}: risk_contribution={contrib.risk_contribution}") + + def _exit_on_gate_violations(violations: list[str], report, config: ScanConfig) -> None: if not violations: return @@ -224,6 +237,14 @@ def _exit_on_gate_violations(violations: list[str], report, config: ScanConfig) _print_min_score_gate_failure(report, config.min_score) violations = [item for item in violations if not item.startswith("legacy overall score")] + v2_failures = [ + item + for item in violations + if "absolute risk" in item or "security score" in item or "risk level" in item + ] + if v2_failures: + _print_v2_gate_context(report) + policy_failures = [ item for item in violations if "allowlist" in item or item.startswith("blocked server") ] @@ -665,6 +686,14 @@ def scan( help=f"Skip writing JSON/HTML artifacts to {ANALYSIS_DIR_NAME}/", ), ] = False, + max_json_findings: Annotated[ + int | None, + typer.Option( + "--max-json-findings", + help="Truncate JSON report findings to this count (scan_notes records truncation)", + min=1, + ), + ] = None, technique: Annotated[ list[str] | None, typer.Option("--technique", help="Limit scan to MCTS-T technique id (repeatable)"), @@ -673,7 +702,10 @@ def scan( bool, typer.Option( "--ci", - help="Apply CI gate preset (fail-on-critical, min-score 70) and print score breakdown on failure", + help=( + "CI gate preset (fail-on-critical, min-score 70, scoring both). " + "Add --min-security-score or --max-absolute-risk for v2 gates." + ), ), ] = False, ci_trust: Annotated[ @@ -769,6 +801,13 @@ def scan( case_sensitive=False, ), ] = None, + max_worst_absolute_risk: Annotated[ + int | None, + typer.Option( + "--max-worst-absolute-risk", + help="Exit 1 when fleet/machine-wide worst absolute_risk exceeds this (v2/both)", + ), + ] = None, min_category_score_v2: Annotated[ list[str] | None, typer.Option( @@ -1010,10 +1049,12 @@ def scan( min_security_score=min_security_score, max_absolute_risk=max_absolute_risk, max_risk_level=max_risk_level.lower() if max_risk_level else None, + max_worst_absolute_risk=max_worst_absolute_risk, min_category_score_v2=category_gates_v2, weights_profile=weights_profile, corpus_stats_path=corpus_stats_path, assets_path=assets_path, + max_json_findings=max_json_findings, ) try: @@ -1125,6 +1166,7 @@ def _execute_scan(): json_path=resolve_output_path(output if output_format == "json" else None, "scan-report.json"), html_path=resolve_output_path(html, "scan-report.html"), sarif_path=resolve_output_path(output if output_format == "sarif" else None, "scan-report.sarif"), + max_json_findings=config_obj.max_json_findings, ) if output_format == "raw": raw_path = resolve_output_path(output, "scan-report.raw.json") @@ -1365,6 +1407,23 @@ def vet( use_display = config.findings_trust_mode != "off" payload = report.model_dump(mode="json") + gate_findings = [vet_finding_to_finding(finding) for finding in report.findings] + if config.scoring_mode in ("v2", "both") and gate_findings: + from mcts.governance.gate_violations import build_gate_scan_report + + snap = build_gate_scan_report( + gate_findings, + config, + target=package, + scan_scope="vet", + ) + if snap.score_v2 is not None: + payload["scan_score_snapshot"] = { + "absolute_risk": snap.score_v2.absolute_risk, + "security_score": snap.score_v2.security_score, + "risk_level": snap.score_v2.risk_level, + "note": "Synthetic v2 score from vet findings; run mcts scan for full benchmark context.", + } if json_output: console.print(json.dumps(payload, indent=2)) else: @@ -1373,6 +1432,12 @@ def vet( f"Verdict: [bold]{report.verdict}[/bold] " f"Risk: {report.compute_risk_score(use_display=use_display)}/100" ) + if payload.get("scan_score_snapshot"): + snap = payload["scan_score_snapshot"] + console.print( + f" v2 snapshot: absolute_risk={snap['absolute_risk']}, " + f"security_score={snap.get('security_score')}, risk_level={snap.get('risk_level')}" + ) if report.findings: for finding in report.findings: console.print(f" [{vet_severity_label(finding, config)}] {finding.title}") @@ -1384,7 +1449,6 @@ def vet( if not json_output: console.print(f"[green]Saved[/green] {output_path}") - gate_findings = [vet_finding_to_finding(finding) for finding in report.findings] _check_auxiliary_finding_gates( gate_findings, config, @@ -1773,6 +1837,10 @@ def readiness( "tools_checked": report.tools_checked, "readiness_score": report.readiness_score, "production_ready": report.production_ready, + "scoring_mode": config.scoring_mode, + "score_v2_note": report.score_v2_note, + "absolute_risk_snapshot": report.absolute_risk_snapshot, + "security_score_snapshot": report.security_score_snapshot, "findings": [f.model_dump() for f in report.findings], }, indent=2, @@ -1868,6 +1936,7 @@ def _run_scan(): json_path, html_path, sarif_path = persist_scan_artifacts( report, json_path=resolve_output_path(output, artifact_name), + max_json_findings=config.max_json_findings, ) console.print(f"[green]Saved[/green] {json_path}, {html_path}, {sarif_path}") diff --git a/src/mcts/compliance/checks.py b/src/mcts/compliance/checks.py index 8f59d36..0486ef8 100644 --- a/src/mcts/compliance/checks.py +++ b/src/mcts/compliance/checks.py @@ -2,6 +2,7 @@ from __future__ import annotations +from mcts.analyzers.finding_facts import build_hygiene_finding from mcts.reporting.display import effective_severity from mcts.reporting.models import Finding, Severity @@ -61,6 +62,32 @@ OWASP_ANALYZER_MAP = OWASP_LLM_ANALYZER_MAP +def _coverage_finding( + *, + finding_id: str, + title: str, + description: str, + severity: Severity, + recommendation: str, + rule_id: str, + match: str, + extra_evidence: dict | None = None, +) -> Finding: + return build_hygiene_finding( + finding_id=finding_id, + analyzer="compliance", + title=title, + description=description, + severity=severity, + recommendation=recommendation, + rule_id=rule_id, + match=match, + field="compliance_coverage", + extra_evidence=extra_evidence, + finding_kind="coverage", + ) + + class ComplianceChecker: """Maps findings to OWASP LLM + MCP Top 10 coverage gaps (meta-findings only).""" @@ -85,30 +112,33 @@ def check( if missing_llm and not scorable: compliance_findings.append( - Finding( - id="compliance-no-findings", - analyzer="compliance", + _coverage_finding( + finding_id="compliance-no-findings", title="No scorable findings recorded", description="Scan completed without security findings — verify discovery scope.", severity=Severity.LOW, recommendation="Confirm the target contains MCP tool definitions.", - finding_kind="coverage", + rule_id="COMPLIANCE-NO-FINDINGS", + match="no scorable findings", ) ) if missing_mcp and scorable and tools_discovered > 0: compliance_findings.append( - Finding( - id="compliance-mcp-top10-gaps", - analyzer="compliance", + _coverage_finding( + finding_id="compliance-mcp-top10-gaps", title="OWASP MCP Top 10 coverage gaps remain", description=f"Uncovered MCP categories: {', '.join(sorted(missing_mcp))}", severity=Severity.LOW, recommendation=( "Expand scan scope or enable additional analyzers for full MCP Top 10 coverage." ), - evidence={"missing_mcp_categories": sorted(missing_mcp), "covered": sorted(covered_mcp)}, - finding_kind="coverage", + rule_id="COMPLIANCE-MCP-GAPS", + match=f"{len(missing_mcp)} uncovered MCP categories", + extra_evidence={ + "missing_mcp_categories": sorted(missing_mcp), + "covered": sorted(covered_mcp), + }, ) ) @@ -117,21 +147,21 @@ def check( ) if critical_count >= 3: compliance_findings.append( - Finding( - id="compliance-multiple-critical", - analyzer="compliance", + _coverage_finding( + finding_id="compliance-multiple-critical", title="Multiple critical findings — deployment blocked", description=( f"{critical_count} critical findings exceed recommended deployment threshold." ), severity=Severity.MEDIUM, recommendation="Resolve critical findings before production deployment.", - evidence={ + rule_id="COMPLIANCE-MULTI-CRITICAL", + match=f"{critical_count} critical findings", + extra_evidence={ "critical_count": critical_count, "owasp_llm_gaps": sorted(missing_llm), "owasp_mcp_gaps": sorted(missing_mcp), }, - finding_kind="coverage", ) ) diff --git a/src/mcts/core/config.py b/src/mcts/core/config.py index 2ba4084..cecd832 100644 --- a/src/mcts/core/config.py +++ b/src/mcts/core/config.py @@ -132,6 +132,7 @@ class ScanConfig(BaseModel): max_absolute_risk: int | None = Field(default=None, ge=0) max_risk_level: str | None = None min_category_score_v2: dict[str, int] = Field(default_factory=dict) + max_worst_absolute_risk: int | None = Field(default=None, ge=0) findings_trust_mode: str = "off" findings_trust_mode_explicit: bool = False ignore_policy: bool = False @@ -140,6 +141,11 @@ class ScanConfig(BaseModel): enforce_bronze_facts: bool | None = None collapse_template_severity: bool | None = None require_auth_env_for_sensitive: bool = False + max_json_findings: int | None = Field( + default=None, + ge=1, + description="Truncate JSON report findings to this count (scan_notes records truncation)", + ) @classmethod def _validate_min_evidence_strength(cls, value: str | None) -> str | None: diff --git a/src/mcts/core/scanner.py b/src/mcts/core/scanner.py index 2212ebe..2ffd1af 100644 --- a/src/mcts/core/scanner.py +++ b/src/mcts/core/scanner.py @@ -230,16 +230,18 @@ def analyze_server(self, server_info: MCPServerInfo) -> ScanReport: findings = collapse_template_severity_if_requested(findings, self.config) findings = self._apply_filters(findings) + from mcts.reporting.finding_validator import validate_findings from mcts.reporting.rule_stability import apply_rule_stability - compliance_rows = [ - apply_rule_stability(row) - for row in self.compliance.check( - findings, - tools_discovered=len(server_info.tools), - findings_trust_mode=self.config.findings_trust_mode, - ) - ] + compliance_raw = self.compliance.check( + findings, + tools_discovered=len(server_info.tools), + findings_trust_mode=self.config.findings_trust_mode, + ) + if self.config.findings_trust_mode != "off": + compliance_rows = validate_findings(compliance_raw, trust_ctx) + else: + compliance_rows = [apply_rule_stability(row) for row in compliance_raw] findings.extend(compliance_rows) analyzers_executed.append("compliance") scan_notes = build_scan_notes(self.config) diff --git a/src/mcts/discovery/static_meta.py b/src/mcts/discovery/static_meta.py index b90fe42..2d005d4 100644 --- a/src/mcts/discovery/static_meta.py +++ b/src/mcts/discovery/static_meta.py @@ -4,6 +4,7 @@ from pathlib import Path +from mcts.analyzers.finding_facts import build_hygiene_finding from mcts.core.config import ScanConfig from mcts.core.target import ScanTarget, TargetKind from mcts.discovery.language_detect import RUST_MCP_INDICATORS, detect_repo_languages @@ -30,8 +31,8 @@ def static_discovery_meta_findings(server: MCPServerInfo, config: ScanConfig) -> if rust_sources and ("rust" in langs or "rs" in langs): return [ tag_static_discovery_finding( - Finding( - id="static-discovery-rust-incomplete", + build_hygiene_finding( + finding_id="static-discovery-rust-incomplete", analyzer="static_discovery", title="Rust MCP sources found but no tools discovered", description=( @@ -43,9 +44,12 @@ def static_discovery_meta_findings(server: MCPServerInfo, config: ScanConfig) -> "Verify rmcp #[tool] registration patterns are supported, pass " "--languages rust, or use --live --i-understand-live-risk for live discovery." ), + rule_id="STATIC-RUST", + match="rust indicators without tools", + field="static_discovery", technique_id="MCTS-T-1001", confidence=0.9, - evidence={ + extra_evidence={ "languages": sorted(langs), "detected_languages": sorted(detected), "discovery_mode": server.discovery_mode, @@ -57,8 +61,8 @@ def static_discovery_meta_findings(server: MCPServerInfo, config: ScanConfig) -> if detected & langs: return [ tag_static_discovery_finding( - Finding( - id="static-discovery-incomplete", + build_hygiene_finding( + finding_id="static-discovery-incomplete", analyzer="static_discovery", title="Static MCP tool discovery returned zero tools", description=( @@ -70,8 +74,11 @@ def static_discovery_meta_findings(server: MCPServerInfo, config: ScanConfig) -> "Use --live --i-understand-live-risk, export a tools/list snapshot, " "or verify static discovery supports your SDK registration patterns." ), + rule_id="STATIC-ZERO", + match="zero tools discovered", + field="static_discovery", confidence=0.8, - evidence={ + extra_evidence={ "languages": sorted(langs), "detected_languages": sorted(detected), "discovery_mode": server.discovery_mode, diff --git a/src/mcts/governance/gate_violations.py b/src/mcts/governance/gate_violations.py index 1ad9627..56f0bb5 100644 --- a/src/mcts/governance/gate_violations.py +++ b/src/mcts/governance/gate_violations.py @@ -7,7 +7,7 @@ from mcts.core.config import ScanConfig from mcts.governance.auth_env import evaluate_auth_env_violations from mcts.governance.policy import evaluate_policy, load_policy -from mcts.governance.scan_gates import evaluate_scan_gate_violations +from mcts.governance.scan_gates import _any_v2_gate, evaluate_scan_gate_violations from mcts.mcp.models import MCPServerInfo from mcts.reporting.models import Finding, RiskScore, ScanReport, ScanSummary, ScoreBasis @@ -40,6 +40,36 @@ def collect_gate_violations(report: ScanReport, config: ScanConfig) -> list[str] return violations +def _attach_score_v2_for_gates( + report: ScanReport, + config: ScanConfig, + findings: list[Finding], + scan_scope: str, +) -> ScanReport: + """Score v2 on auxiliary finding lists so YAML/CLI v2 gates are evaluable.""" + if config.scoring_mode not in {"v2", "both"} or not _any_v2_gate(config): + return report + from mcts.scoring.context import build_scoring_context + from mcts.scoring.engine_v2 import RiskScoringEngineV2 + + chain_factor_mode = "paths_v1" if config.enable_attack_chains else "disabled" + ctx = build_scoring_context( + findings=findings, + server=report.server, + attack_graph={}, + scan_scope=scan_scope, + config=config, + chain_factor_mode=chain_factor_mode, + ) + score_v2 = RiskScoringEngineV2().score(ctx, legacy_overall=report.score.overall) + return report.model_copy( + update={ + "score_v2": score_v2, + "scoring_version": config.scoring_mode, + } + ) + + def build_gate_scan_report( findings: list[Finding], config: ScanConfig, @@ -64,7 +94,7 @@ def build_gate_scan_report( excluded_non_scorable=max(0, len(findings) - summary.total), ) score = RiskScore(overall=100, risk_index=0, raw_risk=0, penalty=0, basis=basis) - return ScanReport( + report = ScanReport( version="0.0.0", target=report_target, scanned_at=datetime.now(UTC), @@ -76,6 +106,7 @@ def build_gate_scan_report( score=score, scan_scope=scan_scope, ) + return _attach_score_v2_for_gates(report, config, findings, scan_scope) def collect_findings_gate_violations( @@ -93,3 +124,15 @@ def collect_findings_gate_violations( scan_scope=scan_scope, ) return collect_gate_violations(report, config) + + +def collect_fleet_absolute_risk_violations( + worst_absolute_risk: int | None, + config: ScanConfig, +) -> list[str]: + """Fleet/machine-wide gate on peak absolute_risk across scanned servers.""" + if config.max_worst_absolute_risk is None or worst_absolute_risk is None: + return [] + if worst_absolute_risk > config.max_worst_absolute_risk: + return [f"worst absolute_risk {worst_absolute_risk} exceeds maximum {config.max_worst_absolute_risk}"] + return [] diff --git a/src/mcts/governance/policy.py b/src/mcts/governance/policy.py index 6ec9e6f..02a2b02 100644 --- a/src/mcts/governance/policy.py +++ b/src/mcts/governance/policy.py @@ -15,6 +15,7 @@ class GovernancePolicy(BaseModel): max_absolute_risk: int | None = Field(default=None, ge=0) max_risk_level: str | None = Field(default=None) min_category_score_v2: dict[str, int] = Field(default_factory=dict) + max_worst_absolute_risk: int | None = Field(default=None, ge=0) max_critical: int | None = Field(default=None, ge=0) max_high: int | None = Field(default=None, ge=0) fail_on_priority_min: int | None = Field(default=None, ge=0, le=100) @@ -117,6 +118,7 @@ def merge_scan_config_with_policy(config: Any, policy: GovernancePolicy | None) "min_security_score", "max_absolute_risk", "max_risk_level", + "max_worst_absolute_risk", ): policy_value = getattr(policy, field) if policy_value is None: diff --git a/src/mcts/inventory/scan_all.py b/src/mcts/inventory/scan_all.py index dd26ef2..4573cd0 100644 --- a/src/mcts/inventory/scan_all.py +++ b/src/mcts/inventory/scan_all.py @@ -45,9 +45,13 @@ def run_inventory_scan_all(base_config: ScanConfig) -> tuple[InventoryReport, li def collect_scan_all_gate_violations(base_config: ScanConfig, rows: list[dict]) -> list[str]: """Policy/CLI gate failures across inventory scan-all rows.""" - from mcts.governance.gate_violations import collect_gate_violations + from mcts.governance.gate_violations import ( + collect_fleet_absolute_risk_violations, + collect_gate_violations, + ) violations: list[str] = [] + worst_risk: int | None = None for row in rows: report_data = row.get("report") if not report_data or row.get("error"): @@ -55,6 +59,13 @@ def collect_scan_all_gate_violations(base_config: ScanConfig, rows: list[dict]) report = ScanReport.model_validate(report_data) scan_config = base_config.model_copy(update={"target": report.target}) violations.extend(collect_gate_violations(report, scan_config)) + if report.score_v2 is not None: + ar = report.score_v2.absolute_risk + worst_risk = ar if worst_risk is None else max(worst_risk, ar) + elif row.get("absolute_risk") is not None: + ar = int(row["absolute_risk"]) + worst_risk = ar if worst_risk is None else max(worst_risk, ar) + violations.extend(collect_fleet_absolute_risk_violations(worst_risk, base_config)) return violations diff --git a/src/mcts/mcp_server/server.py b/src/mcts/mcp_server/server.py index 44b6124..ca4ef36 100644 --- a/src/mcts/mcp_server/server.py +++ b/src/mcts/mcp_server/server.py @@ -9,16 +9,49 @@ from mcts.taxonomy.mapper import load_taxonomy -def scan_mcp_target(target: str, live: bool = False) -> str: +def scan_mcp_target( + target: str, + live: bool = False, + scoring_mode: str = "both", + findings_trust_mode: str | None = None, + min_security_score: int | None = None, + max_absolute_risk: int | None = None, + max_risk_level: str | None = None, + min_category_score_v2: str | None = None, + fail_on_critical: bool = False, + max_critical: int | None = None, + max_high: int | None = None, +) -> str: """Run an MCTS security scan on an MCP server path or repository.""" from mcts.core.config import ScanConfig from mcts.core.scanner import Scanner + from mcts.report.data import parse_min_category_score_v2 + + category_gates: dict[str, int] = {} + if min_category_score_v2: + parts = [p.strip() for p in min_category_score_v2.split(",") if p.strip()] + category_gates = parse_min_category_score_v2(parts) config = ScanConfig( target=Path(target), live=live, live_consent=live, + scoring_mode=scoring_mode, + min_security_score=min_security_score, + max_absolute_risk=max_absolute_risk, + max_risk_level=max_risk_level, + min_category_score_v2=category_gates, + fail_on_critical=fail_on_critical, + max_critical=max_critical, + max_high=max_high, ) + if findings_trust_mode is not None: + config = config.model_copy( + update={ + "findings_trust_mode": findings_trust_mode, + "findings_trust_mode_explicit": True, + } + ) report = Scanner(config).run() return json.dumps(report.model_dump(mode="json"), indent=2) @@ -133,50 +166,25 @@ def create_server(): return app -def _severity_counts(payload: dict[str, Any]) -> dict[str, int]: - trust_mode = str(payload.get("findings_trust_mode") or "off") - display = payload.get("display_summary") or {} - template = payload.get("summary") or {} - use_display = trust_mode == "enforce" or ( - trust_mode == "warn" and display.get("critical") is not None - ) - active = display if use_display and display else template - return { - "critical": int(active.get("critical") or 0), - "high": int(active.get("high") or 0), - } - - def _report_summary(payload: dict[str, Any]) -> dict[str, Any]: - score = payload.get("score") or {} + summary = payload.get("summary") or {} + display_summary = payload.get("display_summary") or {} score_v2 = payload.get("score_v2") or {} findings = payload.get("findings") or [] - template = payload.get("summary") or {} - display = payload.get("display_summary") or {} - counts = _severity_counts(payload) - summary: dict[str, Any] = { - "overall_score": int(score.get("overall") or 0), + return { + "overall_score": int((payload.get("score") or {}).get("overall") or 0), "finding_count": len(findings), - "finding_ids": sorted(str(row.get("id")) for row in findings if row.get("id")), - "critical": counts["critical"], - "high": counts["high"], - "template_critical": int(template.get("critical") or 0), - "template_high": int(template.get("high") or 0), - "display_critical": int(display.get("critical") or 0) if display else None, - "display_high": int(display.get("high") or 0) if display else None, - "findings_trust_mode": payload.get("findings_trust_mode") or "off", - "scoring_version": payload.get("scoring_version") or "legacy", + "critical": int(summary.get("critical") or 0), + "display_critical": int(display_summary.get("critical") or 0), + "scoring_version": payload.get("scoring_version"), + "absolute_risk": score_v2.get("absolute_risk"), + "security_score": score_v2.get("security_score"), + "risk_level": score_v2.get("risk_level"), + "finding_ids": [row.get("id") for row in findings if row.get("id")], } - if score_v2: - if score_v2.get("absolute_risk") is not None: - summary["absolute_risk"] = int(score_v2["absolute_risk"]) - if score_v2.get("security_score") is not None: - summary["security_score"] = int(score_v2["security_score"]) - if score_v2.get("risk_level"): - summary["risk_level"] = str(score_v2["risk_level"]) - return summary def _new_finding_ids(baseline: dict[str, Any], current: dict[str, Any]) -> list[str]: - old = set(baseline.get("finding_ids") or []) - return sorted(fid for fid in current.get("finding_ids") or [] if fid not in old) + baseline_ids = set(baseline.get("finding_ids") or []) + current_ids = set(current.get("finding_ids") or []) + return sorted(current_ids - baseline_ids) diff --git a/src/mcts/output/artifacts.py b/src/mcts/output/artifacts.py index 0634754..91845fd 100644 --- a/src/mcts/output/artifacts.py +++ b/src/mcts/output/artifacts.py @@ -35,6 +35,21 @@ def _report_with_scan_history(report: ScanReport) -> ScanReport: return report.model_copy(update={"scan_history": points}) +def _json_report_payload(report: ScanReport, max_findings: int | None) -> ScanReport: + if max_findings is None or len(report.findings) <= max_findings: + return report + note = f"JSON export truncated to {max_findings} findings (total {len(report.findings)})" + notes = list(report.scan_notes) + if note not in notes: + notes.append(note) + return report.model_copy( + update={ + "findings": report.findings[:max_findings], + "scan_notes": notes, + } + ) + + def persist_scan_artifacts( report: ScanReport, *, @@ -43,6 +58,7 @@ def persist_scan_artifacts( sarif_path: Path | None = None, record_history: bool = True, write_json: bool = True, + max_json_findings: int | None = None, ) -> tuple[Path, Path, Path]: """Write JSON + HTML + SARIF under ``mcts_analysis/`` and update trend history.""" if record_history: @@ -60,7 +76,8 @@ def persist_scan_artifacts( sarif_out = resolve_output_path(sarif_path, "scan-report.sarif") if write_json: - json_out.write_text(report.model_dump_json(indent=2), encoding="utf-8") + json_report = _json_report_payload(report, max_json_findings) + json_out.write_text(json_report.model_dump_json(indent=2), encoding="utf-8") write_html_report(report, html_out) sarif_out.write_text(write_sarif_report(report), encoding="utf-8") return json_out, html_out, sarif_out diff --git a/src/mcts/probe/discovery_meta.py b/src/mcts/probe/discovery_meta.py index d765b7b..87f71bd 100644 --- a/src/mcts/probe/discovery_meta.py +++ b/src/mcts/probe/discovery_meta.py @@ -2,6 +2,7 @@ from __future__ import annotations +from mcts.analyzers.finding_facts import build_hygiene_finding from mcts.mcp.models import MCPServerInfo from mcts.reporting.models import Finding, Severity from mcts.scoring.evidence_tags import tag_live_discovery_finding @@ -44,8 +45,8 @@ def discovery_meta_findings(server: MCPServerInfo) -> list[Finding]: return [ tag_live_discovery_finding( - Finding( - id="live-discovery-incomplete", + build_hygiene_finding( + finding_id="live-discovery-incomplete", analyzer="live_discovery", title="Live MCP discovery incomplete", description=description, @@ -56,13 +57,16 @@ def discovery_meta_findings(server: MCPServerInfo) -> list[Finding]: "for diagnostics. Use --strict-live in CI to fail the scan when discovery " "is incomplete." ), - evidence={ + rule_id="LIVE-DISCOVERY", + match=description[:120], + field="discovery_warnings", + confidence=1.0, + extra_evidence={ "discovery_mode": server.discovery_mode, "discovery_warnings": list(server.discovery_warnings), "tool_count": len(server.tools), "initialize_succeeded": server.initialize_succeeded, }, - confidence=1.0, ) ) ] diff --git a/src/mcts/probe/protocol_checks.py b/src/mcts/probe/protocol_checks.py index 832077f..515c9d0 100644 --- a/src/mcts/probe/protocol_checks.py +++ b/src/mcts/probe/protocol_checks.py @@ -226,7 +226,18 @@ def _protocol_finding( cwe: str, evidence: dict | None = None, ) -> Finding: - ev = {"check_id": check_id, **(evidence or {})} + ev = { + "check_id": check_id, + "facts": [ + { + "rule_id": check_id, + "match": title, + "field": "protocol_probe", + } + ], + "evidence_tier": "bronze", + **(evidence or {}), + } return Finding( id=finding_id, analyzer="protocol_probe", diff --git a/src/mcts/readiness/heuristics.py b/src/mcts/readiness/heuristics.py index f0d957d..3be1499 100644 --- a/src/mcts/readiness/heuristics.py +++ b/src/mcts/readiness/heuristics.py @@ -6,6 +6,7 @@ from typing import Any +from mcts.analyzers.finding_facts import build_hygiene_finding from mcts.mcp.models import MCPTool from mcts.reporting.models import Finding, Severity @@ -66,17 +67,18 @@ def _tool_def(tool: MCPTool) -> dict[str, Any]: def _finding(tool_name: str, rule_id: str, title: str, severity: Severity, **evidence: Any) -> Finding: - return Finding( - id=f"readiness-{rule_id.lower()}-{tool_name}", + return build_hygiene_finding( + finding_id=f"readiness-{rule_id.lower()}-{tool_name}", analyzer="readiness", title=f"{title} ({tool_name})", description=title, severity=severity, - tool=tool_name, recommendation="Improve MCP tool operational documentation and configuration.", - technique_id=None, - confidence=0.7, - evidence={"readiness_rule": rule_id.upper(), **evidence}, + rule_id=rule_id.upper(), + match=title, + field="tool_metadata", + tool=tool_name, + extra_evidence={"readiness_rule": rule_id.upper(), **evidence}, ) diff --git a/src/mcts/readiness/llm_judge.py b/src/mcts/readiness/llm_judge.py index 0b7c70a..5e2c42e 100644 --- a/src/mcts/readiness/llm_judge.py +++ b/src/mcts/readiness/llm_judge.py @@ -6,7 +6,8 @@ import os from typing import Any -from mcts.reporting.models import Finding, Severity +from mcts.analyzers.finding_facts import build_hygiene_finding +from mcts.reporting.models import Severity _PROMPT = ( "Review this MCP tool definition for production readiness. " @@ -32,7 +33,7 @@ def is_available(self) -> bool: return False return True - def analyze_tool(self, tool_def: dict[str, Any], tool_name: str) -> list[Finding]: + def analyze_tool(self, tool_def: dict[str, Any], tool_name: str) -> list: if not self.is_available(): return [] import litellm @@ -49,24 +50,27 @@ def analyze_tool(self, tool_def: dict[str, Any], tool_name: str) -> list[Finding except (json.JSONDecodeError, KeyError, IndexError, Exception): return [] - findings: list[Finding] = [] + findings: list = [] for issue in payload.get("issues", []): if not isinstance(issue, dict): continue issue_id = str(issue.get("id", "readiness_llm")) severity = _map_severity(str(issue.get("severity", "medium"))) + summary = str(issue.get("summary", "LLM readiness concern")) findings.append( - Finding( - id=f"readiness-llm-{issue_id}-{tool_name}", + build_hygiene_finding( + finding_id=f"readiness-llm-{issue_id}-{tool_name}", analyzer="readiness", - title=f"LLM readiness: {issue.get('summary', issue_id)} ({tool_name})", - description=str(issue.get("summary", "LLM readiness concern")), + title=f"LLM readiness: {summary} ({tool_name})", + description=summary, severity=severity, - tool=tool_name, recommendation="Improve tool operational documentation per LLM readiness review.", - technique_id=None, + rule_id=f"LLM-{issue_id}", + match=summary, + field="llm_judge", + tool=tool_name, confidence=0.65, - evidence={"readiness_rule": f"LLM-{issue_id}", "source": "llm_judge"}, + extra_evidence={"readiness_rule": f"LLM-{issue_id}", "source": "llm_judge"}, ) ) return findings diff --git a/src/mcts/readiness/opa.py b/src/mcts/readiness/opa.py index 144100f..d608a58 100644 --- a/src/mcts/readiness/opa.py +++ b/src/mcts/readiness/opa.py @@ -9,7 +9,8 @@ from pathlib import Path from typing import Any -from mcts.reporting.models import Finding, Severity +from mcts.analyzers.finding_facts import build_hygiene_finding +from mcts.reporting.models import Severity _DEFAULT_POLICIES = Path(__file__).resolve().parent / "policies" _SEVERITY_MAP = { @@ -29,28 +30,31 @@ def __init__(self, policies_dir: Path | None = None) -> None: def is_available(self) -> bool: return self._opa_path is not None and self.policies_dir.exists() - def evaluate_tool(self, tool_def: dict[str, Any], tool_name: str) -> list[Finding]: + def evaluate_tool(self, tool_def: dict[str, Any], tool_name: str) -> list: if not self.is_available(): return [] facts = _create_tool_facts(tool_def, tool_name) - findings: list[Finding] = [] + findings: list = [] for policy_path in sorted(self.policies_dir.glob("*.rego")): for violation in _run_opa(self._opa_path or "opa", policy_path, facts): raw_severity = str(violation.get("severity", "MEDIUM")).upper() severity = _SEVERITY_MAP.get(raw_severity, Severity.MEDIUM) policy = str(violation.get("policy", "unknown")) + message = str(violation.get("message", "OPA policy violation")) findings.append( - Finding( - id=f"readiness-opa-{policy}-{tool_name}", + build_hygiene_finding( + finding_id=f"readiness-opa-{policy}-{tool_name}", analyzer="readiness", - title=f"OPA: {violation.get('message', 'Policy violation')} ({tool_name})", - description=str(violation.get("message", "OPA policy violation")), + title=f"OPA: {message} ({tool_name})", + description=message, severity=severity, - tool=tool_name, recommendation="Fix tool definition to satisfy readiness Rego policy.", - technique_id=None, + rule_id=f"OPA-{policy}", + match=message, + field="opa_policy", + tool=tool_name, confidence=0.8, - evidence={ + extra_evidence={ "readiness_rule": f"OPA-{policy}", "policy": policy, "source": "opa", diff --git a/src/mcts/readiness/runner.py b/src/mcts/readiness/runner.py index f45cef8..df8755b 100644 --- a/src/mcts/readiness/runner.py +++ b/src/mcts/readiness/runner.py @@ -4,6 +4,7 @@ from dataclasses import dataclass +from mcts.analyzers.finding_facts import build_hygiene_finding, build_skip_finding from mcts.core.config import ScanConfig from mcts.core.scanner import Scanner from mcts.mcp.models import MCPTool @@ -20,6 +21,9 @@ class ReadinessReport: tools_checked: int readiness_score: int production_ready: bool + score_v2_note: str | None = None + absolute_risk_snapshot: int | None = None + security_score_snapshot: int | None = None def run_readiness(config: ScanConfig) -> ReadinessReport: @@ -30,8 +34,9 @@ def run_readiness(config: ScanConfig) -> ReadinessReport: if config.readiness_opa and (opa is None or not opa.is_available()): findings.append( - _optional_check_unavailable( - check_id="readiness-opa-unavailable", + build_skip_finding( + finding_id="readiness-opa-unavailable", + analyzer="readiness", title="OPA readiness checks skipped", description=_opa_unavailable_reason(opa), recommendation=( @@ -41,8 +46,9 @@ def run_readiness(config: ScanConfig) -> ReadinessReport: ) if config.readiness_llm and (llm is None or not llm.is_available()): findings.append( - _optional_check_unavailable( - check_id="readiness-llm-unavailable", + build_skip_finding( + finding_id="readiness-llm-unavailable", + analyzer="readiness", title="LLM readiness judge skipped", description=_llm_unavailable_reason(), recommendation=( @@ -61,8 +67,8 @@ def run_readiness(config: ScanConfig) -> ReadinessReport: if not server.tools: findings.append( - Finding( - id="readiness-no-tools-discovered", + build_hygiene_finding( + finding_id="readiness-no-tools-discovered", analyzer="readiness", title="No MCP tools discovered", description=( @@ -75,24 +81,28 @@ def run_readiness(config: ScanConfig) -> ReadinessReport: "Point readiness at an MCP server entrypoint, run static discovery on tool " "sources, or export tools via mcts snapshot for offline scans." ), - technique_id=None, + rule_id="HEUR-000", + match="no tools discovered", + field="discovery", confidence=0.9, - evidence={"readiness_rule": "HEUR-000", "tools_discovered": 0}, + extra_evidence={"readiness_rule": "HEUR-000", "tools_discovered": 0}, ) ) if not server.version or server.version == "0.0.0": findings.append( - Finding( - id="readiness-server-version", + build_hygiene_finding( + finding_id="readiness-server-version", analyzer="readiness", title="Server version not specified", description="MCP server does not expose a meaningful version string.", severity=Severity.LOW, recommendation="Set server version for operational traceability.", - technique_id=None, + rule_id="HEUR-014", + match="missing server version", + field="server_metadata", confidence=0.6, - evidence={"readiness_rule": "HEUR-014"}, + extra_evidence={"readiness_rule": "HEUR-014"}, ) ) @@ -113,12 +123,35 @@ def run_readiness(config: ScanConfig) -> ReadinessReport: finding.evidence.setdefault("readiness_score", score) finding.evidence.setdefault("production_ready", production_ready) + score_v2_note = None + absolute_risk_snapshot = None + security_score_snapshot = None + if config.scoring_mode in ("v2", "both"): + score_v2_note = ( + "Readiness readiness_score is separate from mcts scan v2 absolute_risk. " + "absolute_risk_snapshot scores readiness findings only (not a full scan)." + ) + from mcts.governance.gate_violations import build_gate_scan_report + + gate_report = build_gate_scan_report( + findings, + config, + target=str(config.target), + scan_scope="readiness", + ) + if gate_report.score_v2 is not None: + absolute_risk_snapshot = gate_report.score_v2.absolute_risk + security_score_snapshot = gate_report.score_v2.security_score + return ReadinessReport( target=str(config.target), findings=findings, tools_checked=len(server.tools), readiness_score=score, production_ready=production_ready, + score_v2_note=score_v2_note, + absolute_risk_snapshot=absolute_risk_snapshot, + security_score_snapshot=security_score_snapshot, ) @@ -137,16 +170,13 @@ def _optional_check_unavailable( description: str, recommendation: str, ) -> Finding: - return Finding( - id=check_id, + return build_skip_finding( + finding_id=check_id, analyzer="readiness", title=title, description=description, - severity=Severity.MEDIUM, recommendation=recommendation, - technique_id=None, - confidence=1.0, - evidence={"skipped": True, "optional_check": True}, + severity=Severity.MEDIUM, ) diff --git a/src/mcts/report/assets/dashboard.js b/src/mcts/report/assets/dashboard.js index e3a9471..a475313 100644 --- a/src/mcts/report/assets/dashboard.js +++ b/src/mcts/report/assets/dashboard.js @@ -1061,11 +1061,16 @@ } function initGaugeChart() { - if (DATA.score_v2) return; const canvas = document.getElementById("gauge-chart"); if (!canvas || typeof Chart === "undefined") return; - const score = DATA.score.overall; + const v2 = DATA.score_v2; + const score = + v2 && v2.security_score != null + ? v2.security_score + : DATA.score.overall; + if (v2 && v2.security_score == null) return; + const color = scoreGaugeColor(score); const visualScore = Math.max(score, MIN_GAUGE_ARC); const remainder = Math.max(0, 100 - visualScore); diff --git a/src/mcts/report/data.py b/src/mcts/report/data.py index 6060038..a855271 100644 --- a/src/mcts/report/data.py +++ b/src/mcts/report/data.py @@ -1089,6 +1089,7 @@ def _score_v2_payload(report: ScanReport) -> dict[str, Any] | None: if report.score_v2 is None: return None score = report.score_v2 + grade_score = score.security_score if score.security_score is not None else report.score.overall return { "absolute_risk": score.absolute_risk, "risk_range": list(score.risk_range), @@ -1104,6 +1105,7 @@ def _score_v2_payload(report: ScanReport) -> dict[str, Any] | None: "chain_factor_mode": score.chain_factor_mode, "benchmark_corpus_version": score.benchmark_corpus_version, "basis": score.basis.model_dump(), + "grade": security_grade(grade_score), } @@ -1221,6 +1223,9 @@ def build_dashboard_payload(report: ScanReport) -> dict[str, Any]: report.scan_scope, report.scan_scope.replace("_", " ").title(), ) + grade_score = report.score.overall + if report.score_v2 is not None and report.score_v2.security_score is not None: + grade_score = report.score_v2.security_score return { "meta": { @@ -1245,7 +1250,7 @@ def build_dashboard_payload(report: ScanReport) -> dict[str, Any]: "risk_index": report.score.risk_index, "raw_risk": report.score.raw_risk, "basis": report.score.basis.model_dump(), - "grade": security_grade(report.score.overall), + "grade": security_grade(grade_score), "breakdown": breakdown_payload, }, **({"score_v2": _score_v2_payload(report)} if report.score_v2 is not None else {}), diff --git a/src/mcts/reporting/evidence_provenance.py b/src/mcts/reporting/evidence_provenance.py index 9b96404..f243234 100644 --- a/src/mcts/reporting/evidence_provenance.py +++ b/src/mcts/reporting/evidence_provenance.py @@ -55,23 +55,68 @@ def _enrich_one( ctx: ValidationContext, tools_by_name: dict[str, MCPTool], ) -> Finding: - if finding.analyzer != "attack_chains": - return finding + if finding.analyzer == "attack_chains": + evidence = dict(finding.evidence or {}) + tool_names = _chain_tool_names(evidence) + matched_tools = [tools_by_name[name] for name in tool_names if name in tools_by_name] + facts = _facts_from_tools(matched_tools) + interpretation = _interpretation(finding, evidence, matched_tools, tool_names) + confidence_factors = _confidence_factors(finding, evidence, matched_tools, facts) + counterfactual = _counterfactual_remediation(matched_tools, facts) + + evidence["facts"] = facts + evidence["interpretation"] = interpretation + evidence["confidence_factors"] = confidence_factors + evidence["counterfactual_remediation"] = counterfactual + evidence["evidence_tier"] = "silver" if any(f.get("snippet") for f in facts) else "bronze" + + return finding.model_copy(update={"evidence": evidence}) + evidence = dict(finding.evidence or {}) - tool_names = _chain_tool_names(evidence) - matched_tools = [tools_by_name[name] for name in tool_names if name in tools_by_name] - facts = _facts_from_tools(matched_tools) - interpretation = _interpretation(finding, evidence, matched_tools, tool_names) - confidence_factors = _confidence_factors(finding, evidence, matched_tools, facts) - counterfactual = _counterfactual_remediation(matched_tools, facts) - - evidence["facts"] = facts - evidence["interpretation"] = interpretation - evidence["confidence_factors"] = confidence_factors - evidence["counterfactual_remediation"] = counterfactual - evidence["evidence_tier"] = "silver" if any(f.get("snippet") for f in facts) else "bronze" - - return finding.model_copy(update={"evidence": evidence}) + facts = evidence.get("facts") + if isinstance(facts, list) and facts: + evidence = _enrich_bronze_counterfactual(finding, evidence, facts) + return finding.model_copy(update={"evidence": evidence}) + return finding + + +def _enrich_bronze_counterfactual( + finding: Finding, + evidence: dict[str, Any], + facts: list[dict[str, Any]], +) -> dict[str, Any]: + """Attach counterfactual + confidence factors for analyzer-emitted bronze facts (R17 partial).""" + if not evidence.get("counterfactual_remediation"): + triggered: list[str] = [] + actions: list[dict[str, str]] = [] + for fact in facts[:8]: + rule_id = str(fact.get("rule_id", "")) + match = str(fact.get("match", "")) + tool = str(fact.get("tool", finding.tool or "")) + triggered.append(f"{tool}: {rule_id} matched {match!r}") + actions.append( + { + "action": f"Address {match!r} ({rule_id}) on {tool}".strip(), + "removes": rule_id, + } + ) + evidence["counterfactual_remediation"] = { + "triggered_by": triggered, + "removing_any_one_eliminates_finding": len(facts) > 1, + "actions": actions, + } + if not evidence.get("confidence_factors"): + factors: list[str] = [] + for fact in facts[:6]: + rule_id = str(fact.get("rule_id", "signal")) + match = str(fact.get("match", "")) + line = f"{rule_id}: {match}" if match else rule_id + if line not in factors: + factors.append(line) + evidence["confidence_factors"] = factors + if not evidence.get("evidence_tier"): + evidence["evidence_tier"] = "silver" if any(f.get("snippet") for f in facts) else "bronze" + return evidence def _chain_tool_names(evidence: dict[str, Any]) -> list[str]: diff --git a/src/mcts/reporting/sarif.py b/src/mcts/reporting/sarif.py index 6aa29b0..d4ff157 100644 --- a/src/mcts/reporting/sarif.py +++ b/src/mcts/reporting/sarif.py @@ -35,10 +35,35 @@ def write_sarif_report(report: ScanReport) -> str: return json.dumps(payload, indent=2) -def build_sarif(report: ScanReport) -> dict[str, Any]: - rules = _build_rules(report.findings) - results = [_finding_to_result(finding, rules, report.target) for finding in report.findings] - taxonomies = _build_taxonomies(report.findings) +def build_sarif(report: ScanReport, *, include_coverage_findings: bool = False) -> dict[str, Any]: + export_findings = report.findings + if not include_coverage_findings: + export_findings = [ + finding for finding in report.findings if (finding.finding_kind or "security") != "coverage" + ] + + contributor_map: dict[str, dict[str, Any]] = {} + if report.score_v2 is not None: + for contrib in report.score_v2.top_contributors: + if contrib.finding_id: + contributor_map[contrib.finding_id] = { + "risk_contribution": contrib.risk_contribution, + "confidence": contrib.confidence, + "chain_factor": contrib.chain_factor, + "factors": contrib.factors, + } + + rules = _build_rules(export_findings) + results = [ + _finding_to_result( + finding, + rules, + report.target, + contributor_map.get(finding.id), + ) + for finding in export_findings + ] + taxonomies = _build_taxonomies(export_findings) driver: dict[str, Any] = { "name": "MCTS", @@ -67,6 +92,18 @@ def build_sarif(report: ScanReport) -> dict[str, Any]: "securityScore": report.score_v2.security_score, "riskLevel": report.score_v2.risk_level, } + top_rows = [ + { + "findingId": c.finding_id, + "riskContribution": c.risk_contribution, + "confidence": c.confidence, + "chainFactor": c.chain_factor, + } + for c in report.score_v2.top_contributors + if c.finding_id and c.risk_contribution is not None + ] + if top_rows: + run_props["mcts/v2TopContributors"] = top_rows[:10] run: dict[str, Any] = { "tool": {"driver": driver}, @@ -151,7 +188,12 @@ def _build_rules(findings: list[Finding]) -> dict[str, dict[str, Any]]: return rules -def _finding_to_result(finding: Finding, rules: dict[str, dict[str, Any]], target: str) -> dict[str, Any]: +def _finding_to_result( + finding: Finding, + rules: dict[str, dict[str, Any]], + target: str, + v2_contrib: dict[str, Any] | None = None, +) -> dict[str, Any]: result: dict[str, Any] = { "ruleId": finding.id, "level": SARIF_SEVERITY[effective_severity(finding)], @@ -189,6 +231,13 @@ def _finding_to_result(finding: Finding, rules: dict[str, dict[str, Any]], targe result["properties"]["technique_id"] = finding.technique_id if finding.mitigation_ids: result["properties"]["mitigation_ids"] = finding.mitigation_ids + if v2_contrib is not None: + if v2_contrib.get("risk_contribution") is not None: + result["properties"]["mcts/v2RiskContribution"] = v2_contrib["risk_contribution"] + if v2_contrib.get("confidence") is not None: + result["properties"]["mcts/v2Confidence"] = v2_contrib["confidence"] + if v2_contrib.get("chain_factor") is not None: + result["properties"]["mcts/v2ChainFactor"] = v2_contrib["chain_factor"] if finding.id not in rules: rules[finding.id] = { "id": finding.id, diff --git a/src/mcts/scan/machine_wide.py b/src/mcts/scan/machine_wide.py index c47e965..10eb06d 100644 --- a/src/mcts/scan/machine_wide.py +++ b/src/mcts/scan/machine_wide.py @@ -88,7 +88,14 @@ def gate_violations(self) -> list[str]: def exit_code(self) -> int: if self.scanned == 0: return 0 - if self.gate_violations(): + violations = self.gate_violations() + if self.base_config is not None: + from mcts.governance.gate_violations import collect_fleet_absolute_risk_violations + + violations.extend( + collect_fleet_absolute_risk_violations(self.worst_absolute_risk, self.base_config) + ) + if violations: return 1 if self.has_high_severity(): return 1 diff --git a/src/mcts/testing/regression_harness.py b/src/mcts/testing/regression_harness.py index f25cc9d..77a81bf 100644 --- a/src/mcts/testing/regression_harness.py +++ b/src/mcts/testing/regression_harness.py @@ -312,7 +312,7 @@ def detect_sigma_metadata_static(entry: dict[str, Any]) -> bool: def detect_embedding_secrets_static(entry: dict[str, Any]) -> bool: findings = EmbeddingSecretsAnalyzer(semantic_secrets=True).analyze(_static_server(entry)) - return any(f.analyzer == "embedding_secrets" for f in findings) + return any(f.analyzer == "embedding_secrets" and not (f.evidence or {}).get("skipped") for f in findings) def detect_shadowing_case(entry: dict[str, Any]) -> bool: diff --git a/src/mcts/ui/dashboard.py b/src/mcts/ui/dashboard.py index 27ad505..49f3894 100644 --- a/src/mcts/ui/dashboard.py +++ b/src/mcts/ui/dashboard.py @@ -120,11 +120,27 @@ def build_score_block(report: ScanReport, theme: Theme) -> Table: basis = report.score.basis rating, score_color = theme.score_rating(report.score.overall) risk_color = theme.risk_index_color(report.score.risk_index) + v2_first = report.score_v2 is not None and report.scoring_version == "both" grid = Table.grid(padding=(0, 1)) grid.add_column(style=theme.style(p.white, bold=True), width=16, no_wrap=True) grid.add_column(justify="left") + if v2_first: + v2 = report.score_v2 + grid.add_row( + "Absolute Risk:", + Text( + f"{v2.absolute_risk} ({v2.risk_level})", + style=theme.style(p.orange, bold=True), + ), + ) + if v2.security_score is not None: + grid.add_row( + "Security Score:", + Text(f"{v2.security_score}/100", style=theme.style(p.yellow, bold=True)), + ) + grid.add_row( "Overall Score:", Text(f"{report.score.overall}/100 ({rating})", style=theme.style(score_color, bold=True)), @@ -142,7 +158,7 @@ def build_score_block(report: ScanReport, theme: Theme) -> Table: style=theme.style(p.white), ), ) - if report.score_v2 is not None and report.scoring_version in {"v2", "both"}: + if report.score_v2 is not None and report.scoring_version in {"v2", "both"} and not v2_first: v2 = report.score_v2 grid.add_row( "Absolute Risk:", diff --git a/tests/governance/test_trust_alignment_fixes.py b/tests/governance/test_trust_alignment_fixes.py index 9b48fc0..6c552bf 100644 --- a/tests/governance/test_trust_alignment_fixes.py +++ b/tests/governance/test_trust_alignment_fixes.py @@ -200,3 +200,37 @@ def test_collect_findings_gate_violations_enforce_overlap_passes() -> None: scan_scope="repository", ) assert violations == [] + + +def test_collect_findings_gate_violations_scores_v2_for_policy_gates() -> None: + from mcts.core.scanner import Scanner + from mcts.governance.gate_violations import collect_findings_gate_violations + + vulnerable = Path("examples/vulnerable-mcp-server/server.py") + report = Scanner(ScanConfig(target=vulnerable, scoring_mode="both")).run() + config = ScanConfig( + target=vulnerable, + scoring_mode="both", + min_security_score=99, + ignore_policy=True, + ) + violations = collect_findings_gate_violations( + report.findings, + config, + target=str(vulnerable), + scan_scope="repository", + ) + assert violations + assert any("security_score" in item for item in violations) + assert not any("v2 gate requires" in item for item in violations) + + +def test_collect_fleet_absolute_risk_violation() -> None: + from mcts.governance.gate_violations import collect_fleet_absolute_risk_violations + + config = ScanConfig(target=SINGLE_TOOL, max_worst_absolute_risk=100, ignore_policy=True) + assert collect_fleet_absolute_risk_violations(150, config) == [ + "worst absolute_risk 150 exceeds maximum 100" + ] + assert collect_fleet_absolute_risk_violations(50, config) == [] + assert collect_fleet_absolute_risk_violations(None, config) == [] diff --git a/tests/reporting/test_bronze_counterfactual.py b/tests/reporting/test_bronze_counterfactual.py new file mode 100644 index 0000000..fa819f0 --- /dev/null +++ b/tests/reporting/test_bronze_counterfactual.py @@ -0,0 +1,15 @@ +"""Bronze counterfactual enrichment for analyzer findings.""" + +from mcts.core.config import ScanConfig +from mcts.core.scanner import Scanner + + +def test_bronze_facts_get_counterfactual_under_trust() -> None: + report = Scanner( + ScanConfig(target="examples/vulnerable-mcp-server/server.py", findings_trust_mode="enforce") + ).run() + perm = next(f for f in report.findings if f.analyzer == "permission_analyzer") + ev = perm.evidence or {} + assert isinstance(ev.get("facts"), list) and ev["facts"] + assert isinstance(ev.get("counterfactual_remediation"), dict) + assert ev.get("confidence_factors") diff --git a/tests/reporting/test_hygiene_bronze_facts.py b/tests/reporting/test_hygiene_bronze_facts.py new file mode 100644 index 0000000..2c4c502 --- /dev/null +++ b/tests/reporting/test_hygiene_bronze_facts.py @@ -0,0 +1,12 @@ +"""Bronze facts on hygiene/readiness meta findings.""" + +from mcts.readiness.heuristics import _finding +from mcts.reporting.models import Severity + + +def test_readiness_heuristic_emits_bronze_facts() -> None: + row = _finding("demo_tool", "HEUR-001", "Missing timeout configuration", Severity.HIGH) + facts = (row.evidence or {}).get("facts") + assert isinstance(facts, list) and facts + assert facts[0]["rule_id"] == "HEUR-001" + assert row.evidence.get("evidence_tier") == "bronze" diff --git a/tests/reporting/test_sarif_coverage_v2.py b/tests/reporting/test_sarif_coverage_v2.py new file mode 100644 index 0000000..3fb3dbf --- /dev/null +++ b/tests/reporting/test_sarif_coverage_v2.py @@ -0,0 +1,44 @@ +"""SARIF coverage filter and v2 per-finding metadata.""" + +from pathlib import Path + +from mcts.core.config import ScanConfig +from mcts.core.scanner import Scanner +from mcts.reporting.sarif import build_sarif + + +def test_sarif_excludes_compliance_coverage_by_default() -> None: + report = Scanner( + ScanConfig(target=Path("examples/vulnerable-mcp-server/server.py"), scoring_mode="both") + ).run() + sarif = build_sarif(report) + compliance = [ + r for r in sarif["runs"][0]["results"] if r.get("properties", {}).get("analyzer") == "compliance" + ] + assert not compliance + assert any(f.analyzer == "compliance" for f in report.findings) + + +def test_sarif_includes_v2_risk_contribution_on_top_findings() -> None: + report = Scanner( + ScanConfig( + target=Path("examples/vulnerable-mcp-server/server.py"), + scoring_mode="v2", + ) + ).run() + sarif = build_sarif(report) + props = sarif["runs"][0]["properties"] + assert "mcts/v2TopContributors" in props + results_with_v2 = [ + r for r in sarif["runs"][0]["results"] if "mcts/v2RiskContribution" in r.get("properties", {}) + ] + assert results_with_v2 + + +def test_sarif_include_coverage_flag() -> None: + report = Scanner(ScanConfig(target=Path("examples/vulnerable-mcp-server/server.py"))).run() + included = build_sarif(report, include_coverage_findings=True) + compliance = [ + r for r in included["runs"][0]["results"] if r.get("properties", {}).get("analyzer") == "compliance" + ] + assert compliance diff --git a/tests/test_compliance.py b/tests/test_compliance.py index 9069db3..190daef 100644 --- a/tests/test_compliance.py +++ b/tests/test_compliance.py @@ -57,6 +57,10 @@ def test_compliance_rows_are_coverage_kind() -> None: ) assert meta assert all(row.finding_kind == "coverage" for row in meta) + gap = next(f for f in meta if f.id == "compliance-mcp-top10-gaps") + facts = (gap.evidence or {}).get("facts") + assert isinstance(facts, list) and facts + assert gap.evidence.get("evidence_tier") == "bronze" def test_compliance_critical_count_uses_template_in_warn() -> None: diff --git a/tests/test_sarif.py b/tests/test_sarif.py index 489c6b3..4087169 100644 --- a/tests/test_sarif.py +++ b/tests/test_sarif.py @@ -32,7 +32,8 @@ def test_sarif_report_structure(example_server_path: Path) -> None: assert sarif["version"] == "2.1.0" assert sarif["runs"][0]["tool"]["driver"]["name"] == "MCTS" - assert len(sarif["runs"][0]["results"]) == len(report.findings) + exportable = [f for f in report.findings if (f.finding_kind or "security") != "coverage"] + assert len(sarif["runs"][0]["results"]) == len(exportable) assert sarif["runs"][0]["properties"]["securityScore"] == report.score.overall assert "taxa" not in sarif["runs"][0] for result in sarif["runs"][0]["results"]: