diff --git a/scripts/version_scanner/tests/integration/test_scanner_integration.py b/scripts/version_scanner/tests/integration/test_scanner_integration.py index 36eff2402d38..3ce6d2cd3ab1 100644 --- a/scripts/version_scanner/tests/integration/test_scanner_integration.py +++ b/scripts/version_scanner/tests/integration/test_scanner_integration.py @@ -32,7 +32,8 @@ def test_integration_scan(tmp_path): "-v", "3.7", "-p", data_dir, "--config", config_path, - "-o", "scanner_report.csv" + "-o", "scanner_report.csv", + "--soft-fail" ] result = subprocess.run(cmd, cwd=tmp_path, capture_output=True, text=True, check=True) diff --git a/scripts/version_scanner/tests/unit/test_version_scanner.py b/scripts/version_scanner/tests/unit/test_version_scanner.py index f76b66d4d55d..054df22421bc 100644 --- a/scripts/version_scanner/tests/unit/test_version_scanner.py +++ b/scripts/version_scanner/tests/unit/test_version_scanner.py @@ -32,6 +32,60 @@ format_for_console ) +@pytest.fixture +def sample_match(): + return { + "file_name": "setup.py", + "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", + "repo_path": "packages/pkg_a/setup.py", + "package_name": "pkg_a", + "rule_name": "python_requires_check", + "line_number": "123", + "matched_string": "3.7", + "context_line": "python_requires = '>=3.7'", + "dependency": "python", + "version": "3.7" + } + + +@pytest.mark.parametrize( + "exception_to_raise, required, silent_missing, expected_exit, expected_output, expected_return", + [ + (None, True, False, False, None, "file content"), # Success + (FileNotFoundError(), True, True, False, None, None), # Silent missing FileNotFoundError + (FileNotFoundError(), True, False, True, "Error: Test_desc not found", None), # Required FileNotFoundError + (FileNotFoundError(), False, False, False, "Warning: Test_desc not found", None), # Optional FileNotFoundError + (PermissionError(), True, False, True, "Error: Permission denied reading test_desc", None), # Required PermissionError + (PermissionError(), False, False, False, "Warning: Permission denied reading test_desc", None), # Optional PermissionError + (IOError("disk full"), True, False, True, "Error reading test_desc", None), # Required IOError + (IOError("disk full"), False, False, False, "Warning: Error reading test_desc", None), # Optional IOError + ] +) +def test_safe_read_file_scenarios( + capsys, exception_to_raise, required, silent_missing, expected_exit, expected_output, expected_return +): + from version_scanner import _safe_read_file + + if exception_to_raise: + mock_open = mock.mock_open() + mock_open.side_effect = exception_to_raise + else: + mock_open = mock.mock_open(read_data="file content") + + with patch("builtins.open", mock_open): + if expected_exit: + with pytest.raises(SystemExit) as excinfo: + _safe_read_file("dummy.txt", required=required, description="test_desc", silent_missing=silent_missing) + assert excinfo.value.code == 1 + else: + res = _safe_read_file("dummy.txt", required=required, description="test_desc", silent_missing=silent_missing) + assert res == expected_return + + if expected_output: + captured = capsys.readouterr() + assert expected_output in captured.err + + # Test ConfigManager @pytest.mark.parametrize("dependency, version, expected", [ ( @@ -682,34 +736,13 @@ def test_safe_int(): assert _safe_int(None) == 0 assert _safe_int("abc") == 0 -def test_format_for_raw_csv_handles_empty_line_number(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" - } - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv_handles_empty_line_number(sample_match): + sample_match["line_number"] = "" + formatted = format_for_raw_csv(sample_match) assert formatted["line_number"] == 0 -def test_format_for_raw_csv(): - match = { - "file_name": "setup.py", - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "123", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv(sample_match): + formatted = format_for_raw_csv(sample_match) assert formatted["file_name"] == "setup.py" assert formatted["file_path"] == "google-cloud-python/main/packages/pkg_a/setup.py" @@ -721,38 +754,14 @@ def test_format_for_raw_csv(): assert formatted["dependency"] == "python" assert formatted["version"] == "3.7" -def test_format_for_raw_csv_fallback_filename(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": "123", - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - - formatted = format_for_raw_csv(match) +def test_format_for_raw_csv_fallback_filename(sample_match): + del sample_match["file_name"] + formatted = format_for_raw_csv(sample_match) assert formatted["file_name"] == "setup.py" -def test_format_for_spreadsheet(): - match = { - "file_name": "setup.py", - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": 123, - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'", - "dependency": "python", - "version": "3.7" - } - +def test_format_for_spreadsheet(sample_match): # Without github_repo - formatted_no_repo = format_for_spreadsheet(match) + formatted_no_repo = format_for_spreadsheet(sample_match) assert formatted_no_repo["file_name"] == "setup.py" assert formatted_no_repo["line_number"] == 123 assert formatted_no_repo["matched_string"] == '="3.7"' # Decimal protection formula @@ -760,25 +769,102 @@ def test_format_for_spreadsheet(): assert formatted_no_repo["version"] == "3.7" # With github_repo - formatted_repo = format_for_spreadsheet(match, github_repo="https://github.com/user/repo", branch="main") + formatted_repo = format_for_spreadsheet(sample_match, github_repo="https://github.com/user/repo", branch="main") expected_url = "https://github.com/user/repo/blob/main/packages/pkg_a/setup.py#L123" assert formatted_repo["line_number"] == f'=HYPERLINK("{expected_url}", "123")' assert formatted_repo["matched_string"] == '="3.7"' -def test_format_for_console(): - match = { - "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", - "repo_path": "packages/pkg_a/setup.py", - "package_name": "pkg_a", - "rule_name": "python_requires_check", - "line_number": 123, - "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" - } - - log_str = format_for_console(match) +def test_format_for_console(sample_match): + log_str = format_for_console(sample_match) assert "google-cloud-python/main/packages/pkg_a/setup.py:123" in log_str assert "[python_requires_check]" in log_str assert "3.7" in log_str assert "python_requires = " not in log_str # Slim format doesn't print context line + +def test_parse_targets_file(tmp_path): + from version_scanner import parse_targets_file + yaml_file = tmp_path / "targets.yaml" + yaml_file.write_text(""" +python: + - "3.7" + - "3.8" +protobuf: "4.25.8" +""") + targets = parse_targets_file(str(yaml_file)) + assert targets == [("python", "3.7"), ("python", "3.8"), ("protobuf", "4.25.8")] + +@pytest.mark.parametrize( + "file_content, file_exists", + [ + (None, False), # File not found + ("invalid: {", True), # Invalid YAML + ("- not_a_mapping", True), # Invalid structure (list instead of map) + ("python:\n - null", True), # Invalid version type (null/None value) + ] +) +def test_parse_targets_file_failures(tmp_path, file_content, file_exists): + from version_scanner import parse_targets_file + + if file_exists: + yaml_file = tmp_path / "targets_failures.yaml" + yaml_file.write_text(file_content) + path = str(yaml_file) + else: + path = "nonexistent_file.yaml" + + with pytest.raises(SystemExit) as excinfo: + parse_targets_file(path) + assert excinfo.value.code == 1 + +def test_scan_repository_multi_targets(tmp_path): + # Setup files in tmp repository + file1 = tmp_path / "packages" / "pkg1" / "setup.py" + file1.parent.mkdir(parents=True) + file1.write_text("python_requires = '>=3.7'\n") + + file2 = tmp_path / "packages" / "pkg2" / "requirements.txt" + file2.parent.mkdir(parents=True) + file2.write_text("protobuf==4.25.8\n") + + # Let's mock a config file with rules for both python and protobuf + config_file = tmp_path / "regex_config.yaml" + config_file.write_text(""" +rules: + - name: python_requires_check + applies_to: + - python + rules: + - python_requires\\s*=\\s*['\"]>={version}['\"] + - name: protobuf_check + applies_to: + - protobuf + rules: + - protobuf=={version} +""") + + from version_scanner import ConfigManager, scan_repository + + targets = [("python", "3.7"), ("protobuf", "4.25.8")] + rules = [] + for dep, ver in targets: + cm = ConfigManager(str(config_file), dep, ver) + rules.extend(cm.load_config()) + + results = scan_repository(str(tmp_path), rules, targets=targets) + + # We should have 2 matches + assert len(results) == 2 + + # Match for python + python_match = [r for r in results if r["dependency"] == "python"] + assert len(python_match) == 1 + assert python_match[0]["version"] == "3.7" + assert python_match[0]["rule_name"] == "python_requires_check" + + # Match for protobuf + protobuf_match = [r for r in results if r["dependency"] == "protobuf"] + assert len(protobuf_match) == 1 + assert protobuf_match[0]["version"] == "4.25.8" + assert protobuf_match[0]["rule_name"] == "protobuf_check" + diff --git a/scripts/version_scanner/version_scanner.py b/scripts/version_scanner/version_scanner.py index 484a6eacacae..6205e8effadd 100644 --- a/scripts/version_scanner/version_scanner.py +++ b/scripts/version_scanner/version_scanner.py @@ -23,9 +23,57 @@ import os import re import sys -from typing import Dict, List, Tuple, Any +from typing import Dict, List, Tuple, Any, Optional import yaml + +def _safe_read_file( + file_path: str, + required: bool = True, + description: str = "file", + silent_missing: bool = False +) -> Optional[str]: + """ + Safely reads file content and handles common file errors. + + Args: + file_path: Path to the file. + required: If True, exits the program with code 1 on read failure. + If False, prints a warning (or ignores) and returns None. + description: Description of the file type for error logging. + silent_missing: If True, silently ignores FileNotFoundError (returns None). + + Returns: + The file content string, or None if reading failed/was ignored. + """ + try: + with open(file_path, 'r', encoding='utf-8') as f: + return f.read() + except FileNotFoundError: + if silent_missing: + return None + if required: + print(f"Error: {description.capitalize()} not found: {file_path}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: {description.capitalize()} not found: {file_path}", file=sys.stderr) + return None + except PermissionError: + if required: + print(f"Error: Permission denied reading {description}: {file_path}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: Permission denied reading {description}: {file_path}", file=sys.stderr) + return None + except IOError as e: + if required: + print(f"Error reading {description} {file_path}: {e}", file=sys.stderr) + sys.exit(1) + else: + print(f"Warning: Error reading {description} {file_path}: {e}", file=sys.stderr) + return None + + class ConfigManager: """ Handles loading, validation, and interpolation of the regex configuration rules. @@ -106,15 +154,9 @@ def _compute_variables(self) -> Dict[str, str]: def load_config(self) -> List[Dict[str, str]]: """Load and resolve rules from config.""" + content = _safe_read_file(self.config_path, required=True, description="config file") try: - with open(self.config_path, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - except FileNotFoundError: - print(f"Error: Config file not found: {self.config_path}", file=sys.stderr) - sys.exit(1) - except PermissionError: - print(f"Error: Permission denied reading config file: {self.config_path}", file=sys.stderr) - sys.exit(1) + config = yaml.safe_load(content) except yaml.YAMLError as e: print(f"Error parsing config file: {e}", file=sys.stderr) sys.exit(1) @@ -137,7 +179,9 @@ def load_config(self) -> List[Dict[str, str]]: resolved_pattern = template.strip().format(**self.variables) resolved_rules.append({ "name": name, - "pattern": resolved_pattern + "pattern": resolved_pattern, + "dependency": self.dependency, + "version": self.version }) except KeyError as e: print(f"Warning: Missing variable for interpolation in rule {name}: {e}", file=sys.stderr) @@ -178,7 +222,9 @@ def scan_file(file_path: str, compiled_rules: List[Dict[str, re.Pattern]]) -> Li "rule_name": rule["name"], "line_number": line_num, "matched_string": match.group(0).strip(), - "context_line": line.strip() + "context_line": line.strip(), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except IOError as e: print(f"Warning: Could not read file {file_path}: {e}", file=sys.stderr) @@ -326,14 +372,12 @@ def load_ignore_file(file_path: str) -> List[str]: Read ignore paths from a file. """ ignore_dirs = [] - try: - with open(file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - ignore_dirs.append(line) - except FileNotFoundError: - pass + content = _safe_read_file(file_path, required=False, silent_missing=True) + if content: + for line in content.splitlines(): + line = line.strip() + if line and not line.startswith('#'): + ignore_dirs.append(line) return ignore_dirs @@ -445,30 +489,22 @@ def read_package_file(file_path: str) -> List[str]: A list of package paths. """ packages = [] - try: - with open(file_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#'): - packages.append(line) - except FileNotFoundError: - print(f"Error: Package file not found: {file_path}", file=sys.stderr) - sys.exit(1) - except PermissionError: - print(f"Error: Permission denied reading package file: {file_path}", file=sys.stderr) - sys.exit(1) - except IOError as e: - print(f"Error reading package file: {e}", file=sys.stderr) - sys.exit(1) + content = _safe_read_file(file_path, required=True, description="package file") + if content: + for line in content.splitlines(): + line = line.strip() + if line and not line.startswith('#'): + packages.append(line) return packages def scan_repository( root_path: str, - rules: List[Dict[str, str]], + rules: List[Dict[str, Any]], target_packages: List[str] = None, ignore_dirs: List[str] = None, - version_string: str = None + version_string: str = None, + targets: List[Tuple[str, str]] = None ) -> List[Dict[str, Any]]: """ Scans the repository directory tree applying resolved regex patterns to files. @@ -487,21 +523,30 @@ def scan_repository( performs a full recursive scan of the repository. ignore_dirs: Optional list of directory names or glob-like files to ignore (case-insensitive). version_string: Optional target version string (e.g. "3.7") to scan for in filenames. + targets: Optional list of (dependency, version) tuples. Returns: - A list of dictionaries detailing each match: 'file_path', 'repo_path', - 'package_name', 'rule_name', 'line_number', 'matched_string', 'context_line'. + A list of dictionaries detailing each match. """ ignore_lower = {i.lower() for i in ignore_dirs} if ignore_dirs else set() results = [] + filename_targets = [] + if targets: + filename_targets = targets + elif version_string: + dep = rules[0].get("dependency") if rules else None + filename_targets = [(dep, version_string)] + # Compile patterns once here compiled_rules = [] for rule in rules: try: compiled_rules.append({ "name": rule["name"], - "pattern": re.compile(rule["pattern"], re.IGNORECASE) + "pattern": re.compile(rule["pattern"], re.IGNORECASE), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except re.error as e: print(f"Error compiling regex for rule {rule['name']}: {e}", file=sys.stderr) @@ -541,13 +586,16 @@ def scan_repository( matches = scan_file(file_path, compiled_rules) # Add filename match if applicable - if version_string and version_string in file: - matches.append({ - "rule_name": "filename_match", - "line_number": 0, - "matched_string": version_string, - "context_line": f"Filename contains {version_string}" - }) + for dep, ver in filename_targets: + if ver and ver in file: + matches.append({ + "rule_name": "filename_match", + "line_number": 0, + "matched_string": ver, + "context_line": f"Filename contains {ver}", + "dependency": dep or "", + "version": ver + }) # Compute display path and package name rel_file_path = os.path.relpath(file_path, root_path) @@ -576,6 +624,38 @@ def scan_repository( return results +def parse_targets_file(file_path: str) -> List[Tuple[str, str]]: + """ + Parses a YAML targets file into a list of (dependency, version) tuples. + """ + content = _safe_read_file(file_path, required=True, description="targets file") + try: + raw_targets = yaml.safe_load(content) + except Exception as e: + print(f"Error parsing targets YAML mapping: {e}", file=sys.stderr) + sys.exit(1) + + if not isinstance(raw_targets, dict): + print("Error: Targets file content must resolve to a YAML mapping", file=sys.stderr) + sys.exit(1) + + targets = [] + for dep, versions in raw_targets.items(): + if isinstance(versions, list): + for v in versions: + if v is None or isinstance(v, (dict, list)): + print(f"Error: Invalid version '{v}' for dependency '{dep}'", file=sys.stderr) + sys.exit(1) + targets.append((str(dep), str(v))) + elif versions is not None and not isinstance(versions, dict): + targets.append((str(dep), str(versions))) + else: + print(f"Error: Invalid version '{versions}' for dependency '{dep}'", file=sys.stderr) + sys.exit(1) + + return targets + + def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) default_config = os.path.join(script_dir, "regex_config.yaml") @@ -586,16 +666,19 @@ def main(): parser.add_argument( "-d", "--dependency", - required=True, help="Name of the dependency (e.g., python, protobuf)" ) parser.add_argument( "-v", "--version", - required=True, help="Specific version to search for (e.g., 3.7, 4.25.8)" ) + parser.add_argument( + "--targets-file", + help="Path to a YAML file containing target dependencies and versions." + ) + parser.add_argument( "-p", "--path", default=".", @@ -659,6 +742,25 @@ def main(): args = parser.parse_args() + # Validation of required inputs + has_single_target = bool(args.dependency and args.version) + has_targets_file = bool(args.targets_file) + + if not (has_single_target or has_targets_file): + parser.error("Must specify either (-d/--dependency AND -v/--version) OR (--targets-file)") + if has_single_target and has_targets_file: + parser.error("Cannot specify both single target (-d/-v) and targets file (--targets-file)") + + targets = [] + if has_targets_file: + targets = parse_targets_file(args.targets_file) + else: + targets = [(args.dependency, args.version)] + + if not targets: + print("Error: No targets resolved to scan.", file=sys.stderr) + sys.exit(1) + # Resolve target packages if filtering is requested target_packages = [] if args.package: @@ -670,7 +772,12 @@ def main(): elif args.package_file: target_packages = read_package_file(args.package_file) - print(f"Starting scan for dependency: {args.dependency} version: {args.version}") + if has_targets_file: + print("Starting scan for multiple targets:") + for dep, ver in targets: + print(f" - {dep}: {ver}") + else: + print(f"Starting scan for dependency: {args.dependency} version: {args.version}") print(f"Root path: {args.path}") print("Targets to scan:") if target_packages: @@ -681,8 +788,10 @@ def main(): print(f"Using config: {args.config}") # Load and resolve rules - config_manager = ConfigManager(args.config, args.dependency, args.version) - rules = config_manager.load_config() + rules = [] + for dep, ver in targets: + config_manager = ConfigManager(args.config, dep, ver) + rules.extend(config_manager.load_config()) @@ -695,7 +804,14 @@ def main(): print(f"Loaded {len(ignore_dirs)} ignore patterns from {ignore_file_path}") # Scan repository - all_matches = scan_repository(args.path, rules, target_packages, ignore_dirs, version_string=args.version) + all_matches = scan_repository( + args.path, + rules, + target_packages, + ignore_dirs, + version_string=(None if has_targets_file else args.version), + targets=targets + ) print(f"\nFound {len(all_matches)} matches.") display_matches = all_matches if args.stdout else all_matches[:10] @@ -717,7 +833,11 @@ def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) results_dir = os.path.join(script_dir, "results") os.makedirs(results_dir, exist_ok=True) - output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") + if has_targets_file: + base_name = os.path.splitext(os.path.basename(args.targets_file))[0] + output_path = os.path.join(results_dir, f"{base_name}-{timestamp}.csv") + else: + output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") write_csv_report(output_path, all_matches)