From 017f82c7a7f4a8079647f6482d15a00232ff243a Mon Sep 17 00:00:00 2001 From: John Morrissey <544926+tachyon-beep@users.noreply.github.com> Date: Fri, 5 Jun 2026 19:31:49 +1000 Subject: [PATCH] fix(scanner): reject spoofed wardline decorators --- src/wardline/scanner/analyzer.py | 4 +- src/wardline/scanner/pipeline.py | 26 ++++++++- .../scanner/taint/decorator_provider.py | 56 ++++++++++++++++--- src/wardline/scanner/taint/provider.py | 8 ++- .../scanner/taint/test_decorator_provider.py | 32 ++++++++++- tests/unit/scanner/test_pipeline.py | 35 ++++++++++++ 6 files changed, 142 insertions(+), 19 deletions(-) diff --git a/src/wardline/scanner/analyzer.py b/src/wardline/scanner/analyzer.py index 2d23fa79..53398acd 100644 --- a/src/wardline/scanner/analyzer.py +++ b/src/wardline/scanner/analyzer.py @@ -131,7 +131,7 @@ def _analyze_inner(self, files: Sequence[Path], config: WardlineConfig, *, root: if self._cache is not None: result = resolve_project_taints( modules=modules, - provider_fingerprint=self._provider.fingerprint(), + provider_fingerprint=parse_stage.provider_fingerprint, summary_cache=self._cache, dirty_modules=frozenset(dirty_modules), config=config, @@ -139,7 +139,7 @@ def _analyze_inner(self, files: Sequence[Path], config: WardlineConfig, *, root: else: result = resolve_project_taints( modules=modules, - provider_fingerprint=self._provider.fingerprint(), + provider_fingerprint=parse_stage.provider_fingerprint, config=config, ) diff --git a/src/wardline/scanner/pipeline.py b/src/wardline/scanner/pipeline.py index 5d2fb46b..130fd5f7 100644 --- a/src/wardline/scanner/pipeline.py +++ b/src/wardline/scanner/pipeline.py @@ -7,7 +7,7 @@ from collections.abc import Sequence from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast from wardline.core.finding import Finding, Kind, Location, Severity from wardline.core.qualname import module_dotted_name @@ -59,6 +59,15 @@ class ParseProjectOutput: files: list[ParsedFile] parse_findings: list[Finding] dirty_modules: frozenset[str] + provider_fingerprint: str + + +def _provider_fingerprint_for_project(provider: TaintSourceProvider, project_modules: frozenset[str]) -> str: + project_fingerprint = getattr(provider, "fingerprint_for_project", None) + if callable(project_fingerprint): + typed_project_fingerprint = cast(Any, project_fingerprint) + return str(typed_project_fingerprint(project_modules)) + return provider.fingerprint() def run_parse_project_stage(stage_input: ParseProjectInput) -> ParseProjectOutput: @@ -68,6 +77,17 @@ def run_parse_project_stage(stage_input: ParseProjectInput) -> ParseProjectOutpu parse_findings: list[Finding] = [] dirty_modules: set[str] = set() root = stage_input.root.resolve() + project_modules = frozenset( + module + for path in stage_input.files + if ( + module := module_dotted_name( + path.relative_to(root).as_posix() if path.is_relative_to(root) else path.as_posix() + ) + ) + is not None + ) + provider_fingerprint = _provider_fingerprint_for_project(stage_input.provider, project_modules) for path in stage_input.files: relpath = path.relative_to(root).as_posix() if path.is_relative_to(root) else path.as_posix() @@ -90,7 +110,6 @@ def run_parse_project_stage(stage_input: ParseProjectInput) -> ParseProjectOutpu source = path.read_text(encoding="utf-8") source_bytes = source.encode("utf-8") - provider_fingerprint = stage_input.provider.fingerprint() from wardline.scanner.taint.project_resolver import _RESOLVER_VERSION from wardline.scanner.taint.summary import SUMMARY_SCHEMA_VERSION, compute_cache_key @@ -116,7 +135,7 @@ def run_parse_project_stage(stage_input: ParseProjectInput) -> ParseProjectOutpu ) seeds = seed_function_taints( entities, - ctx=SeedContext(module=module, alias_map=alias_map), + ctx=SeedContext(module=module, alias_map=alias_map, project_modules=project_modules), provider=stage_input.provider, ) for ent in entities: @@ -205,6 +224,7 @@ def run_parse_project_stage(stage_input: ParseProjectInput) -> ParseProjectOutpu files=parsed_files, parse_findings=parse_findings, dirty_modules=frozenset(dirty_modules), + provider_fingerprint=provider_fingerprint, ) diff --git a/src/wardline/scanner/taint/decorator_provider.py b/src/wardline/scanner/taint/decorator_provider.py index d924d41c..1948d463 100644 --- a/src/wardline/scanner/taint/decorator_provider.py +++ b/src/wardline/scanner/taint/decorator_provider.py @@ -28,6 +28,7 @@ from wardline.scanner.taint.provider import SeedContext _VOCAB_PREFIX = "wardline.decorators" +_WARDLINE_ROOT = "wardline" _TAINTSTATE_FQN = "wardline.core.taints.TaintState" @@ -78,6 +79,26 @@ def _resolve_decorator_fqn(deco: ast.expr, alias_map: Mapping[str, str]) -> str return _resolve_dotted_fqn(func, alias_map) +def _project_shadows_wardline(project_modules: frozenset[str]) -> bool: + """Return whether the scan target defines a local ``wardline`` package/module. + + Builtin Wardline decorator declarations must refer to the installed marker + package, not a module supplied by the scanned project. If the project itself + contains ``wardline`` or anything below it, Python import resolution can bind + ``wardline.decorators`` to attacker-controlled code, so builtin matching fails + closed for the scan. + """ + return any(module == _WARDLINE_ROOT or module.startswith(_WARDLINE_ROOT + ".") for module in project_modules) + + +def _is_builtin_decorator_fqn(fqn: str, canonical_name: str, module_prefix: str) -> bool: + """Return whether *fqn* is one of Wardline's exact builtin decorator exports.""" + return fqn in { + f"{module_prefix}.{canonical_name}", + f"{module_prefix}.trust.{canonical_name}", + } + + def _level_token(value: ast.expr, alias_map: Mapping[str, str]) -> str | None: """Extract a TaintState name token from a keyword-argument value node. @@ -179,7 +200,7 @@ def taint_for(self, entity: Entity, ctx: SeedContext) -> SeedResult: candidates: list[FunctionTaint] = [] unprovable: list[str] = [] for deco in entity.node.decorator_list: - ft, unprov = self._match(deco, ctx.alias_map) + ft, unprov = self._match(deco, ctx.alias_map, ctx.project_modules) if ft is not None: candidates.append(ft) elif unprov is not None: @@ -213,7 +234,21 @@ def fingerprint(self) -> str: return f"decorator-vocab:{REGISTRY_VERSION}" return f"decorator-vocab:{REGISTRY_VERSION}+grammar:{_grammar_digest(self._boundary_types)}" - def _match(self, deco: ast.expr, alias_map: Mapping[str, str]) -> tuple[FunctionTaint | None, str | None]: + def fingerprint_for_project(self, project_modules: frozenset[str]) -> str: + """Fingerprint declaration inputs that are external to a single module. + + Builtin seeds depend on whether the scanned project shadows ``wardline``; + bind that fact into summary-cache keys so a warm cache cannot reuse trusted + summaries across shadowed and unshadowed scan roots. + """ + return f"{self.fingerprint()}:wardline-shadowed={int(_project_shadows_wardline(project_modules))}" + + def _match( + self, + deco: ast.expr, + alias_map: Mapping[str, str], + project_modules: frozenset[str], + ) -> tuple[FunctionTaint | None, str | None]: """Match one decorator against the loaded boundary types. Returns: ``(seed, None)`` — a boundary type matched and its levels proved; @@ -225,15 +260,18 @@ def _match(self, deco: ast.expr, alias_map: Mapping[str, str]) -> tuple[Function fqn = _resolve_decorator_fqn(deco, alias_map) if fqn is None: return None, None - # A decorator matches a boundary type when its FQN is UNDER the type's module - # prefix and its final segment is the canonical name. This accepts BOTH the - # package re-export (``wardline.decorators.trusted``) and the submodule path - # (``wardline.decorators.trust.trusted``) — preserving the pre-Track-2 matcher - # exactly (it used the same prefix + last-segment rule), and generalizing it - # consistently for custom types. + # Builtin Wardline markers are security-sensitive defaults. Match only the + # exact public re-export or implementation-module export, and reject them + # when the scanned project itself defines ``wardline`` (which would shadow + # the real marker package under normal import resolution). Custom grammar + # markers keep the documented prefix + canonical-name matching behavior. last = fqn.rsplit(".", 1)[-1] + wardline_shadowed = _project_shadows_wardline(project_modules) for bt in self._boundary_types: - if last != bt.canonical_name or not fqn.startswith(bt.module_prefix + "."): + if bt.builtin: + if wardline_shadowed or not _is_builtin_decorator_fqn(fqn, bt.canonical_name, bt.module_prefix): + continue + elif last != bt.canonical_name or not fqn.startswith(bt.module_prefix + "."): continue levels: dict[str, TaintState] = {} unreadable = False diff --git a/src/wardline/scanner/taint/provider.py b/src/wardline/scanner/taint/provider.py index 0ee43a63..f81d8b81 100644 --- a/src/wardline/scanner/taint/provider.py +++ b/src/wardline/scanner/taint/provider.py @@ -30,13 +30,15 @@ class SeedContext: ``alias_map`` is the file's ``{local_name: fully_qualified_name}`` import map (from ``build_import_alias_map``); a provider uses it to resolve aliased - decorator names against the trust vocabulary. Defaults to empty so callers - that do not seed from decorators (e.g. the trivial default provider's tests) - need not supply it. + decorator names against the trust vocabulary. ``project_modules`` contains the + modules discovered in the scanned project, allowing providers to fail closed + when a declaration package would be shadowed by project-local code. Defaults + keep callers that do not seed from decorators lightweight. """ module: str alias_map: Mapping[str, str] = field(default_factory=dict) + project_modules: frozenset[str] = field(default_factory=frozenset) @dataclass(frozen=True, slots=True) diff --git a/tests/unit/scanner/taint/test_decorator_provider.py b/tests/unit/scanner/taint/test_decorator_provider.py index 30be6a2a..3806fd60 100644 --- a/tests/unit/scanner/taint/test_decorator_provider.py +++ b/tests/unit/scanner/taint/test_decorator_provider.py @@ -11,12 +11,17 @@ from wardline.scanner.taint.provider import FunctionTaint, SeedContext -def _seed(src: str, *, module: str = "m") -> dict[str, FunctionTaint | None]: +def _seed( + src: str, + *, + module: str = "m", + project_modules: frozenset[str] = frozenset(), +) -> dict[str, FunctionTaint | None]: """Run the provider over every function entity in *src*; map qualname -> result.""" tree = ast.parse(src) alias_map = build_import_alias_map(tree, module_path=module) entities = discover_file_entities(tree, module=module, path="m.py") - ctx = SeedContext(module=module, alias_map=alias_map) + ctx = SeedContext(module=module, alias_map=alias_map, project_modules=project_modules) provider = DecoratorTaintSourceProvider() # .taint: assertions here compare the declared FunctionTaint; the unprovable- # boundary signal (Track 2 T2.4) is exercised separately in tests/grammar/. @@ -233,3 +238,26 @@ def test_wardline_prefixed_but_unknown_decorator_is_no_opinion() -> None: # (``wardline.decorators.bogus``) — canonical not in REGISTRY -> no opinion. out = _seed("import wardline.decorators\n@wardline.decorators.bogus\ndef f():\n return 1\n") assert out["m.f"] is None + + +def test_builtin_decorator_requires_exact_known_export() -> None: + # Prefix + final-component matching would accept this spoofable nested path. + # Builtin Wardline markers must be exact public/implementation exports. + out = _seed("from wardline.decorators import evil\n@evil.trusted\ndef f():\n return 1\n") + assert out["m.f"] is None + + +def test_builtin_decorator_fails_closed_when_project_shadows_wardline() -> None: + # A scanned project that defines wardline.decorators controls what this import + # means at runtime, so the default provider must not anchor it as Wardline's real + # marker package. + out = _seed( + "from wardline.decorators import trusted\n@trusted\ndef f():\n return 1\n", + project_modules=frozenset({"app", "wardline", "wardline.decorators"}), + ) + assert out["m.f"] is None + + +def test_builtin_decorator_accepts_implementation_module_export() -> None: + out = _seed("from wardline.decorators.trust import trusted\n@trusted\ndef f():\n return 1\n") + assert out["m.f"] == FunctionTaint(T.INTEGRAL, T.INTEGRAL) diff --git a/tests/unit/scanner/test_pipeline.py b/tests/unit/scanner/test_pipeline.py index 75e27a41..91729f69 100644 --- a/tests/unit/scanner/test_pipeline.py +++ b/tests/unit/scanner/test_pipeline.py @@ -67,3 +67,38 @@ def test_parse_project_stage_returns_typed_modules_and_dirty_scope(tmp_path) -> assert result.files[0].relpath == "m.py" assert result.files[0].module == "m" assert result.files[0].entities[0].qualname == "m.read_raw" + + +def test_parse_project_stage_fails_closed_for_shadowed_wardline_decorators(tmp_path) -> None: + app = tmp_path / "app.py" + app.write_text( + "from wardline.decorators import trusted\n" + "@trusted\n" + "def unsafe(p):\n" + " return p\n", + encoding="utf-8", + ) + shadow_pkg = tmp_path / "wardline" / "decorators" + shadow_pkg.mkdir(parents=True) + (tmp_path / "wardline" / "__init__.py").write_text("", encoding="utf-8") + (shadow_pkg / "__init__.py").write_text( + "def trusted(fn):\n" + " return fn\n", + encoding="utf-8", + ) + + result = run_parse_project_stage( + ParseProjectInput( + files=(app, tmp_path / "wardline" / "__init__.py", shadow_pkg / "__init__.py"), + root=tmp_path, + provider=DecoratorTaintSourceProvider(), + config=WardlineConfig(), + star_exports=vocabulary_star_exports(), + ) + ) + + app_module = next(module for module in result.modules if module.module_path == "app") + seed = app_module.seeds["app.unsafe"] + assert seed.source == "default" + assert seed.body_taint == T.UNKNOWN_RAW + assert "wardline-shadowed=1" in result.provider_fingerprint