From 5e66da1fe991be469205cd0954748b11ee63f5bd Mon Sep 17 00:00:00 2001 From: chalmer lowe Date: Tue, 16 Jun 2026 10:15:11 -0400 Subject: [PATCH] feat(version-scanner): support target list inputs and refine python version checks --- scripts/version_scanner/regex_config.yaml | 46 +++++- .../tests/unit/test_version_scanner.py | 135 ++++++++++++++- scripts/version_scanner/version_scanner.py | 156 +++++++++++++++--- 3 files changed, 301 insertions(+), 36 deletions(-) diff --git a/scripts/version_scanner/regex_config.yaml b/scripts/version_scanner/regex_config.yaml index 95e62fe002aa..312e7c67d7f0 100644 --- a/scripts/version_scanner/regex_config.yaml +++ b/scripts/version_scanner/regex_config.yaml @@ -17,20 +17,23 @@ rules: examples: - "python_requires = '==3.7'" - "python_requires = '>=3.7'" + - "python_requires = '>=3.7.0'" - "python_requires = '<=3.7'" - "python_requires = '>3.6'" - "python_requires = '<3.8'" rules: - | - python_requires\s*=\s*['"]==3\.{minor}['"] + python_requires\s*=\s*['"]==3\.{minor}(?:\.\d+)?['"] - | - python_requires\s*=\s*['"]>=3\.{minor}['"] + python_requires\s*=\s*['"]>=3\.{minor}(?:\.\d+)?['"] - | - python_requires\s*=\s*['"]<=3\.{minor}['"] + python_requires\s*=\s*['"]<=3\.{minor}(?:\.\d+)?['"] + # Matches >3.6 (equivalent to >=3.7) - | - python_requires\s*=\s*['"]>3\.{minor_minus_one}['"] + python_requires\s*=\s*['"]>3\.{minor_minus_one}(?:\.\d+)?['"] + # Matches <3.8 (equivalent to <=3.7) - | - python_requires\s*=\s*['"]<3\.{minor_plus_one}['"] + python_requires\s*=\s*['"]<3\.{minor_plus_one}(?:\.\d+)?['"] - name: sys_version_info description: Finds sys.version_info checks in code. @@ -46,6 +49,11 @@ rules: - "sys.version_info.minor <= 7" - "sys.version_info.minor > 6" - "sys.version_info.minor < 8" + - "sys.version_info[1] == 7" + - "sys.version_info[1] >= 7" + - "sys.version_info[1] <= 7" + - "sys.version_info[1] > 6" + - "sys.version_info[1] < 8" rules: - | sys\.version_info\s*==\s*\(3,\s*{minor}\) @@ -53,8 +61,10 @@ rules: sys\.version_info\s*>=\s*\(3,\s*{minor}\) - | sys\.version_info\s*<=\s*\(3,\s*{minor}\) + # Matches sys.version_info > (3, 6) (equivalent to >=3.7) - | sys\.version_info\s*>\s*\(3,\s*{minor_minus_one}\) + # Matches sys.version_info < (3, 8) (equivalent to <=3.7) - | sys\.version_info\s*<\s*\(3,\s*{minor_plus_one}\) - | @@ -63,10 +73,24 @@ rules: sys\.version_info\.minor\s*>=\s*{minor}(?!\d) - | sys\.version_info\.minor\s*<=\s*{minor}(?!\d) + # Matches sys.version_info.minor > 6 (equivalent to >=7) - | sys\.version_info\.minor\s*>\s*{minor_minus_one}(?!\d) + # Matches sys.version_info.minor < 8 (equivalent to <=7) - | sys\.version_info\.minor\s*<\s*{minor_plus_one}(?!\d) + - | + sys\.version_info\[1\]\s*==\s*{minor}(?!\d) + - | + sys\.version_info\[1\]\s*>=\s*{minor}(?!\d) + - | + sys\.version_info\[1\]\s*<=\s*{minor}(?!\d) + # Matches sys.version_info[1] > 6 (equivalent to >=7) + - | + sys\.version_info\[1\]\s*>\s*{minor_minus_one}(?!\d) + # Matches sys.version_info[1] < 8 (equivalent to <=7) + - | + sys\.version_info\[1\]\s*<\s*{minor_plus_one}(?!\d) - name: python_env_short description: Finds short python environment names often used in tox or nox. @@ -99,4 +123,16 @@ rules: - | Python{major}{minor}(?!\d) + - name: dependency_requirement + description: Finds standard dependency requirement formats (e.g., protobuf==3.7). + examples: + - "protobuf==3.7" + - "protobuf>=3.7" + - "protobuf<=3.7" + - "protobuf~=3.7" + - "protobuf!=3.7" + rules: + - | + {name}\s*(?:==|>=|<=|~=|!=)\s*{version} + diff --git a/scripts/version_scanner/tests/unit/test_version_scanner.py b/scripts/version_scanner/tests/unit/test_version_scanner.py index f5a909e849e8..cd8fd7a88dc5 100644 --- a/scripts/version_scanner/tests/unit/test_version_scanner.py +++ b/scripts/version_scanner/tests/unit/test_version_scanner.py @@ -227,7 +227,6 @@ def test_main_package_file_permission_error(tmp_path, capsys): package_file = tmp_path / "packages.txt" package_file.write_text("packages/pkg_a") - import sys test_args = ["version_scanner.py", "-d", "python", "-v", "3.7", "--package-file", str(package_file)] real_open = open @@ -246,7 +245,6 @@ def side_effect(file, *args, **kwargs): captured = capsys.readouterr() assert "Error: Permission denied reading package file" in captured.err def test_main_package_file_not_found(capsys): - import sys test_args = ["version_scanner.py", "-d", "python", "-v", "3.7", "--package-file", "non_existent_file.txt"] with patch("sys.argv", test_args): @@ -323,7 +321,6 @@ def test_main_loads_ignore_from_script_dir(mock_scan, mock_load_ignore): mock_load_ignore.return_value = [] mock_scan.return_value = [] - import sys test_args = ["version_scanner.py", "-d", "python", "-v", "3.7"] with mock.patch('sys.argv', test_args): @@ -339,7 +336,8 @@ def test_main_loads_ignore_from_script_dir(mock_scan, mock_load_ignore): try: - import googleapiclient + # Ruff linter F401: Imported solely to detect Google API Client library presence for test skipping + import googleapiclient # noqa: F401 HAS_GOOGLE_API = True except ImportError: HAS_GOOGLE_API = False @@ -407,8 +405,8 @@ def test_regex_examples_from_config(): rules_list = config.get("rules", []) - # Variables for interpolation (simulate Python 3.7) - vars = { + # Base variables for interpolation (simulate target version 3.7) + base_vars = { "major": "3", "minor": "7", "version": "3.7", @@ -424,6 +422,11 @@ def test_regex_examples_from_config(): if not examples or not templates: continue + # Resolve target dependency name based on applies_to metadata, falling back to protobuf + applies_to = rule_group.get("applies_to", []) + dep_name = applies_to[0] if applies_to else "protobuf" + vars = {**base_vars, "name": dep_name} + compiled_patterns = [] for template in templates: try: @@ -638,39 +641,67 @@ def test_format_for_raw_csv_handles_empty_line_number(): def test_format_for_raw_csv(): match = { + "file_name": "setup.py", "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", "repo_path": "packages/pkg_a/setup.py", "package_name": "pkg_a", "rule_name": "python_requires_check", "line_number": "123", "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" + "context_line": "python_requires = '>=3.7'", + "dependency": "python", + "version": "3.7" } formatted = format_for_raw_csv(match) + assert formatted["file_name"] == "setup.py" assert formatted["file_path"] == "google-cloud-python/main/packages/pkg_a/setup.py" assert formatted["package_name"] == "pkg_a" assert formatted["rule_name"] == "python_requires_check" assert formatted["line_number"] == 123 # Int conversion assert formatted["matched_string"] == "3.7" # No formula wrapping assert formatted["context_line"] == "python_requires = '>=3.7'" + assert formatted["dependency"] == "python" + assert formatted["version"] == "3.7" + +def test_format_for_raw_csv_fallback_filename(): + match = { + "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", + "repo_path": "packages/pkg_a/setup.py", + "package_name": "pkg_a", + "rule_name": "python_requires_check", + "line_number": "123", + "matched_string": "3.7", + "context_line": "python_requires = '>=3.7'", + "dependency": "python", + "version": "3.7" + } + + formatted = format_for_raw_csv(match) + assert formatted["file_name"] == "setup.py" def test_format_for_spreadsheet(): match = { + "file_name": "setup.py", "file_path": "google-cloud-python/main/packages/pkg_a/setup.py", "repo_path": "packages/pkg_a/setup.py", "package_name": "pkg_a", "rule_name": "python_requires_check", "line_number": 123, "matched_string": "3.7", - "context_line": "python_requires = '>=3.7'" + "context_line": "python_requires = '>=3.7'", + "dependency": "python", + "version": "3.7" } # Without github_repo formatted_no_repo = format_for_spreadsheet(match) + assert formatted_no_repo["file_name"] == "setup.py" assert formatted_no_repo["line_number"] == 123 assert formatted_no_repo["matched_string"] == '="3.7"' # Decimal protection formula + assert formatted_no_repo["dependency"] == "python" + assert formatted_no_repo["version"] == "3.7" # With github_repo formatted_repo = format_for_spreadsheet(match, github_repo="https://github.com/user/repo", branch="main") @@ -695,3 +726,91 @@ def test_format_for_console(): assert "3.7" in log_str assert "python_requires = " not in log_str # Slim format doesn't print context line +def test_parse_targets_inline_json(): + from version_scanner import parse_targets + json_str = '{"python": ["3.7", "3.8"], "protobuf": "4.25.8"}' + targets = parse_targets(json_str) + assert targets == [("python", "3.7"), ("python", "3.8"), ("protobuf", "4.25.8")] + +def test_parse_targets_inline_yaml(): + from version_scanner import parse_targets + yaml_str = """ +python: + - "3.7" + - "3.8" +protobuf: "4.25.8" +""" + targets = parse_targets(yaml_str) + assert targets == [("python", "3.7"), ("python", "3.8"), ("protobuf", "4.25.8")] + +def test_parse_targets_from_file(tmp_path): + from version_scanner import parse_targets + yaml_file = tmp_path / "targets.yaml" + yaml_file.write_text(""" +python: + - "3.7" + - "3.8" +protobuf: "4.25.8" +""") + targets = parse_targets(str(yaml_file)) + assert targets == [("python", "3.7"), ("python", "3.8"), ("protobuf", "4.25.8")] + +def test_parse_targets_invalid_syntax(): + from version_scanner import parse_targets + with pytest.raises(SystemExit) as excinfo: + parse_targets('{"invalid"') + assert excinfo.value.code == 1 + +def test_scan_repository_multi_targets(tmp_path): + # Setup files in tmp repository + file1 = tmp_path / "packages" / "pkg1" / "setup.py" + file1.parent.mkdir(parents=True) + file1.write_text("python_requires = '>=3.7'\n") + + file2 = tmp_path / "packages" / "pkg2" / "requirements.txt" + file2.parent.mkdir(parents=True) + file2.write_text("protobuf==4.25.8\n") + + # Let's mock a config file with rules for both python and protobuf + config_file = tmp_path / "regex_config.yaml" + config_file.write_text(""" +rules: + - name: python_requires_check + applies_to: + - python + rules: + - python_requires\\s*=\\s*['\"]>={version}['\"] + - name: protobuf_check + applies_to: + - protobuf + rules: + - protobuf=={version} +""") + + from version_scanner import ConfigManager, scan_repository + + targets = [("python", "3.7"), ("protobuf", "4.25.8")] + rules = [] + for dep, ver in targets: + cm = ConfigManager(str(config_file), dep, ver) + rules.extend(cm.load_config()) + + results = scan_repository(str(tmp_path), rules, targets=targets) + + # We should have 2 matches + assert len(results) == 2 + + # Match for python + python_match = [r for r in results if r["dependency"] == "python"] + assert len(python_match) == 1 + assert python_match[0]["version"] == "3.7" + assert python_match[0]["rule_name"] == "python_requires_check" + assert python_match[0]["file_name"] == "setup.py" + + # Match for protobuf + protobuf_match = [r for r in results if r["dependency"] == "protobuf"] + assert len(protobuf_match) == 1 + assert protobuf_match[0]["version"] == "4.25.8" + assert protobuf_match[0]["rule_name"] == "protobuf_check" + assert protobuf_match[0]["file_name"] == "requirements.txt" + diff --git a/scripts/version_scanner/version_scanner.py b/scripts/version_scanner/version_scanner.py index 90234a967665..183192cfdf89 100644 --- a/scripts/version_scanner/version_scanner.py +++ b/scripts/version_scanner/version_scanner.py @@ -137,7 +137,9 @@ def load_config(self) -> List[Dict[str, str]]: resolved_pattern = template.strip().format(**self.variables) resolved_rules.append({ "name": name, - "pattern": resolved_pattern + "pattern": resolved_pattern, + "dependency": self.dependency, + "version": self.version }) except KeyError as e: print(f"Warning: Missing variable for interpolation in rule {name}: {e}", file=sys.stderr) @@ -178,7 +180,9 @@ def scan_file(file_path: str, compiled_rules: List[Dict[str, re.Pattern]]) -> Li "rule_name": rule["name"], "line_number": line_num, "matched_string": match.group(0).strip(), - "context_line": line.strip() + "context_line": line.strip(), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except IOError as e: print(f"Warning: Could not read file {file_path}: {e}", file=sys.stderr) @@ -241,13 +245,19 @@ def _safe_int(value: Any, default: int = 0) -> int: def format_for_raw_csv(match: Dict[str, str]) -> Dict[str, str]: """Prepares a full raw dataset (n + x columns) with clean text values.""" + file_name = match.get("file_name") + if not file_name and match.get("file_path"): + file_name = os.path.basename(match.get("file_path")) return { + "file_name": file_name or "", "file_path": match.get("file_path", ""), "package_name": match.get("package_name", ""), "rule_name": match.get("rule_name", ""), "line_number": _safe_int(match.get("line_number")), "matched_string": match.get("matched_string", ""), - "context_line": _truncate_context(match.get("context_line", ""), match.get("matched_string", "")) + "context_line": _truncate_context(match.get("context_line", ""), match.get("matched_string", "")), + "dependency": match.get("dependency", ""), + "version": match.get("version", "") } @@ -342,7 +352,7 @@ def write_csv_report( output_path: Path to the output CSV file. matches: A list of dictionaries containing match details. """ - fieldnames = ["file_path", "package_name", "rule_name", "line_number", "matched_string", "context_line"] + fieldnames = ["file_name", "file_path", "package_name", "rule_name", "dependency", "version", "line_number", "matched_string", "context_line"] try: with open(output_path, 'w', encoding='utf-8', newline='') as f: @@ -391,13 +401,16 @@ def upload_to_drive(csv_path: str, matches: List[Dict[str, str]], github_repo: s spreadsheet_id = spreadsheet.get('spreadsheetId') # Prepare data - values = [["file_path", "package_name", "rule_name", "line_number", "matched_string", "context_line"]] + values = [["file_name", "file_path", "package_name", "rule_name", "dependency", "version", "line_number", "matched_string", "context_line"]] for m in matches: formatted_m = format_for_spreadsheet(m, github_repo=github_repo, branch=branch) values.append([ + formatted_m.get("file_name", ""), formatted_m.get("file_path", ""), formatted_m.get("package_name", ""), formatted_m.get("rule_name", ""), + formatted_m.get("dependency", ""), + formatted_m.get("version", ""), str(formatted_m.get("line_number", "")), formatted_m.get("matched_string", ""), formatted_m.get("context_line", "") @@ -459,7 +472,8 @@ def scan_repository( rules: List[Dict[str, str]], target_packages: List[str] = None, ignore_dirs: List[str] = None, - version_string: str = None + version_string: str = None, + targets: List[Tuple[str, str]] = None ) -> List[Dict[str, str]]: """ Scans the repository directory tree applying resolved regex patterns to files. @@ -478,6 +492,7 @@ def scan_repository( performs a full recursive scan of the repository. ignore_dirs: Optional list of directory names or glob-like files to ignore (case-insensitive). version_string: Optional target version string (e.g. "3.7") to scan for in filenames. + targets: Optional list of (dependency, version) tuples to check for in filenames. Returns: A list of dictionaries detailing each match: 'file_path', 'repo_path', @@ -486,13 +501,22 @@ def scan_repository( ignore_lower = {i.lower() for i in ignore_dirs} if ignore_dirs else set() results = [] + filename_targets = [] + if targets: + filename_targets = targets + elif version_string: + dep = rules[0].get("dependency") if rules else None + filename_targets = [(dep, version_string)] + # Compile patterns once here compiled_rules = [] for rule in rules: try: compiled_rules.append({ "name": rule["name"], - "pattern": re.compile(rule["pattern"], re.IGNORECASE) + "pattern": re.compile(rule["pattern"], re.IGNORECASE), + "dependency": rule.get("dependency", ""), + "version": rule.get("version", "") }) except re.error as e: print(f"Error compiling regex for rule {rule['name']}: {e}", file=sys.stderr) @@ -510,7 +534,6 @@ def scan_repository( files = [f for f in files if f.lower() not in ignore_lower] rel_root = os.path.relpath(root, root_path) - parts = rel_root.split(os.sep) # Layout-agnostic generic subdirectory filtering if target_packages: @@ -533,13 +556,16 @@ def scan_repository( matches = scan_file(file_path, compiled_rules) # Add filename match if applicable - if version_string and version_string in file: - matches.append({ - "rule_name": "filename_match", - "line_number": 0, - "matched_string": version_string, - "context_line": f"Filename contains {version_string}" - }) + for dep, ver in filename_targets: + if ver and ver in file: + matches.append({ + "rule_name": "filename_match", + "line_number": 0, + "matched_string": ver, + "context_line": f"Filename contains {ver}", + "dependency": dep or "", + "version": ver + }) # Compute display path and package name rel_file_path = os.path.relpath(file_path, root_path) @@ -559,6 +585,7 @@ def scan_repository( display_path = rel_file_path for m in matches: + m["file_name"] = file m["file_path"] = display_path m["repo_path"] = rel_file_path m["package_name"] = package_name @@ -567,6 +594,46 @@ def scan_repository( return results +def parse_targets(targets_input: str) -> List[Tuple[str, str]]: + """ + Parses a targets input (file path or inline YAML/JSON string) into a list of (dependency, version) tuples. + """ + raw_targets = {} + content = targets_input + + # Check if the input is a file path + if os.path.exists(targets_input): + try: + with open(targets_input, 'r', encoding='utf-8') as f: + content = f.read() + except PermissionError: + print(f"Error: Permission denied reading targets file: {targets_input}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Error reading targets file {targets_input}: {e}", file=sys.stderr) + sys.exit(1) + + try: + raw_targets = yaml.safe_load(content) + except Exception as e: + print(f"Error parsing targets YAML/JSON content: {e}", file=sys.stderr) + sys.exit(1) + + if not isinstance(raw_targets, dict): + print("Error: Targets input must resolve to a JSON object or YAML mapping", file=sys.stderr) + sys.exit(1) + + targets = [] + for dep, versions in raw_targets.items(): + if isinstance(versions, list): + for v in versions: + targets.append((str(dep), str(v))) + else: + targets.append((str(dep), str(versions))) + + return targets + + def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) default_config = os.path.join(script_dir, "regex_config.yaml") @@ -577,16 +644,19 @@ def main(): parser.add_argument( "-d", "--dependency", - required=True, help="Name of the dependency (e.g., python, protobuf)" ) parser.add_argument( "-v", "--version", - required=True, help="Specific version to search for (e.g., 3.7, 4.25.8)" ) + parser.add_argument( + "--targets", + help="Path to a YAML/JSON targets file, or an inline YAML/JSON string (e.g. 'python: [3.8, 3.9]')" + ) + parser.add_argument( "-p", "--path", default=".", @@ -650,6 +720,25 @@ def main(): args = parser.parse_args() + # Validation of required inputs + has_single_target = bool(args.dependency and args.version) + has_targets_list = bool(args.targets) + + if not (has_single_target or has_targets_list): + parser.error("Must specify either (-d/--dependency AND -v/--version) OR (--targets)") + if has_single_target and has_targets_list: + parser.error("Cannot specify both single target (-d/-v) and targets list (--targets)") + + targets = [] + if has_targets_list: + targets = parse_targets(args.targets) + else: + targets = [(args.dependency, args.version)] + + if not targets: + print("Error: No targets resolved to scan.", file=sys.stderr) + sys.exit(1) + # Resolve target packages if filtering is requested target_packages = [] if args.package: @@ -661,9 +750,14 @@ def main(): elif args.package_file: target_packages = read_package_file(args.package_file) - print(f"Starting scan for dependency: {args.dependency} version: {args.version}") + if has_targets_list: + print("Starting scan for multiple targets:") + for dep, ver in targets: + print(f" - {dep}: {ver}") + else: + print(f"Starting scan for dependency: {args.dependency} version: {args.version}") print(f"Root path: {args.path}") - print(f"Targets to scan:") + print("Targets to scan:") if target_packages: for pkg in target_packages: print(f" - {os.path.join(args.path, pkg)}") @@ -672,8 +766,10 @@ def main(): print(f"Using config: {args.config}") # Load and resolve rules - config_manager = ConfigManager(args.config, args.dependency, args.version) - rules = config_manager.load_config() + rules = [] + for dep, ver in targets: + config_manager = ConfigManager(args.config, dep, ver) + rules.extend(config_manager.load_config()) @@ -686,7 +782,14 @@ def main(): print(f"Loaded {len(ignore_dirs)} ignore patterns from {ignore_file_path}") # Scan repository - all_matches = scan_repository(args.path, rules, target_packages, ignore_dirs, version_string=args.version) + all_matches = scan_repository( + args.path, + rules, + target_packages, + ignore_dirs, + version_string=(None if has_targets_list else args.version), + targets=targets + ) print(f"\nFound {len(all_matches)} matches.") display_matches = all_matches if args.stdout else all_matches[:10] @@ -708,7 +811,14 @@ def main(): script_dir = os.path.dirname(os.path.abspath(__file__)) results_dir = os.path.join(script_dir, "results") os.makedirs(results_dir, exist_ok=True) - output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") + if has_targets_list: + if os.path.exists(args.targets): + base_name = os.path.splitext(os.path.basename(args.targets))[0] + else: + base_name = "targets" + output_path = os.path.join(results_dir, f"{base_name}-{timestamp}.csv") + else: + output_path = os.path.join(results_dir, f"{args.dependency}-{args.version}-{timestamp}.csv") write_csv_report(output_path, all_matches)