Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions src/wardline/core/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from __future__ import annotations

import hashlib
from dataclasses import dataclass, replace
from dataclasses import dataclass, field, replace
from datetime import date
from pathlib import Path
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -45,7 +45,7 @@ def _fp(*parts: str) -> str:
@dataclass(frozen=True, slots=True)
class ScanSummary:
total: int # every finding (defects + facts/metrics)
active: int # non-suppressed DEFECTs the gate population
active: int # non-suppressed DEFECTs in the emitted findings
baselined: int
waived: int
judged: int
Expand All @@ -66,6 +66,10 @@ class ScanResult:
# this exact run instead of re-deriving. Never serialised over MCP.
context: AnalysisContext | None
scanned_paths: tuple[str, ...] = ()
# Unsuppressed findings used by fail-on gates. Repository-controlled baseline,
# waiver, and judged files annotate emitted findings, but must not be able to
# hide defects from CI gates that run on untrusted pull-request content.
gate_findings: list[Finding] = field(default_factory=list)


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -186,6 +190,9 @@ def run_scan(
waivers = WaiverSet(parse_waivers(cfg.waivers))
judged = load_judged(root / ".wardline" / "judged.yaml")
findings = apply_suppressions(raw, baseline, waivers, today=date.today(), judged=judged)
# Keep a separate gate population that applies only operator-supplied scan
# scoping (for example --new-since), not repository-controlled suppressions.
gate_findings = list(raw)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve line-less defect quarantine in gate population

When a rule or local pack emits a normal-file DEFECT with line_start=None, apply_suppressions intentionally replaces it with a WLN-ENGINE-LINELESS-DEFECT FACT so the invalid, collision-prone defect is not treated as an active finding. Building gate_findings directly from raw bypasses that tool-owned invariant, so scan --fail-on can now fail on a defect that is omitted from the emitted findings and was previously quarantined for fingerprint-safety reasons; only repository-controlled baseline/waiver/judged suppressions should be skipped here.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor waivers from explicit operator configs

This treats every waiver as untrusted by gating on the completely raw findings, but run_scan(..., config_path=...) and the CLI --config path can be an operator-supplied config outside the repository. In that CI setup, centrally managed waivers in the explicit config are no longer honored by --fail-on, even though the nearby config handling explicitly preserves operator-provided policy; only waivers loaded from the repository's default wardline.yaml should be excluded from the gate population.

Useful? React with 👍 / 👎.


if new_since is not None:
changed_files = get_changed_files_since(new_since, root)
Expand All @@ -195,18 +202,22 @@ def run_scan(
else:
affected = set()

new_findings = []
for f in findings:
if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE:
is_new = (f.location.path in changed_files) or (f.qualname is not None and f.qualname in affected)
if not is_new:
f = replace(
f,
suppressed=SuppressionState.BASELINED,
suppression_reason=f"delta: unchanged since {new_since}",
)
new_findings.append(f)
findings = new_findings
def apply_delta_scope(candidates: list[Finding]) -> list[Finding]:
scoped = []
for f in candidates:
if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE:
is_new = (f.location.path in changed_files) or (f.qualname is not None and f.qualname in affected)
if not is_new:
f = replace(
f,
suppressed=SuppressionState.BASELINED,
suppression_reason=f"delta: unchanged since {new_since}",
)
scoped.append(f)
return scoped

findings = apply_delta_scope(findings)
gate_findings = apply_delta_scope(gate_findings)

defects = [f for f in findings if f.kind is Kind.DEFECT]
summary = ScanSummary(
Expand All @@ -227,12 +238,14 @@ def run_scan(
path.relative_to(resolved_root).as_posix() if path.is_relative_to(resolved_root) else path.as_posix()
for path in files
),
gate_findings=gate_findings,
)


def gate_decision(result: ScanResult, fail_on: Severity | None) -> GateDecision:
"""Translate a scan into a pass/fail verdict. A trip is data, not an error."""
if fail_on is None:
return GateDecision(tripped=False, fail_on=None, exit_class=0)
tripped = gate_trips(result.findings, fail_on)
gate_findings = result.gate_findings or result.findings

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Surface explanations for gate-only suppressed defects

When the only above-threshold defect is baselined, waived, or judged, this switch makes gate.tripped true while the MCP response still emits that finding as suppressed; _scan only attaches inline explanations for SuppressionState.ACTIVE findings (src/wardline/mcp/server.py:160-164). Agents using scan with explain: true and fail_on can therefore receive a failing gate with no explained active defect to fix, so the gate-relevant population needs to be exposed/explained separately or the emitted finding needs a gate-relevant marker.

Useful? React with 👍 / 👎.

tripped = gate_trips(gate_findings, fail_on)
return GateDecision(tripped=tripped, fail_on=fail_on.value, exit_class=1 if tripped else 0)
22 changes: 12 additions & 10 deletions tests/unit/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def test_scan_fail_on_inert_without_flag(tmp_path) -> None:
assert res.exit_code == 0, res.output # no --fail-on -> never gates


def test_scan_baseline_suppresses_and_clears_gate(tmp_path) -> None:
def test_scan_baseline_annotates_but_does_not_clear_gate(tmp_path) -> None:
proj = tmp_path / "proj"
proj.mkdir()
_write(proj, "svc.py", _LEAKY)
Expand All @@ -359,9 +359,10 @@ def test_scan_baseline_suppresses_and_clears_gate(tmp_path) -> None:
"version: 1\nentries:\n - fingerprint: " + fp + "\n rule_id: PY-WL-101\n path: svc.py\n message: m\n",
encoding="utf-8",
)
# Second scan: the defect is baselined -> annotated + gate clears.
# Second scan: the defect is baselined for reporting, but fail-on still trips
# because repository-controlled suppressions must not bypass the CI gate.
res = CliRunner().invoke(scan, [str(proj), "--output", str(out), "--fail-on", "ERROR"])
assert res.exit_code == 0, res.output
assert res.exit_code == 1, res.output
findings2 = [_json.loads(ln) for ln in out.read_text().splitlines() if ln.strip()]
leak = next(f for f in findings2 if f["rule_id"] == "PY-WL-101")
assert leak["suppressed"] == "baselined" # annotate-and-keep
Expand Down Expand Up @@ -466,10 +467,11 @@ def test_baseline_create_writes_file_and_suppresses_next_scan(tmp_path) -> None:
doc = _yaml.safe_load(bl.read_text())
assert doc["version"] == 1 and len(doc["entries"]) >= 1
assert "baselined" in res.output
# Next scan: the captured defect is now baselined, gate clears.
# Next scan: the captured defect is now baselined for reporting, but the
# untrusted repository baseline must not clear the fail-on gate.
out = tmp_path / "f.jsonl"
res2 = runner.invoke(scan, [str(proj), "--output", str(out), "--fail-on", "ERROR"])
assert res2.exit_code == 0, res2.output
assert res2.exit_code == 1, res2.output


def test_baseline_create_refuses_if_exists(tmp_path) -> None:
Expand Down Expand Up @@ -957,9 +959,9 @@ def test_judge_low_confidence_fp_held_back_from_write(monkeypatch, tmp_path) ->
assert not (proj / ".wardline" / "judged.yaml").exists()


def test_judge_write_then_scan_gate_is_cleared(monkeypatch, tmp_path) -> None:
# The regression that pins the headline panel finding: a JUDGED FP written by
# `judge --write` must suppress the finding for `scan --fail-on` too.
def test_judge_write_then_scan_still_trips_gate(monkeypatch, tmp_path) -> None:
# JUDGED findings are still annotated in scan output, but repository-controlled
# judged state must not suppress the fail-on gate.
import wardline.cli.judge as judge_cli
from wardline.cli.main import cli

Expand All @@ -974,9 +976,9 @@ def test_judge_write_then_scan_gate_is_cleared(monkeypatch, tmp_path) -> None:
jres = CliRunner().invoke(cli, ["judge", str(proj), "--write"])
assert jres.exit_code == 0, jres.output
assert (proj / ".wardline" / "judged.yaml").exists()
# 3) scan now sees the JUDGED suppression -> gate cleared, summary shows it
# 3) scan now sees the JUDGED suppression for reporting, but the gate still trips.
after = CliRunner().invoke(cli, ["scan", str(proj), "--output", str(out), "--fail-on", "INFO"])
assert after.exit_code == 0, after.output
assert after.exit_code == 1, after.output
assert "judged" in after.output


Expand Down
56 changes: 52 additions & 4 deletions tests/unit/core/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from wardline.core.errors import ConfigError
from wardline.core.finding import Kind, Severity, SuppressionState
from wardline.core.finding import Finding, Kind, Location, Severity, SuppressionState
from wardline.core.run import ScanResult, ScanSummary, gate_decision, run_scan

FIXTURE = Path("tests/fixtures/sample_project")
Expand All @@ -28,7 +28,7 @@ def test_run_scan_returns_findings_summary_and_context() -> None:
# invariants (total == len(findings); active == active-defect count), which
# hold for any fixture regardless of finding count.
assert result.summary.total == len(result.findings)
# active is the count of non-suppressed DEFECTs (the gate population)
# active is the count of non-suppressed DEFECTs in the emitted findings
active = sum(1 for f in result.findings if f.kind is Kind.DEFECT and f.suppressed is SuppressionState.ACTIVE)
assert result.summary.active == active
# context is carried for explain_finding to reuse
Expand All @@ -47,6 +47,34 @@ def test_gate_decision_trips_on_active_error(tmp_path: Path) -> None:
assert decision.fail_on == "ERROR"


def test_gate_decision_uses_unsuppressed_gate_population() -> None:
suppressed = Finding(
rule_id="PY-WL-101",
message="m",
severity=Severity.ERROR,
kind=Kind.DEFECT,
location=Location(path="svc.py", line_start=1),
fingerprint="a" * 64,
suppressed=SuppressionState.BASELINED,
)
active_gate_copy = Finding(
rule_id="PY-WL-101",
message="m",
severity=Severity.ERROR,
kind=Kind.DEFECT,
location=Location(path="svc.py", line_start=1),
fingerprint="a" * 64,
)
result = ScanResult(
findings=[suppressed],
summary=ScanSummary(total=1, active=0, baselined=1, waived=0, judged=0),
files_scanned=1,
context=None,
gate_findings=[active_gate_copy],
)

assert gate_decision(result, Severity.ERROR).tripped is True

def test_gate_decision_none_threshold_never_trips() -> None:
result = run_scan(FIXTURE)
decision = gate_decision(result, None)
Expand Down Expand Up @@ -111,9 +139,29 @@ def test_run_scan_baselined_count_distinguishes_categories(tmp_path: Path) -> No
assert result.summary.waived == 0
assert result.summary.judged == 0
assert result.summary.active == 0
# And the gate clears now that the only ERROR defect is suppressed.
assert gate_decision(result, Severity.ERROR).tripped is False
# The emitted finding is suppressed, but fail-on gates over the unsuppressed
# population so repository-controlled baselines cannot hide defects in CI.
assert gate_decision(result, Severity.ERROR).tripped is True


def test_gate_decision_ignores_repo_controlled_waivers(tmp_path: Path) -> None:
proj = tmp_path / "proj"
proj.mkdir()
(proj / "svc.py").write_text(_LEAKY, encoding="utf-8")

first = run_scan(proj)
leak = next(f for f in first.findings if f.rule_id == "PY-WL-101")
(proj / "wardline.yaml").write_text(
"waivers:\n"
f" - fingerprint: {leak.fingerprint}\n"
" reason: attacker-controlled waiver\n",
encoding="utf-8",
)

result = run_scan(proj)
assert result.summary.waived == 1
assert result.summary.active == 0
assert gate_decision(result, Severity.ERROR).tripped is True

def test_run_scan_counts_unanalyzed_parse_error(tmp_path: Path) -> None:
# (b) A file that cannot be parsed is discovered-but-not-analysed: a
Expand Down
Loading