diff --git a/CAPABILITIES.md b/CAPABILITIES.md index c6350db..6360d9c 100644 --- a/CAPABILITIES.md +++ b/CAPABILITIES.md @@ -9,6 +9,8 @@ - SARIF 2.1.0 export - Scan history storage - Machine-readable bridge for editor integration +- Confidence-aware filtering for fixtures, tests, docs, and generated-looking data +- Per-analyzer runtime isolation so one failed analyzer does not abort the whole scan ## VS Code integration @@ -19,9 +21,18 @@ - Scan-on-save - SARIF export command - Disabled-rule filtering in the client +- Minimum-confidence filtering in the client + +## Dependency coverage + +- Python requirements files +- Python `pyproject.toml` dependencies +- npm `package-lock.json` dependencies +- Static local advisory database for offline use ## Non-goals in this repository - No web upload UI - No CSV analyzer - No duplicate extension and CLI logic paths +- No live package-advisory network lookup diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md index 6bdd161..7f42cce 100644 --- a/DEPLOYMENT.md +++ b/DEPLOYMENT.md @@ -8,11 +8,14 @@ The extension is packaged from the repository root with `vsce`. The package incl - `src/contractguard/` Python engine - `rules/` bundled rule files - `media/` extension assets +- `python-requirements.txt` runtime dependency list ## Runtime model ContractGuard runs its analyzers out of process through `python -m contractguard.bridge`. The extension sets `PYTHONPATH` to its bundled `src/` directory so the engine can run without a separate package install step inside the extension host. +The 2.0 scanner filters low-confidence fixture findings by default and reports analyzer runtime failures as findings instead of aborting an entire workspace scan. + ## Publish checklist 1. Build the extension with `tsc`. diff --git a/INSTRUCTIONS.md b/INSTRUCTIONS.md index c2363b1..9adc727 100644 --- a/INSTRUCTIONS.md +++ b/INSTRUCTIONS.md @@ -5,6 +5,7 @@ ```powershell .\.venv\Scripts\python.exe -m contractguard.cli analyze --type all --path . 
--score .\.venv\Scripts\python.exe -m contractguard.cli analyze --type secrets --path . --report-sarif contractguard.sarif +.\.venv\Scripts\python.exe -m contractguard.cli analyze --type all --path . --min-confidence low --score ``` ## Bridge @@ -12,6 +13,7 @@ ```powershell $env:PYTHONPATH = (Resolve-Path .\src).Path .\.venv\Scripts\python.exe -m contractguard.bridge scan --path . --analyzer all --include-sarif +.\.venv\Scripts\python.exe -m contractguard.bridge scan --path . --analyzer all --min-confidence medium ``` ## VS Code diff --git a/README.md b/README.md index 3b3aab1..951453f 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ - Publish inline diagnostics in the editor - Export SARIF for external security workflows - Show an overall security score in the status bar +- Filter low-confidence fixture/doc/test findings by default +- Continue scans when one analyzer has a runtime problem ## What it checks @@ -21,6 +23,8 @@ - Insecure configuration - Dockerfile issues - Dependency vulnerabilities +- Python `requirements.txt` and `pyproject.toml` +- npm `package-lock.json` advisories from the bundled local database ## Commands @@ -43,12 +47,15 @@ If the Python runtime dependencies are missing, run: - `contractguard.pythonPath` - `contractguard.scanOnSave` - `contractguard.scanDebounceMs` +- `contractguard.scanOnSaveScope` - `contractguard.enabledAnalyzers` - `contractguard.disabledRules` +- `contractguard.minimumConfidence` - `contractguard.rulesDirectory` - `contractguard.sqlExplainDatabase` ## Notes - The extension runs analysis locally. +- The default minimum confidence is `medium`; use `low` for audit mode when you want sample/test fixtures included. - SARIF export is available for CI and external security tooling. 
diff --git a/package-lock.json b/package-lock.json index fe06deb..2972645 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "contract-guard", - "version": "1.3.0", + "version": "2.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "contract-guard", - "version": "1.3.0", + "version": "2.1.0", "license": "Apache-2.0", "devDependencies": { "@types/node": "^20.16.1", @@ -102,7 +102,7 @@ } }, "node_modules/@azure/core-tracing": { - "version": "1.3.1", + "version": "1.3.1", "resolved": "https://registry.npmjs.org/@azure/core-tracing/-/core-tracing-1.3.1.tgz", "integrity": "sha512-9MWKevR7Hz8kNzzPLfX4EAtGM2b8mr50HPDBvio96bURP/9C+HjdH3sBlLSNNrvRAr5/k/svoH457gB5IKpmwQ==", "dev": true, diff --git a/package.json b/package.json index 291d226..7812aad 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "contract-guard", "displayName": "contract-guard", "description": "A VS Code extension that finds security issues in code, configs, queries, Dockerfiles, and secrets.", - "version": "1.3.1", + "version": "2.1.0", "publisher": "BlackplaneSystems", "license": "Apache-2.0", "icon": "media/icon.png", @@ -38,7 +38,8 @@ "main": "./dist/extension.js", "files": [ "dist/**", - "src/contractguard/**", + "src/contractguard/*.py", + "src/contractguard/analyzers/*.py", "rules/**", "media/**", "python-requirements.txt", @@ -120,10 +121,21 @@ }, "contractguard.scanOnSaveScope": { "type": "string", - "default": "workspace", + "default": "currentFile", "enum": ["workspace", "currentFile"], "description": "What to scan after saving a file: the whole workspace or just the active file." }, + "contractguard.minimumConfidence": { + "type": "string", + "default": "medium", + "enum": ["low", "medium", "high"], + "description": "Minimum finding confidence shown by the VS Code client and requested from the scanner." 
+ }, + "contractguard.includeFixtures": { + "type": "boolean", + "default": false, + "description": "Include findings from fixture-like paths (tests, samples, docs)." + }, "contractguard.enabledAnalyzers": { "type": "array", "default": [ @@ -164,7 +176,7 @@ }, "scripts": { "build": "tsc -p ./tsconfig.json", - "package": "node -e \"require('fs').mkdirSync('dist-vsix',{recursive:true})\" && vsce package --out dist-vsix/contractguard-1.3.1.vsix", + "package": "node -e \"require('fs').mkdirSync('dist-vsix',{recursive:true})\" && vsce package --out dist-vsix/contractguard-2.1.0.vsix", "prepackage": "npm run build" }, "devDependencies": { diff --git a/pyproject.toml b/pyproject.toml index 228404a..80a8303 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "contractguard" -version = "1.3.1" +version = "2.1.0" description = "ContractGuard security analysis core for VS Code and CI workflows." readme = "README.md" license = {text = "Apache-2.0"} diff --git a/src/contractguard/__init__.py b/src/contractguard/__init__.py index 57b9bbc..432ed75 100644 --- a/src/contractguard/__init__.py +++ b/src/contractguard/__init__.py @@ -1,3 +1,3 @@ """ContractGuard core package.""" -__version__ = "1.3.1" +__version__ = "2.1.0" diff --git a/src/contractguard/analyzers/config_analyzer.py b/src/contractguard/analyzers/config_analyzer.py index 227aa9c..d2752ec 100644 --- a/src/contractguard/analyzers/config_analyzer.py +++ b/src/contractguard/analyzers/config_analyzer.py @@ -13,6 +13,15 @@ import yaml from contractguard.engine import Finding, Severity, load_rules_for_analyzer, run_rules +from contractguard.analyzers.file_filters import is_fixture_path, should_skip_large_file, should_skip_path + +_SKIP_CONFIG_NAMES = { + "package-lock.json", + "package.json", + "pnpm-lock.yaml", + "poetry.lock", + "yarn.lock", +} def extract_facts(content: str, filename: str = "") -> dict[str, Any]: @@ -63,7 +72,8 @@ def 
extract_facts(content: str, filename: str = "") -> dict[str, Any]: facts["insecure_secret_key"] = True facts["dangerous_settings_count"] += 1 - if re.search(r'(?:password|passwd|pwd)\s*[=:]\s*[\'"]?(?:admin|password|123456|root|default|test)[\'"]?', content, re.I): + if re.search(r'(?:password|passwd|pwd)\s*[=:]\s*[\'"]?(?:admin|password|123456|root|default|test)[\'"]?', content, re.I) or \ + re.search(r'\$\{[^}]*?(?:password|passwd|pwd)[^}]*:-(?:admin|password|postgres|root|default|test)', content, re.I): facts["default_password"] = True facts["dangerous_settings_count"] += 1 @@ -71,7 +81,15 @@ def extract_facts(content: str, filename: str = "") -> dict[str, Any]: facts["exposed_admin_port"] = True facts["dangerous_settings_count"] += 1 - if re.search(r'(?:ssl|tls|https)[_\w]*\s*[=:]\s*(?:false|0|off|no|disabled)', content, re.I): + ssl_disabled_lines = [ + line for line in lines + if re.search(r'(?:ssl|tls|https)[_\w]*\s*[=:]\s*(?:false|0|off|no|disabled)', line, re.I) + ] + ssl_disabled_lines = [ + line for line in ssl_disabled_lines + if not ("smtp" in line.lower() and re.search(r'smtp_use_starttls\s*[=:]\s*(?:true|1|yes|on)', content, re.I)) + ] + if ssl_disabled_lines: facts["ssl_disabled"] = True facts["dangerous_settings_count"] += 1 @@ -106,12 +124,16 @@ def load_config_files(path: str | Path) -> list[tuple[str, str]]: if path.is_dir(): for f in sorted(path.rglob("*")): + if should_skip_path(f) or should_skip_large_file(f) or f.name.casefold() in _SKIP_CONFIG_NAMES: + continue if f.is_file() and (f.suffix.lower() in config_exts or f.stem.lower() in config_names): try: files.append((str(f), f.read_text(encoding="utf-8", errors="replace"))) except Exception: continue elif path.is_file(): + if should_skip_path(path) or should_skip_large_file(path) or path.name.casefold() in _SKIP_CONFIG_NAMES: + return files try: files.append((str(path), path.read_text(encoding="utf-8", errors="replace"))) except Exception: @@ -131,6 +153,8 @@ def analyze(path: str | Path, 
rules_dir: str | Path) -> list[Finding]: for f in findings: f.location = source f.context = f"Config file: {Path(source).name}" + if is_fixture_path(source): + f.confidence = "low" all_findings.extend(findings) return all_findings diff --git a/src/contractguard/analyzers/dependency_analyzer.py b/src/contractguard/analyzers/dependency_analyzer.py index 823e346..86fac47 100644 --- a/src/contractguard/analyzers/dependency_analyzer.py +++ b/src/contractguard/analyzers/dependency_analyzer.py @@ -6,11 +6,14 @@ from __future__ import annotations +import json import re +import tomllib from pathlib import Path from typing import Any from contractguard.engine import Finding, Severity, load_rules_for_analyzer, run_rules +from contractguard.analyzers.file_filters import is_fixture_path # Local vulnerability database — curated set of high-profile CVEs # Format: (package, operator, version, cve, severity, description) @@ -26,7 +29,6 @@ ("pillow", "<", "10.0.1", "CVE-2023-44271", "warning", "DoS via uncontrolled resource consumption"), ("jinja2", "<", "3.1.3", "CVE-2024-22195", "critical", "XSS via xmlattr filter"), ("numpy", "<", "1.22.0", "CVE-2021-41496", "warning", "Buffer overflow in array_from_pyobj"), - ("pyyaml", "<", "6.0.1", "CVE-2023-XXXXX", "warning", "Arbitrary code execution via YAML load"), ("sqlparse", "<", "0.4.4", "CVE-2023-30608", "warning", "ReDoS via crafted SQL"), ("aiohttp", "<", "3.9.0", "CVE-2023-49081", "critical", "HTTP request smuggling"), ("fastapi", "<", "0.109.0", "CVE-2024-24762", "warning", "DoS via multipart form data"), @@ -36,14 +38,17 @@ ("setuptools", "<", "65.5.1", "CVE-2022-40897", "warning", "ReDoS in package_index"), ("pip", "<", "23.3", "CVE-2023-5752", "info", "Dependency confusion via --extra-index-url"), ("starlette", "<", "0.36.2", "CVE-2024-24762", "warning", "DoS via multipart body"), - ("pydantic", "<", "1.10.13", "CVE-2024-XXXXX", "info", "Information disclosure via error messages"), ("twisted", "<", "23.10.0", "CVE-2023-46137", 
"critical", "HTTP request smuggling"), - ("scrapy", "<", "2.11.0", "CVE-2023-XXXXX", "warning", "Cookie leak to third-party domains"), ("ansible", "<", "8.5.0", "CVE-2023-5764", "critical", "Template injection in tasks"), ("gunicorn", "<", "22.0.0", "CVE-2024-1135", "critical", "HTTP request smuggling via transfer-encoding"), - ("transformers", "<", "4.36.0", "CVE-2023-XXXXX", "critical", "Arbitrary code execution in model loading"), ("lxml", "<", "4.9.3", "CVE-2022-2309", "warning", "NULL pointer dereference"), - ("black", "<", "24.1.0", "CVE-2024-XXXXX", "info", "Jupyter notebook parsing issue"), +] + +_NPM_VULN_DB: list[tuple[str, str, str, str, str, str]] = [ + ("lodash", "<", "4.17.21", "CVE-2021-23337", "critical", "Command injection via template"), + ("minimist", "<", "1.2.6", "CVE-2021-44906", "critical", "Prototype pollution"), + ("follow-redirects", "<", "1.15.6", "CVE-2024-28849", "warning", "Authorization header leak"), + ("semver", "<", "7.5.2", "CVE-2022-25883", "warning", "Regular expression denial of service"), ] @@ -73,9 +78,12 @@ def _version_matches(installed: str, op: str, vuln_version: str) -> bool: return False -def extract_facts_from_requirements(content: str) -> dict[str, Any]: - """Parse requirements.txt and check against vulnerability database.""" - facts: dict[str, Any] = { +def _normalize_package_name(name: str) -> str: + return name.lower().replace("-", "_").replace(".", "_") + + +def _empty_facts() -> dict[str, Any]: + return { "vulnerable_count": 0, "total_packages": 0, "unpinned_count": 0, @@ -85,6 +93,43 @@ def extract_facts_from_requirements(content: str) -> dict[str, Any]: "critical_vuln_count": 0, } + +def _record_unpinned(facts: dict[str, Any]) -> None: + facts["unpinned_count"] += 1 + facts["has_unpinned_packages"] = True + + +def _record_vulnerability( + facts: dict[str, Any], + package: str, + version: str, + advisory: str, + severity: str, + description: str, +) -> None: + facts["vulnerable_count"] += 1 + 
facts["has_vulnerable_packages"] = True + facts["vulnerabilities"].append((package, version, advisory, severity, description)) + if severity == "critical": + facts["critical_vuln_count"] += 1 + + +def _check_vulnerability_db( + facts: dict[str, Any], + package_name: str, + package_version: str, + db: list[tuple[str, str, str, str, str, str]], +) -> None: + normalized = _normalize_package_name(package_name) + for vuln_pkg, op, vuln_ver, advisory, severity, desc in db: + if normalized == _normalize_package_name(vuln_pkg) and _version_matches(package_version, op, vuln_ver): + _record_vulnerability(facts, package_name, package_version, advisory, severity, desc) + + +def extract_facts_from_requirements(content: str) -> dict[str, Any]: + """Parse requirements.txt and check against vulnerability database.""" + facts = _empty_facts() + for line in content.splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or stripped.startswith("-"): @@ -92,41 +137,132 @@ def extract_facts_from_requirements(content: str) -> dict[str, Any]: facts["total_packages"] += 1 - # Parse package==version, package>=version, package~=version, or just package - m = re.match(r"^([a-zA-Z0-9_.-]+)\s*(?:[>=<~!=]+\s*(\S+))?", stripped) + # Parse package==version, package>=version, package~=version, or just package. 
+ m = re.match(r"^([a-zA-Z0-9_.-]+)\s*(?:(==|>=|<=|~=|!=|>|<)\s*(\S+))?", stripped) if not m: continue - pkg_name = m.group(1).lower().replace("-", "_").replace(".", "_") - pkg_version = m.group(2) + pkg_name = m.group(1) + operator = m.group(2) + pkg_version = m.group(3) - if not pkg_version: - facts["unpinned_count"] += 1 - facts["has_unpinned_packages"] = True + if not pkg_version or operator != "==": + _record_unpinned(facts) continue - for vuln_pkg, op, vuln_ver, cve, sev, desc in _VULN_DB: - vuln_pkg_normalized = vuln_pkg.lower().replace("-", "_").replace(".", "_") - if pkg_name == vuln_pkg_normalized: - if _version_matches(pkg_version, op, vuln_ver): - facts["vulnerable_count"] += 1 - facts["has_vulnerable_packages"] = True - facts["vulnerabilities"].append((m.group(1), pkg_version, cve, sev, desc)) - if sev == "critical": - facts["critical_vuln_count"] += 1 + _check_vulnerability_db(facts, pkg_name, pkg_version, _VULN_DB) + + return facts + + +def extract_facts_from_pyproject(content: str) -> dict[str, Any]: + facts = _empty_facts() + try: + data = tomllib.loads(content) + except tomllib.TOMLDecodeError: + return facts + + dependencies: list[str] = [] + project = data.get("project", {}) + if isinstance(project, dict): + project_deps = project.get("dependencies", []) + if isinstance(project_deps, list): + dependencies.extend(str(item) for item in project_deps) + optional = project.get("optional-dependencies", {}) + if isinstance(optional, dict): + for items in optional.values(): + if isinstance(items, list): + dependencies.extend(str(item) for item in items) + + poetry = data.get("tool", {}).get("poetry", {}) if isinstance(data.get("tool"), dict) else {} + poetry_deps = poetry.get("dependencies", {}) if isinstance(poetry, dict) else {} + if isinstance(poetry_deps, dict): + for name, version in poetry_deps.items(): + if name.lower() != "python": + dependencies.append(f"{name}=={version}" if isinstance(version, str) else name) + + return 
extract_facts_from_requirements("\n".join(dependencies)) + + +def extract_facts_from_package_json(content: str, locked: bool = False) -> dict[str, Any]: + facts = _empty_facts() + try: + data = json.loads(content) + except json.JSONDecodeError: + return facts + + packages: dict[str, str] = {} + if locked and isinstance(data.get("packages"), dict): + for package_path, meta in data["packages"].items(): + if not package_path.startswith("node_modules/") or not isinstance(meta, dict): + continue + version = meta.get("version") + if isinstance(version, str): + packages[package_path.removeprefix("node_modules/")] = version + else: + for section in ("dependencies", "devDependencies", "optionalDependencies"): + deps = data.get(section, {}) + if isinstance(deps, dict): + for name, version in deps.items(): + if isinstance(version, str): + packages[name] = version + + for package, raw_version in packages.items(): + facts["total_packages"] += 1 + pinned = locked or re.match(r"^\d+(?:\.\d+){1,3}", raw_version.strip()) is not None + clean_version = re.sub(r"^[~^=v<> ]+", "", raw_version.strip()) + if not pinned: + _record_unpinned(facts) + continue + _check_vulnerability_db(facts, package, clean_version, _NPM_VULN_DB) return facts +def _merge_facts(items: list[dict[str, Any]]) -> dict[str, Any]: + merged = _empty_facts() + for facts in items: + merged["vulnerable_count"] += facts["vulnerable_count"] + merged["total_packages"] += facts["total_packages"] + merged["unpinned_count"] += facts["unpinned_count"] + merged["critical_vuln_count"] += facts["critical_vuln_count"] + merged["vulnerabilities"].extend(facts["vulnerabilities"]) + merged["has_vulnerable_packages"] = merged["vulnerable_count"] > 0 + merged["has_unpinned_packages"] = merged["unpinned_count"] > 0 + return merged + + +def extract_facts_from_dependency_file(filename: str, content: str) -> dict[str, Any]: + name = Path(filename).name.lower() + if name == "pyproject.toml": + return extract_facts_from_pyproject(content) + 
if name == "package-lock.json": + return extract_facts_from_package_json(content, locked=True) + if name == "package.json": + return extract_facts_from_package_json(content, locked=False) + return extract_facts_from_requirements(content) + + def load_dependency_files(path: str | Path) -> list[tuple[str, str]]: """Load dependency files.""" path = Path(path) files: list[tuple[str, str]] = [] - dep_names = {"requirements.txt", "requirements-dev.txt", "requirements_dev.txt", - "requirements.in", "constraints.txt"} + dep_names = { + "constraints.txt", + "package-lock.json", + "package.json", + "pyproject.toml", + "requirements-dev.txt", + "requirements_dev.txt", + "requirements.in", + "requirements.txt", + } if path.is_dir(): + has_npm_lock = (path / "package-lock.json").exists() for name in sorted(dep_names): + if name == "package.json" and has_npm_lock: + continue f = path / name if f.exists(): files.append((str(f), f.read_text(encoding="utf-8", errors="replace"))) @@ -146,10 +282,12 @@ def analyze(path: str | Path, rules_dir: str | Path) -> list[Finding]: all_findings: list[Finding] = [] for source, content in files: - facts = extract_facts_from_requirements(content) + facts = extract_facts_from_dependency_file(source, content) findings = run_rules(facts, rules) for f in findings: f.location = source + if is_fixture_path(source): + f.confidence = "low" all_findings.extend(findings) # Direct findings for each vulnerability @@ -168,8 +306,9 @@ def analyze(path: str | Path, rules_dir: str | Path) -> list[Finding]: suggestion=f"Upgrade {pkg} to the latest patched version. 
Run: pip install --upgrade {pkg}", location=source, context=f"{pkg}=={ver}", - attack_vector=f"Exploiting {cve} in {pkg} {ver} — {desc}", + attack_vector=f"Exploiting {cve} in {pkg} {ver} - {desc}", cwe="CWE-1035", + confidence="low" if is_fixture_path(source) else "high", ) all_findings.append(finding) diff --git a/src/contractguard/analyzers/file_filters.py b/src/contractguard/analyzers/file_filters.py index a60caeb..afe28c2 100644 --- a/src/contractguard/analyzers/file_filters.py +++ b/src/contractguard/analyzers/file_filters.py @@ -14,6 +14,15 @@ "dist-vsix", "build", "out", + ".next", + ".nuxt", + ".turbo", + ".cache", + "coverage", + "htmlcov", + "target", + "vendor", + "site-packages", ".pytest_cache", "__pycache__", ".mypy_cache", @@ -22,6 +31,72 @@ _NORMALIZED_SKIP_DIRS = {part.casefold() for part in _SKIP_DIRS} +_FIXTURE_DIRS = { + "__tests__", + "docs", + "doc", + "example", + "examples", + "fixture", + "fixtures", + "sample", + "samples", + "spec", + "test", + "tests", +} + +_SOURCE_EXTENSIONS = { + ".c", + ".cc", + ".cpp", + ".cs", + ".go", + ".java", + ".js", + ".jsx", + ".kt", + ".mjs", + ".php", + ".py", + ".rb", + ".rs", + ".swift", + ".ts", + ".tsx", +} + +_DATA_EXTENSIONS = { + ".csv", + ".env", + ".ini", + ".json", + ".jsonl", + ".properties", + ".toml", + ".tsv", + ".xml", + ".yaml", + ".yml", +} + +_DOCUMENTATION_EXTENSIONS = { + ".adoc", + ".md", + ".mdx", + ".rst", +} + +_INLINE_IGNORE_MARKERS = { + "contractguard:ignore", + "contractguard-ignore", + "gitleaks:allow", + "nosec", + "pragma: allowlist secret", +} + +DEFAULT_MAX_TEXT_FILE_BYTES = 1_000_000 + def should_skip_path(path: Path) -> bool: parts = path.parts @@ -30,3 +105,43 @@ def should_skip_path(path: Path) -> bool: elif path.suffix: parts = path.parent.parts return any(part.casefold() in _NORMALIZED_SKIP_DIRS for part in parts) + + +def should_skip_large_file(path: Path, max_bytes: int = DEFAULT_MAX_TEXT_FILE_BYTES) -> bool: + try: + return path.is_file() and path.stat().st_size > 
max_bytes + except OSError: + return True + + +def is_fixture_path(path: str | Path) -> bool: + """Return true for docs/tests/samples where fixture-looking data is common.""" + file_path = Path(path) + normalized = {part.casefold() for part in file_path.parts} + name = file_path.name.casefold() + return any(part in _FIXTURE_DIRS for part in normalized) or any( + token in name for token in ("example", "fixture", "sample", "template") + ) + + +def is_source_file(path: str | Path) -> bool: + return Path(path).suffix.casefold() in _SOURCE_EXTENSIONS + + +def is_data_file(path: str | Path) -> bool: + file_path = Path(path) + return file_path.suffix.casefold() in _DATA_EXTENSIONS or file_path.name.casefold() == ".env" + + +def is_documentation_file(path: str | Path) -> bool: + return Path(path).suffix.casefold() in _DOCUMENTATION_EXTENSIONS + + +def has_inline_ignore(line: str) -> bool: + lowered = line.casefold() + return any(marker in lowered for marker in _INLINE_IGNORE_MARKERS) + + +def confidence_allowed(confidence: str, minimum: str) -> bool: + order = {"low": 0, "medium": 1, "high": 2} + return order.get(confidence, 2) >= order.get(minimum, 1) diff --git a/src/contractguard/analyzers/json_analyzer.py b/src/contractguard/analyzers/json_analyzer.py index a1b1ca4..9507a0d 100644 --- a/src/contractguard/analyzers/json_analyzer.py +++ b/src/contractguard/analyzers/json_analyzer.py @@ -12,6 +12,16 @@ from typing import Any from contractguard.engine import Finding, Rule, load_rules_for_analyzer, run_rules +from contractguard.analyzers.file_filters import should_skip_path + +_SKIP_JSON_NAMES = { + "composer.lock", + "package.json", + "package-lock.json", + "pnpm-lock.yaml", + "tsconfig.json", + "tsconfig.build.json", +} def _type_label(val: Any) -> str: @@ -111,8 +121,12 @@ def load_json_samples(path: str | Path) -> list[dict]: if path.is_dir(): for f in sorted(path.glob("*.json")): + if f.name.casefold() in _SKIP_JSON_NAMES or should_skip_path(f): + continue 
objects.extend(_load_single(f)) else: + if path.name.casefold() in _SKIP_JSON_NAMES or should_skip_path(path): + return [] objects.extend(_load_single(path)) return objects diff --git a/src/contractguard/analyzers/pii_analyzer.py b/src/contractguard/analyzers/pii_analyzer.py index ec22b87..b06c163 100644 --- a/src/contractguard/analyzers/pii_analyzer.py +++ b/src/contractguard/analyzers/pii_analyzer.py @@ -10,13 +10,21 @@ import ipaddress import json -import re import os +import re from pathlib import Path from typing import Any from contractguard.engine import Finding, Severity, load_rules_for_analyzer, run_rules -from contractguard.analyzers.file_filters import should_skip_path +from contractguard.analyzers.file_filters import ( + has_inline_ignore, + is_data_file, + is_documentation_file, + is_fixture_path, + is_source_file, + should_skip_large_file, + should_skip_path, +) _PII_PATTERNS: list[tuple[str, re.Pattern, str]] = [ ("ssn", re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "Social Security Number"), @@ -48,6 +56,106 @@ "ip", "ip_address", } +_PHONE_CONTEXT = {"phone", "phone_number", "mobile", "cell", "tel", "telephone", "contact"} +_BIRTH_CONTEXT = {"dob", "date_of_birth", "birthday", "birth_date", "born"} +_IP_CONTEXT = { + "client_ip", + "ip_address", + "remote_addr", + "remote_ip", + "source_ip", + "user_ip", + "visitor_ip", + "x-forwarded-for", +} +_PASSPORT_CONTEXT = {"passport", "passport_number"} +_LICENSE_CONTEXT = {"drivers_license", "driver_license", "license", "dl_number"} +_SKIP_FILENAMES = { + "package-lock.json", + "package.json", + "pnpm-lock.yaml", + "poetry.lock", + "pyproject.toml", + "yarn.lock", +} + + +def _line_has_any_context(line: str, terms: set[str]) -> bool: + lowered = line.casefold() + return any(re.search(rf"\b{re.escape(term)}\b", lowered) for term in terms) + + +def _digits_only(value: str) -> str: + return re.sub(r"\D", "", value) + + +def _passes_luhn(value: str) -> bool: + digits = [int(ch) for ch in _digits_only(value)] + if 
len(digits) < 13: + return False + checksum = 0 + parity = len(digits) % 2 + for index, digit in enumerate(digits): + if index % 2 == parity: + digit *= 2 + if digit > 9: + digit -= 9 + checksum += digit + return checksum % 10 == 0 + + +def _is_decimal_fragment(line: str, start: int, end: int) -> bool: + before = line[start - 1] if start > 0 else "" + after = line[end] if end < len(line) else "" + return before == "." or after == "." + + +def _should_accept_pii_match(pii_name: str, match: re.Match[str], line: str, filename: str) -> bool: + if has_inline_ignore(line): + return False + + matched = match.group(0) + if pii_name.startswith("credit_card"): + return not _is_decimal_fragment(line, match.start(), match.end()) and _passes_luhn(matched) + + if pii_name == "date_of_birth": + return _line_has_any_context(line, _BIRTH_CONTEXT) + + if pii_name.startswith("phone"): + return _line_has_any_context(line, _PHONE_CONTEXT) + + if pii_name == "ip_address": + if _is_non_personal_ip(matched): + return False + if not _line_has_any_context(line, _IP_CONTEXT | {" ip ", "ip"}): + return False + lowered = line.casefold() + if any(token in lowered for token in ("oid", "threat", "tor", "indicator", "urlhaus")): + return False + return True + + if pii_name == "passport": + return _line_has_any_context(line, _PASSPORT_CONTEXT) + + if pii_name == "drivers_license": + return _line_has_any_context(line, _LICENSE_CONTEXT) + + return True + + +def _confidence_for_pii(pii_name: str, line: str, filename: str) -> str: + if is_fixture_path(filename): + return "low" + if pii_name == "email_address" and (is_source_file(filename) or is_documentation_file(filename)): + return "low" + if pii_name in {"ssn", "credit_card_visa", "credit_card_mc", "credit_card_amex", "medical_record"}: + return "high" + if is_source_file(filename) and not is_data_file(filename): + return "medium" + if pii_name in {"ip_address", "phone_us", "phone_intl", "date_of_birth"}: + return "medium" + return "high" + def 
extract_facts(content: str, filename: str = "") -> dict[str, Any]: """Scan content for PII patterns.""" @@ -62,21 +170,24 @@ def extract_facts(content: str, filename: str = "") -> dict[str, Any]: "has_passport": False, "has_medical_record": False, "pii_field_names_count": 0, - "pii_items": [], # list of (type, line, preview) + "pii_items": [], # list of (pii_name, line_num, preview, description) + "pii_details": [], # list of (type, line, preview, description, confidence) } for line_num, line in enumerate(content.splitlines(), 1): for pii_name, regex, desc in _PII_PATTERNS: for match in regex.finditer(line): matched = match.group(0) - if pii_name == "ip_address" and _is_non_personal_ip(matched): + if not _should_accept_pii_match(pii_name, match, line, filename): continue facts["pii_count"] += 1 if len(matched) > 8: preview = matched[:3] + "***" + matched[-2:] else: preview = "***" + confidence = _confidence_for_pii(pii_name, line, filename) facts["pii_items"].append((pii_name, line_num, preview, desc)) + facts["pii_details"].append((pii_name, line_num, preview, desc, confidence)) if "ssn" in pii_name: facts["has_ssn"] = True @@ -95,10 +206,11 @@ def extract_facts(content: str, filename: str = "") -> dict[str, Any]: if "medical_record" in pii_name: facts["has_medical_record"] = True - lower_content = content.lower() - for field_name in _PII_FIELD_NAMES: - if f'"{field_name}"' in lower_content or f"'{field_name}'" in lower_content: - facts["pii_field_names_count"] += 1 + if not filename or not is_source_file(filename): + lower_content = content.lower() + for field_name in _PII_FIELD_NAMES: + if f'"{field_name}"' in lower_content or f"'{field_name}'" in lower_content: + facts["pii_field_names_count"] += 1 return facts @@ -107,7 +219,11 @@ def load_files(path: str | Path) -> list[tuple[str, str]]: """Load text files for PII scanning.""" path = Path(path) files: list[tuple[str, str]] = [] - _skip = {".pyc", ".exe", ".dll", ".png", ".jpg", ".gif", ".zip", ".tar", ".gz"} 
+ _skip = { + ".bmp", ".class", ".dll", ".doc", ".docx", ".dylib", ".eot", ".exe", + ".gif", ".gz", ".ico", ".jpg", ".jpeg", ".pdf", ".png", ".pyc", ".pyo", + ".rar", ".so", ".svg", ".tar", ".ttf", ".woff", ".woff2", ".zip", + } if path.is_dir(): for root, dirnames, filenames in os.walk(path): @@ -117,14 +233,25 @@ def load_files(path: str | Path) -> list[tuple[str, str]]: ] for name in sorted(filenames): file_path = root_path / name - if file_path.suffix.lower() in _skip or should_skip_path(file_path): + if ( + file_path.suffix.lower() in _skip + or file_path.name.casefold() in _SKIP_FILENAMES + or file_path.name.casefold().endswith("_rules.yaml") + or should_skip_large_file(file_path) + or should_skip_path(file_path) + ): continue try: files.append((str(file_path), file_path.read_text(encoding="utf-8", errors="replace"))) except Exception: continue elif path.is_file(): - if should_skip_path(path): + if ( + path.name.casefold() in _SKIP_FILENAMES + or path.name.casefold().endswith("_rules.yaml") + or should_skip_large_file(path) + or should_skip_path(path) + ): return files try: files.append((str(path), path.read_text(encoding="utf-8", errors="replace"))) @@ -157,23 +284,31 @@ def analyze(path: str | Path, rules_dir: str | Path) -> list[Finding]: for source, content in files: facts = extract_facts(content, source) findings = run_rules(facts, rules) - for f in findings: - f.location = source + for finding in findings: + finding.location = source + if is_fixture_path(source): + finding.confidence = "low" + if facts["pii_details"]: + findings = [item for item in findings if item.rule_id == "PII004"] all_findings.extend(findings) - # Direct findings for each PII match - for pii_name, line_num, preview, desc in facts["pii_items"]: + # Direct findings are line-specific, so suppress broad duplicates above. 
+ for pii_name, line_num, preview, desc, confidence in facts["pii_details"]: + severity = Severity.CRITICAL + if pii_name in {"email_address", "ip_address", "phone_us", "phone_intl", "date_of_birth"}: + severity = Severity.WARNING finding = Finding( rule_id=f"PII-{pii_name.upper()[:8]}", rule_name=pii_name, - severity=Severity.CRITICAL, + severity=severity, description=f"Detected {desc} in data.", explanation=f"Line {line_num}: matched {pii_name} pattern", suggestion="Remove PII from source/data files. Use tokenization, encryption, or a PII vault.", location=f"{source}:{line_num}", context=preview, - attack_vector=f"Data breach exposes {desc} → identity theft, regulatory fines (GDPR/CCPA/HIPAA)", + attack_vector=f"Data breach exposes {desc} -> identity theft, regulatory fines (GDPR/CCPA/HIPAA)", cwe="CWE-359", + confidence=confidence, ) all_findings.append(finding) diff --git a/src/contractguard/analyzers/secrets_analyzer.py b/src/contractguard/analyzers/secrets_analyzer.py index bb54d47..5239ec2 100644 --- a/src/contractguard/analyzers/secrets_analyzer.py +++ b/src/contractguard/analyzers/secrets_analyzer.py @@ -6,13 +6,20 @@ from __future__ import annotations -import re import os +import re from pathlib import Path from typing import Any from contractguard.engine import Finding, Severity, load_rules_for_analyzer, run_rules -from contractguard.analyzers.file_filters import should_skip_path +from contractguard.analyzers.file_filters import ( + has_inline_ignore, + is_documentation_file, + is_fixture_path, + is_source_file, + should_skip_large_file, + should_skip_path, +) _SECRET_PATTERNS: list[tuple[str, re.Pattern, str]] = [ ("aws_access_key", re.compile(r"(?:^|[^A-Za-z0-9/+=])(?:AKIA[0-9A-Z]{16})(?:[^A-Za-z0-9/+=]|$)"), "block"), @@ -21,6 +28,7 @@ ("github_fine_grained", re.compile(r"github_pat_[A-Za-z0-9_]{22,255}"), "block"), ("generic_api_key", re.compile(r"(?:api[_-]?key|apikey|api[_-]?secret)\s*[=:]\s*['\"]?([A-Za-z0-9_\-]{20,})['\"]?", re.I), "critical"), 
("generic_secret", re.compile(r"(?:secret|token|password|passwd|pwd)\s*[=:]\s*['\"]?([^\s'\"]{8,})['\"]?", re.I), "critical"), + ("smtp_app_password", re.compile(r"(?:smtp[_-]?password|gmail[_-]?app[_-]?password)\s*[=:]\s*['\"]?([a-z]{4}\s+[a-z]{4}\s+[a-z]{4}\s+[a-z]{4})['\"]?", re.I), "critical"), ("private_key", re.compile(r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----"), "block"), ("stripe_key", re.compile(r"(?:sk|pk)_(?:live|test)_[A-Za-z0-9]{20,}"), "block"), ("slack_token", re.compile(r"xox[baprs]-[A-Za-z0-9-]{10,}"), "block"), @@ -48,6 +56,121 @@ ".pdf", ".doc", ".docx", } +_PLACEHOLDER_TOKENS = { + "changeme", + "change_me", + "change-me", + "dummy", + "example", + "fake", + "not_a_real", + "not-real", + "placeholder", + "replace", + "sample", + "test", + "todo", + "value", + "your-", + "your_", +} + +_CODE_VALUE_PREFIXES = ( + "await ", + "function", + "header", + "process.", + "request.", + "this.", + "verify", +) + + +def _extract_match_value(match: re.Match[str]) -> str: + if match.lastindex: + for index in range(match.lastindex, 0, -1): + value = match.group(index) + if value: + return value.strip().strip("'\"") + return match.group(0).strip().strip("'\"") + + +def _looks_like_pattern_definition(line: str) -> bool: + lowered = line.casefold() + return "re.compile" in lowered or "regexp" in lowered or "regex" in lowered or "_patterns" in lowered + + +def _looks_like_code_expression(value: str) -> bool: + lowered = value.strip().casefold() + if lowered.startswith(_CODE_VALUE_PREFIXES): + return True + if "(" in value or ")" in value or "=>" in value or "${" in value or "}" in value: + return True + if "?." 
in value or value.endswith(".") or value.endswith(","): + return True + return False + + +def _looks_like_placeholder(value: str) -> bool: + lowered = value.casefold() + if any(token in lowered for token in _PLACEHOLDER_TOKENS): + return True + compact = re.sub(r"[^a-z0-9]", "", lowered) + if compact in {"admin", "admin123", "password", "password123", "secret", "token"}: + return True + if len(set(compact)) <= 2 and len(compact) >= 8: + return True + return False + + +def _is_plausible_secret(pattern_name: str, match: re.Match[str], line: str, filename: str) -> bool: + if has_inline_ignore(line): + return False + if pattern_name == "heroku_api_key" and not re.search(r"\b(?:heroku|api[_-]?key|token|credential)\b", line, re.I): + return False + if pattern_name in {"private_key", "ssh_private_key"} and _looks_like_pattern_definition(line): + return False + if pattern_name in {"generic_api_key", "generic_secret", "env_secret", "high_entropy_assignment"}: + value = _extract_match_value(match) + if _looks_like_code_expression(value): + return False + if is_source_file(filename) and not _looks_like_placeholder(value): + # Source assignments often reference variables/functions rather than literals. 
+ if not re.search(r"['\"][A-Za-z0-9_./+=:-]{16,}['\"]", line): + return False + return True + + +def _confidence_for_secret(pattern_name: str, match: re.Match[str], line: str, filename: str) -> str: + if pattern_name in { + "aws_access_key", + "aws_secret_key", + "database_url", + "github_fine_grained", + "github_token", + "gcp_api_key", + "npm_token", + "private_key", + "sendgrid_key", + "slack_token", + "slack_webhook", + "smtp_app_password", + "ssh_private_key", + "stripe_key", + }: + base = "high" + else: + base = "medium" + + value = _extract_match_value(match) + if is_fixture_path(filename): + return "low" + if is_documentation_file(filename) and _looks_like_placeholder(value): + return "low" + if _looks_like_placeholder(value): + return "low" + return base + def extract_facts(content: str, filename: str = "") -> dict[str, Any]: """Scan text content for secret patterns and build facts dict.""" @@ -61,6 +184,7 @@ def extract_facts(content: str, filename: str = "") -> dict[str, Any]: "has_generic_secret": False, "has_high_entropy": False, "secrets_found": [], # list of (pattern_name, line_num, matched_text_preview) + "secret_items": [], # list of (pattern_name, line_num, preview, confidence) "max_severity": "info", } @@ -75,15 +199,17 @@ def extract_facts(content: str, filename: str = "") -> dict[str, Any]: for pattern_name, regex, sev_hint in _SECRET_PATTERNS: match = regex.search(line) - if match: + if match and _is_plausible_secret(pattern_name, match, line, filename): facts["secret_count"] += 1 - # Redact the match for safety — show only first/last 4 chars - matched = match.group(0) + # Redact the match for safety - show only first/last 4 chars + matched = _extract_match_value(match) if len(matched) > 12: preview = matched[:4] + "****" + matched[-4:] else: preview = "****" + confidence = _confidence_for_secret(pattern_name, match, line, filename) facts["secrets_found"].append((pattern_name, line_num, preview)) + facts["secret_items"].append((pattern_name, 
line_num, preview, confidence)) if "aws" in pattern_name: facts["has_aws_key"] = True @@ -123,7 +249,11 @@ def load_files(path: str | Path) -> list[tuple[str, str]]: ] for name in sorted(filenames): file_path = root_path / name - if file_path.suffix.lower() in _SKIP_EXTENSIONS or should_skip_path(file_path): + if ( + file_path.suffix.lower() in _SKIP_EXTENSIONS + or should_skip_large_file(file_path) + or should_skip_path(file_path) + ): continue try: content = file_path.read_text(encoding="utf-8", errors="replace") @@ -131,7 +261,7 @@ def load_files(path: str | Path) -> list[tuple[str, str]]: except Exception: continue elif path.is_file(): - if should_skip_path(path): + if should_skip_large_file(path) or should_skip_path(path): return files try: content = path.read_text(encoding="utf-8", errors="replace") @@ -154,9 +284,13 @@ def analyze(path: str | Path, rules_dir: str | Path) -> list[Finding]: findings = run_rules(facts, rules) for f in findings: f.location = source + if is_fixture_path(source) or is_documentation_file(source): + f.confidence = "low" + + all_findings.extend(findings) # Also generate direct findings for each secret found (bypass rule engine) - for pattern_name, line_num, preview in facts["secrets_found"]: + for pattern_name, line_num, preview, confidence in facts["secret_items"]: sev_map = {n: s for n, _, s in _SECRET_PATTERNS for n2 in [n] if n2 == pattern_name} sev_str = sev_map.get(pattern_name, "critical") try: @@ -175,9 +309,8 @@ def analyze(path: str | Path, rules_dir: str | Path) -> list[Finding]: context=preview, attack_vector=f"Attacker clones repo → extracts {pattern_name.replace('_', ' ')} → gains unauthorized access", cwe="CWE-798", + confidence=confidence, ) all_findings.append(finding) - all_findings.extend(findings) - return all_findings diff --git a/src/contractguard/bridge.py b/src/contractguard/bridge.py index 6b1d276..b5c9795 100644 --- a/src/contractguard/bridge.py +++ b/src/contractguard/bridge.py @@ -20,10 +20,19 @@ def scan( 
analyzer: str = typer.Option("all", "--analyzer", help="Analyzer id or 'all'."), rules_dir: Path | None = typer.Option(None, "--rules-dir", help="Override rules directory."), db_path: str | None = typer.Option(None, "--db", help="SQLite database used for SQL EXPLAIN mode."), + min_confidence: str = typer.Option("medium", "--min-confidence", help="Minimum confidence: low, medium, high."), + include_fixtures: bool = typer.Option(False, "--include-fixtures", help="Include findings from tests/samples/docs."), include_sarif: bool = typer.Option(False, "--include-sarif", help="Include SARIF payload in the response."), ) -> None: result = scan_target( - ScanTarget(path=path, analyzer=analyzer, rules_dir=rules_dir, db_path=db_path), + ScanTarget( + path=path, + analyzer=analyzer, + rules_dir=rules_dir, + db_path=db_path, + min_confidence=min_confidence, + include_fixtures=include_fixtures, + ), include_sarif=include_sarif, ) typer.echo(json.dumps(result.to_dict(), indent=2)) @@ -35,8 +44,19 @@ def findings( analyzer: str = typer.Option("all", "--analyzer", help="Analyzer id or 'all'."), rules_dir: Path | None = typer.Option(None, "--rules-dir", help="Override rules directory."), db_path: str | None = typer.Option(None, "--db", help="SQLite database used for SQL EXPLAIN mode."), + min_confidence: str = typer.Option("medium", "--min-confidence", help="Minimum confidence: low, medium, high."), + include_fixtures: bool = typer.Option(False, "--include-fixtures", help="Include findings from tests/samples/docs."), ) -> None: - result = scan_target(ScanTarget(path=path, analyzer=analyzer, rules_dir=rules_dir, db_path=db_path)) + result = scan_target( + ScanTarget( + path=path, + analyzer=analyzer, + rules_dir=rules_dir, + db_path=db_path, + min_confidence=min_confidence, + include_fixtures=include_fixtures, + ) + ) typer.echo(findings_to_json(result.findings)) diff --git a/src/contractguard/cli.py b/src/contractguard/cli.py index efad9bc..4c22dc0 100644 --- 
a/src/contractguard/cli.py +++ b/src/contractguard/cli.py @@ -117,6 +117,8 @@ def analyze( report_json: Optional[Path] = typer.Option(None, "--report-json", help="Write JSON report"), report_sarif: Optional[Path] = typer.Option(None, "--report-sarif", help="Write SARIF report"), db: Optional[str] = typer.Option(None, "--db", help="SQLite DB path for EXPLAIN mode (sql only)"), + min_confidence: str = typer.Option("medium", "--min-confidence", help="Minimum confidence: low, medium, high"), + include_fixtures: bool = typer.Option(False, "--include-fixtures", help="Include findings from tests/samples/docs"), ci: bool = typer.Option(False, "--ci", help="CI mode: exit code 2 on critical or block findings"), show_score: bool = typer.Option(False, "--score", help="Show security grade after analysis"), record: bool = typer.Option(False, "--record", help="Record scan to history database"), @@ -135,7 +137,14 @@ def analyze( console.print(f"[red]Error:[/red] Unknown type '{type}'. Use: {', '.join(_ANALYZER_TYPES)}") raise typer.Exit(1) - findings = run_scan(path=path, analyzer=type, rules_dir=rules_path, db_path=db) + findings = run_scan( + path=path, + analyzer=type, + rules_dir=rules_path, + db_path=db, + min_confidence=min_confidence, + include_fixtures=include_fixtures, + ) ci_fail = _print_findings(findings, ci_mode=ci) if show_score or type == "all": @@ -173,6 +182,8 @@ def analyze( def score( path: Path = typer.Option(".", "--path", "-p", help="Project root to scan"), rules_dir: Optional[Path] = typer.Option(None, "--rules-dir", "-r"), + min_confidence: str = typer.Option("medium", "--min-confidence", help="Minimum confidence: low, medium, high"), + include_fixtures: bool = typer.Option(False, "--include-fixtures", help="Include findings from tests/samples/docs"), ) -> None: try: rules_path = resolve_rules_dir(rules_dir) @@ -185,7 +196,13 @@ def score( raise typer.Exit(1) console.print("[bold]Running full security scan...[/bold]") - findings = run_scan(path=path, 
analyzer="all", rules_dir=rules_path) + findings = run_scan( + path=path, + analyzer="all", + rules_dir=rules_path, + min_confidence=min_confidence, + include_fixtures=include_fixtures, + ) _print_score(findings) @@ -236,6 +253,7 @@ def watch( path: Path = typer.Option(".", "--path", "-p", help="Directory to watch"), type: str = typer.Option("all", "--type", "-t", help="Analyzer type to run"), rules_dir: Optional[Path] = typer.Option(None, "--rules-dir", "-r"), + min_confidence: str = typer.Option("medium", "--min-confidence", help="Minimum confidence: low, medium, high"), interval: int = typer.Option(3, "--interval", help="Seconds between scans"), ) -> None: try: @@ -272,7 +290,7 @@ def get_mtimes() -> dict[str, float]: if changed or new_files: console.print(f"\n[yellow]Change detected ({len(changed | new_files)} file(s)). Re-scanning...[/yellow]") - findings = run_scan(path=path, analyzer=type, rules_dir=rules_path) + findings = run_scan(path=path, analyzer=type, rules_dir=rules_path, min_confidence=min_confidence) _print_findings(findings) _print_score(findings) last_mtimes = current diff --git a/src/contractguard/reporter.py b/src/contractguard/reporter.py index 34b4ed3..3016161 100644 --- a/src/contractguard/reporter.py +++ b/src/contractguard/reporter.py @@ -7,6 +7,7 @@ from jinja2 import BaseLoader, Environment +from contractguard import __version__ from contractguard.engine import Finding, Severity from contractguard.scorer import compute_score @@ -211,7 +212,7 @@ def render_sarif_report( "tool": { "driver": { "name": "ContractGuard", - "version": "1.2.0", + "version": __version__, "informationUri": "https://github.com/Blackplane-Systems/contractguard", "rules": rules, } diff --git a/src/contractguard/scan.py b/src/contractguard/scan.py index b2c5250..3ab5604 100644 --- a/src/contractguard/scan.py +++ b/src/contractguard/scan.py @@ -2,12 +2,14 @@ import importlib import json +import datetime from dataclasses import asdict, dataclass from pathlib import Path 
from typing import Any, Callable from contractguard import __version__ from contractguard.engine import Finding, Severity +from contractguard.analyzers.file_filters import confidence_allowed, is_fixture_path from contractguard.reporter import render_sarif_report from contractguard.scorer import SecurityScore, compute_score @@ -33,6 +35,8 @@ class ScanTarget: analyzer: str = "all" rules_dir: Path | None = None db_path: str | None = None + min_confidence: str = "medium" + include_fixtures: bool = False @dataclass @@ -115,7 +119,14 @@ def scan_target(target: ScanTarget, include_sarif: bool = False) -> ScanResult: raise ValueError(f"Unsupported analyzer '{analyzer}'. Supported values: {supported}") rules_dir = resolve_rules_dir(target.rules_dir) - findings = run_scan(path=path, analyzer=analyzer, rules_dir=rules_dir, db_path=target.db_path) + findings = run_scan( + path=path, + analyzer=analyzer, + rules_dir=rules_dir, + db_path=target.db_path, + min_confidence=target.min_confidence, + include_fixtures=target.include_fixtures, + ) score = compute_score(findings) sarif = render_sarif_report(findings, analyzer_type=analyzer) if include_sarif else None return ScanResult( @@ -124,6 +135,7 @@ def scan_target(target: ScanTarget, include_sarif: bool = False) -> ScanResult: findings=findings, score=score, sarif=sarif, + generated_at=datetime.datetime.now(datetime.timezone.utc).isoformat(), ) @@ -132,6 +144,8 @@ def run_scan( analyzer: str = "all", rules_dir: str | Path | None = None, db_path: str | None = None, + min_confidence: str = "medium", + include_fixtures: bool = False, ) -> list[Finding]: registry = _get_analyzer_registry() rules_path = resolve_rules_dir(Path(rules_dir) if rules_dir else None) @@ -141,22 +155,31 @@ def run_scan( findings: list[Finding] = [] for analyzer_id, module_path in registry.items(): findings.extend( - _invoke_analyzer( + _run_analyzer( analyzer_id=analyzer_id, - analyzer_fn=_load_analyzer(module_path), + module_path=module_path, 
path=target_path, rules_path=rules_path, db_path=db_path, ) ) - return findings - - return _invoke_analyzer( - analyzer_id=analyzer, - analyzer_fn=_load_analyzer(registry[analyzer]), - path=target_path, - rules_path=rules_path, - db_path=db_path, + return _filter_findings_by_fixtures( + _filter_findings_by_confidence(findings, min_confidence), + include_fixtures, + ) + + return _filter_findings_by_fixtures( + _filter_findings_by_confidence( + _run_analyzer( + analyzer_id=analyzer, + module_path=registry[analyzer], + path=target_path, + rules_path=rules_path, + db_path=db_path, + ), + min_confidence, + ), + include_fixtures, ) @@ -165,6 +188,39 @@ def _load_analyzer(module_path: str) -> AnalyzerFn: return getattr(module, "analyze") +def _run_analyzer( + analyzer_id: str, + module_path: str, + path: Path, + rules_path: Path, + db_path: str | None, +) -> list[Finding]: + try: + return _invoke_analyzer( + analyzer_id=analyzer_id, + analyzer_fn=_load_analyzer(module_path), + path=path, + rules_path=rules_path, + db_path=db_path, + ) + except Exception as exc: + return [ + Finding( + rule_id=f"CG-RUNTIME-{analyzer_id.upper()}", + rule_name=f"{analyzer_id}_runtime_error", + severity=Severity.WARNING, + description=f"{analyzer_id} analyzer failed to run.", + explanation=str(exc), + suggestion="Install runtime dependencies or disable this analyzer until the runtime is fixed.", + location=str(path), + context=type(exc).__name__, + attack_vector="Analyzer failure may hide issues in this category.", + cwe="", + confidence="high", + ) + ] + + def _invoke_analyzer( analyzer_id: str, analyzer_fn: AnalyzerFn, @@ -177,6 +233,34 @@ def _invoke_analyzer( return analyzer_fn(path, rules_path) +def _filter_findings_by_confidence(findings: list[Finding], min_confidence: str) -> list[Finding]: + minimum = min_confidence if min_confidence in {"low", "medium", "high"} else "medium" + return [finding for finding in findings if confidence_allowed(finding.confidence, minimum)] + + +def 
_location_to_path(location: str) -> str: + if not location: + return "" + if ":" not in location: + return location + head, tail = location.rsplit(":", 1) + if tail.isdigit(): + return head + return location + + +def _filter_findings_by_fixtures(findings: list[Finding], include_fixtures: bool) -> list[Finding]: + if include_fixtures: + return findings + filtered: list[Finding] = [] + for finding in findings: + location_path = _location_to_path(finding.location) + if location_path and is_fixture_path(location_path): + continue + filtered.append(finding) + return filtered + + def summarize_findings(findings: list[Finding]) -> dict[str, int]: return { "total": len(findings), diff --git a/tests/test_config_analyzer.py b/tests/test_config_analyzer.py index 41171db..d10928e 100644 --- a/tests/test_config_analyzer.py +++ b/tests/test_config_analyzer.py @@ -36,11 +36,21 @@ def test_default_password(self): facts = extract_facts(content) assert facts["default_password"] is True + def test_compose_default_password(self): + content = "DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@db/app\n" + facts = extract_facts(content) + assert facts["default_password"] is True + def test_ssl_disabled(self): content = "ssl_enabled: false\n" facts = extract_facts(content) assert facts["ssl_disabled"] is True + def test_smtp_starttls_does_not_count_as_ssl_disabled(self): + content = "SMTP_USE_STARTTLS=true\nSMTP_USE_SSL=false\n" + facts = extract_facts(content) + assert facts["ssl_disabled"] is False + def test_wildcard_host(self): content = "ALLOWED_HOSTS=*\n" facts = extract_facts(content) diff --git a/tests/test_dependency_analyzer.py b/tests/test_dependency_analyzer.py index f7dfeae..b254b12 100644 --- a/tests/test_dependency_analyzer.py +++ b/tests/test_dependency_analyzer.py @@ -7,6 +7,9 @@ from contractguard.analyzers.dependency_analyzer import ( analyze, + extract_facts_from_dependency_file, + extract_facts_from_package_json, + extract_facts_from_pyproject, 
extract_facts_from_requirements, _parse_version, _version_matches, @@ -63,6 +66,12 @@ def test_skips_comments_and_flags(self): facts = extract_facts_from_requirements(content) assert facts["total_packages"] == 1 + def test_range_versions_are_unpinned_not_vulnerable(self): + content = "requests>=2.0.0\n" + facts = extract_facts_from_requirements(content) + assert facts["has_unpinned_packages"] is True + assert facts["vulnerable_count"] == 0 + def test_multiple_vulns(self): content = "django==2.2.0\nflask==0.12.0\nurllib3==1.24.0\ncryptography==2.1.0\n" facts = extract_facts_from_requirements(content) @@ -73,6 +82,26 @@ def test_counts_critical(self): facts = extract_facts_from_requirements(content) assert facts["critical_vuln_count"] >= 1 + def test_pyproject_dependencies_are_scanned(self): + content = """ +[project] +dependencies = ["django==2.2.0", "requests>=2.31.0"] +""" + facts = extract_facts_from_pyproject(content) + assert facts["has_vulnerable_packages"] is True + assert facts["has_unpinned_packages"] is True + + def test_package_lock_dependencies_are_scanned(self): + content = '{"packages":{"node_modules/lodash":{"version":"4.17.20"}}}' + facts = extract_facts_from_package_json(content, locked=True) + assert facts["has_vulnerable_packages"] is True + assert any(item[2] == "CVE-2021-23337" for item in facts["vulnerabilities"]) + + def test_no_placeholder_cve_ids(self): + content = "django==2.2.0\n" + facts = extract_facts_from_dependency_file("requirements.txt", content) + assert all("XXXXX" not in item[2] for item in facts["vulnerabilities"]) + class TestAnalyze: def test_analyze_vulnerable_requirements(self): diff --git a/tests/test_pii_analyzer.py b/tests/test_pii_analyzer.py index f279d69..fd2e017 100644 --- a/tests/test_pii_analyzer.py +++ b/tests/test_pii_analyzer.py @@ -54,6 +54,31 @@ def test_suppresses_non_personal_ips(self): assert facts["has_ip_address"] is False assert facts["pii_count"] == 0 + def 
test_does_not_treat_numeric_code_constants_as_phone(self): + content = "this.state = (1664525 * this.state + 1013904223) >>> 0;\n" + facts = extract_facts(content, "math.ts") + assert facts["has_phone"] is False + assert facts["pii_count"] == 0 + + def test_does_not_treat_regular_dates_as_dob(self): + content = '"dateadded","2026-05-13 08:25:10","last_online","2026-05-13 08:25:10"\n' + facts = extract_facts(content, "feed.csv") + assert facts["has_dob"] is False + assert facts["pii_count"] == 0 + + def test_credit_cards_require_luhn(self): + content = '{"transitivity": 0.5191742775433075}\n' + facts = extract_facts(content, "model.json") + assert facts["has_credit_card"] is False + assert facts["pii_count"] == 0 + + def test_source_contact_email_is_low_confidence(self, tmp_path): + source = tmp_path / "ProductDetail.tsx" + source.write_text("Contact: atelier@example.com\n") + findings = analyze(source, RULES_DIR) + assert findings + assert all(f.confidence == "low" for f in findings) + def test_redacted_preview(self): content = '{"ssn": "123-45-6789"}' facts = extract_facts(content) diff --git a/tests/test_reporter.py b/tests/test_reporter.py index da76f08..bb00e75 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -1,5 +1,6 @@ """Tests for report generation.""" +from contractguard import __version__ from contractguard.engine import Finding, Severity from contractguard.reporter import render_html_report, render_sarif_report @@ -51,3 +52,7 @@ def test_sarif_preserves_windows_drive_paths(self): location = sarif["runs"][0]["results"][0]["locations"][0]["physicalLocation"] assert location["artifactLocation"]["uri"] == "C:/repo/.env" assert location["region"]["startLine"] == 12 + + def test_sarif_uses_package_version(self): + sarif = render_sarif_report([]) + assert sarif["runs"][0]["tool"]["driver"]["version"] == __version__ diff --git a/tests/test_scan.py b/tests/test_scan.py index 72c9287..d4ff6ec 100644 --- a/tests/test_scan.py +++ 
b/tests/test_scan.py @@ -1,6 +1,6 @@ from pathlib import Path -from contractguard.scan import ScanTarget, list_analyzers, scan_target, serialize_finding +from contractguard.scan import ScanTarget, list_analyzers, run_scan, scan_target, serialize_finding def test_list_analyzers_excludes_csv(): @@ -38,8 +38,28 @@ def test_serialize_finding_shape(): path=Path(__file__).resolve().parent.parent / "samples" / "secrets", analyzer="secrets", rules_dir=Path(__file__).resolve().parent.parent / "rules", + min_confidence="low", + include_fixtures=True, ) ).findings payload = serialize_finding(findings[0]) assert payload["rule_id"] assert payload["severity"] + + +def test_scan_filters_low_confidence_fixture_findings_by_default(): + findings = scan_target( + ScanTarget( + path=Path(__file__).resolve().parent.parent / "samples" / "secrets", + analyzer="secrets", + rules_dir=Path(__file__).resolve().parent.parent / "rules", + ) + ).findings + assert findings == [] + + +def test_analyzer_runtime_error_is_reported(monkeypatch, tmp_path): + monkeypatch.setattr("contractguard.scan._get_analyzer_registry", lambda: {"broken": "contractguard.missing"}) + findings = run_scan(tmp_path, analyzer="all", rules_dir=Path(__file__).resolve().parent.parent / "rules") + assert len(findings) == 1 + assert findings[0].rule_id == "CG-RUNTIME-BROKEN" diff --git a/tests/test_secrets_analyzer.py b/tests/test_secrets_analyzer.py index e1a0fd3..32cc411 100644 --- a/tests/test_secrets_analyzer.py +++ b/tests/test_secrets_analyzer.py @@ -55,11 +55,26 @@ def test_detects_jwt(self): facts = extract_facts(content) assert facts["has_jwt"] is True + def test_detects_smtp_app_password(self): + content = "SMTP_PASSWORD=abcd efgh ijkl mnop\n" + facts = extract_facts(content, ".env") + assert facts["secret_count"] >= 1 + + def test_plain_uuid_is_not_heroku_key(self): + content = "request_id = '123e4567-e89b-12d3-a456-426614174000'\n" + facts = extract_facts(content, "events.py") + assert facts["secret_count"] == 0 + 
def test_clean_file_no_secrets(self): content = "# This is a clean config\nDEBUG=false\nPORT=8080\n" facts = extract_facts(content) assert facts["secret_count"] == 0 + def test_ignores_token_variable_expression(self): + content = "const token = header?.startsWith('Bearer ') ? header.slice(7) : undefined;\n" + facts = extract_facts(content, "api.ts") + assert facts["secret_count"] == 0 + def test_redacted_preview(self): content = f"GITHUB_TOKEN={fake_github_token()}\n" facts = extract_facts(content) @@ -124,3 +139,18 @@ def test_skips_vendor_directories(self, tmp_path): (tmp_path / "safe.txt").write_text("Nothing here\n") findings = analyze(tmp_path, RULES_DIR) assert all("node_modules" not in f.location for f in findings) + + def test_fixture_findings_are_low_confidence(self, tmp_path): + samples_dir = tmp_path / "samples" + samples_dir.mkdir() + (samples_dir / "leaked.env").write_text("DATABASE_URL=postgresql://user:pass@example/db\n") + findings = analyze(samples_dir, RULES_DIR) + assert findings + assert all(f.confidence == "low" for f in findings) + + def test_readme_placeholder_token_is_low_confidence(self, tmp_path): + readme = tmp_path / "README.md" + readme.write_text("storefrontAccessToken: 'your-storefront-access-token'\n") + findings = analyze(readme, RULES_DIR) + assert findings + assert all(f.confidence == "low" for f in findings) diff --git a/vscode-src/pythonBridge.ts b/vscode-src/pythonBridge.ts index f58f9ed..088dc64 100644 --- a/vscode-src/pythonBridge.ts +++ b/vscode-src/pythonBridge.ts @@ -18,6 +18,15 @@ function getBundledRulesPath(context: vscode.ExtensionContext): string { return configured ? configured : path.join(context.extensionPath, 'rules'); } +function getMinimumConfidence(): string { + const configured = getConfig().get('minimumConfidence', 'medium').trim(); + return ['low', 'medium', 'high'].includes(configured) ? 
configured : 'medium'; +} + +function getIncludeFixtures(): boolean { + return getConfig().get('includeFixtures', false); +} + function getPythonExecutable(): string { const configured = getConfig().get('pythonPath', '').trim(); if (configured) { @@ -68,9 +77,15 @@ export async function runContractGuardScan( '--analyzer', analyzer, '--rules-dir', - getBundledRulesPath(context) + getBundledRulesPath(context), + '--min-confidence', + getMinimumConfidence() ]; + if (getIncludeFixtures()) { + args.push('--include-fixtures'); + } + if (dbPath) { args.push('--db', dbPath); }