diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 88a926f..ef70cd6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,11 +30,6 @@ jobs: pip install -e '.[dev,full]' - name: Lint with ruff - # Advisory for now: the ruff step had never actually run (the workflow - # only triggered on `main` while the repo lives on `master`), so it - # surfaces a large backlog of pre-existing style findings. Tracked for a - # dedicated cleanup; kept non-blocking so it doesn't gate unrelated work. - continue-on-error: true run: ruff check src/ - name: Dangling-code audit (stubs + swallowed excepts) diff --git a/src/pacer_cli/auth.py b/src/pacer_cli/auth.py index 7583805..d93cda3 100644 --- a/src/pacer_cli/auth.py +++ b/src/pacer_cli/auth.py @@ -9,7 +9,6 @@ """ from dataclasses import dataclass -from typing import Optional import requests @@ -21,8 +20,8 @@ class AuthResult: """Result of PACER authentication attempt.""" success: bool - token: Optional[str] = None # nextGenCSO 128-byte token - error: Optional[str] = None + token: str | None = None # nextGenCSO 128-byte token + error: str | None = None login_result: str = "" @@ -41,7 +40,7 @@ def generate_totp(secret: str) -> str: def authenticate( config: PacerConfig, - otp_code: Optional[str] = None, + otp_code: str | None = None, ) -> AuthResult: """Authenticate with PACER and get session token. @@ -152,7 +151,7 @@ def logout(config: PacerConfig, token: str) -> bool: return False -def test_credentials(config: PacerConfig, otp_code: Optional[str] = None) -> AuthResult: +def test_credentials(config: PacerConfig, otp_code: str | None = None) -> AuthResult: """Test PACER credentials without keeping the session. Authenticates and immediately logs out to verify credentials work. diff --git a/src/pacer_cli/cli.py b/src/pacer_cli/cli.py index 728d603..60f8639 100644 --- a/src/pacer_cli/cli.py +++ b/src/pacer_cli/cli.py @@ -6,12 +6,11 @@ import re import sys from pathlib import Path -from typing import Optional import click from rich.console import Console from rich.panel import Panel -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn +from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn from rich.table import Table from .config import ( @@ -54,7 +53,7 @@ "doc": ["download", "document"], "grep": ["search"], "find": ["search"], - + # Alternative PCL shortcuts "cases": ["pcl", "cases"], "parties": ["pcl", "parties"], @@ -63,25 +62,25 @@ class AliasGroup(click.Group): """Click Group that supports command aliases. - + Allows users to use shorter command names that map to full paths. Example: 'pacer docket' -> 'pacer download docket' """ - - def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Command]: + + def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None: # Try exact match first rv = click.Group.get_command(self, ctx, cmd_name) if rv is not None: return rv - + # Check if it's an alias if cmd_name in COMMAND_ALIASES: alias_path = COMMAND_ALIASES[cmd_name] - + # For single-element paths, just look up directly if len(alias_path) == 1: return click.Group.get_command(self, ctx, alias_path[0]) - + # For multi-part paths (e.g., ["download", "docket"]), # navigate through subgroups current_group = self @@ -90,27 +89,27 @@ def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Comma if sub_cmd is None or not isinstance(sub_cmd, click.Group): return None current_group = sub_cmd - + return click.Group.get_command(current_group, ctx, alias_path[-1]) - + return None - + def resolve_command(self, ctx: click.Context, args: list) -> tuple: """Resolve command, handling multi-part aliases.""" cmd_name = args[0] if args else None - + if cmd_name and cmd_name in COMMAND_ALIASES: # Get the actual command cmd = self.get_command(ctx, cmd_name) if cmd: return cmd_name, cmd, args[1:] - + return super().resolve_command(ctx, args) - + def format_commands(self, ctx: click.Context, formatter: click.HelpFormatter) -> None: """Add aliases section to help output.""" super().format_commands(ctx, formatter) - + # Show aliases in help with formatter.section("Aliases"): alias_rows = [] @@ -129,7 +128,7 @@ def confirm_cost( ctx: click.Context, operation: str, estimated_cost: float, - details: Optional[str] = None, + details: str | None = None, ) -> bool: """Prompt user to confirm a billable operation. @@ -189,8 +188,8 @@ def enforce_spend( operation: str, estimated_cost: float, *, - client_code: Optional[str] = None, - details: Optional[str] = None, + client_code: str | None = None, + details: str | None = None, ) -> bool: """Preventive spend-cap gate. Replaces the raw confirm_cost call. @@ -297,7 +296,11 @@ def wrapper(*args, **kwargs): @click.group(cls=AliasGroup) @click.version_option() @click.option("--yes", "-y", is_flag=True, help="Skip cost confirmation prompts") -@click.option("--agent", is_flag=True, help="Non-interactive mode for AI agents (JSON errors, exit 3 on cap).") +@click.option( + "--agent", + is_flag=True, + help="Non-interactive mode for AI agents (JSON errors, exit 3 on cap).", +) @click.pass_context def cli(ctx, yes: bool, agent: bool): """PACER CLI - Download, parse, and search federal court documents. @@ -316,7 +319,7 @@ def cli(ctx, yes: bool, agent: bool): \b Short aliases: pacer docket → pacer download docket - pacer doc → pacer download document + pacer doc → pacer download document pacer grep → pacer search pacer cases → pacer pcl cases pacer parties → pacer pcl parties @@ -347,7 +350,7 @@ def cli(ctx, yes: bool, agent: bool): ctx.obj["config"] = get_config() else: ctx.obj["config"] = get_config() - + # Check for legacy archive on first run (unless migrated) if not migration_marker_exists(): legacy_path = check_legacy_archive() @@ -372,9 +375,13 @@ def cli(ctx, yes: bool, agent: bool): @cli.command("migrate") @click.option("--dry-run", is_flag=True, help="Show what would be moved without moving") -@click.option("--legacy-dir", type=click.Path(exists=True, path_type=Path), help="Legacy archive directory") +@click.option( + "--legacy-dir", + type=click.Path(exists=True, path_type=Path), + help="Legacy archive directory", +) @click.pass_context -def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): +def migrate_archive(ctx, dry_run: bool, legacy_dir: Path | None): """Migrate from legacy flat archive to new hierarchical structure. \b @@ -394,18 +401,18 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): pacer migrate --legacy-dir ./old # Migrate from custom location """ from .downloader import extract_document_metadata - + config: PacerConfig = ctx.obj["config"] - + # Find legacy files source_dir = legacy_dir or config.docket_archive if not source_dir.exists(): console.print(f"[yellow]No legacy archive found at:[/] {source_dir}") return - + # Pattern: {court}_{case}.html where case has + for : legacy_pattern = re.compile(r"^([a-z]{2,5}dce)_(.+)\.html$") - + files_to_migrate = [] for html_file in source_dir.glob("*.html"): match = legacy_pattern.match(html_file.name) @@ -421,38 +428,38 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): "case": case_normalized, "target_dir": config.archive_root / court_normalized / case_normalized, }) - + if not files_to_migrate: console.print("[yellow]No legacy docket files found to migrate.[/]") if not dry_run: mark_migration_complete() return - + console.print(f"[cyan]Found {len(files_to_migrate)} dockets to migrate[/]\n") - + if dry_run: table = Table(title="Migration Preview") table.add_column("Source", style="dim") table.add_column("Target", style="green") - + for item in files_to_migrate[:20]: table.add_row( item["source"].name, str(item["target_dir"] / "docket.html"), ) - + console.print(table) if len(files_to_migrate) > 20: console.print(f"[dim]... and {len(files_to_migrate) - 20} more[/]") - + console.print("\n[dim]Run without --dry-run to perform migration.[/]") return - + # Perform migration migrated = 0 skipped = 0 errors = 0 - + with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), @@ -461,26 +468,26 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): console=console, ) as progress: task = progress.add_task("Migrating dockets...", total=len(files_to_migrate)) - + for item in files_to_migrate: try: target_dir = item["target_dir"] target_html = target_dir / "docket.html" - + # Skip if already migrated if target_html.exists(): skipped += 1 progress.advance(task) continue - + # Create target directory target_dir.mkdir(parents=True, exist_ok=True) - + # Copy docket file source_html = item["source"] html_content = source_html.read_text(encoding="utf-8", errors="ignore") target_html.write_text(html_content, encoding="utf-8") - + # Generate docs.json from the docket try: import json @@ -500,27 +507,27 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): except Exception: # docs.json generation failed, but docket copy still succeeds pass - + migrated += 1 - + except Exception as e: err_console.print(f"[red]Error migrating {item['source'].name}:[/] {e}") errors += 1 - + progress.advance(task) - + console.print() console.print(f"[green]Migrated:[/] {migrated} dockets") if skipped: console.print(f"[dim]Skipped:[/] {skipped} (already exist)") if errors: console.print(f"[red]Errors:[/] {errors}") - + console.print(f"\n[dim]New archive location:[/] {config.archive_root}") - + # Mark migration complete mark_migration_complete() - + console.print() console.print(Panel( "[green]Migration complete![/]\n\n" @@ -601,14 +608,20 @@ def auth_init(ctx, qa: bool): try: code = generate_totp(totp_secret) console.print(f" [green]Valid![/] Current code: [bold]{code}[/]") - console.print(" [dim]This code should match what you'd see in Authy/Google Authenticator.[/]") + console.print( + " [dim]This code should match what you'd see in " + "Authy/Google Authenticator.[/]" + ) if Confirm.ask(" Does this code look correct?", default=True): break else: console.print(" [yellow]Let's try again.[/]") except Exception as e: console.print(f" [red]Invalid secret:[/] {e}") - console.print(" [dim]The secret should be a Base32 string (letters A-Z and digits 2-7).[/]") + console.print( + " [dim]The secret should be a Base32 string " + "(letters A-Z and digits 2-7).[/]" + ) if not Confirm.ask(" Try again?", default=True): totp_secret = None break @@ -699,7 +712,13 @@ def auth_init(ctx, qa: bool): help="Client billing code (optional)", ) @click.pass_context -def auth_login(ctx, username: Optional[str], password: Optional[str], totp_secret: Optional[str], client_code: Optional[str]): +def auth_login( + ctx, + username: str | None, + password: str | None, + totp_secret: str | None, + client_code: str | None, +): """Store PACER credentials securely. \b @@ -742,7 +761,9 @@ def auth_login(ctx, username: Optional[str], password: Optional[str], totp_secre continue break - config_path = save_credentials(username, password, totp_secret, client_code, vault_passphrase=passphrase) + config_path = save_credentials( + username, password, totp_secret, client_code, vault_passphrase=passphrase + ) console.print(f"[green]Credentials saved to encrypted vault:[/] {config_path}") console.print("[dim]File permissions set to 600 (owner read/write only)[/]") @@ -750,7 +771,10 @@ def auth_login(ctx, username: Optional[str], password: Optional[str], totp_secre console.print("[green]MFA:[/] TOTP secret configured for automatic code generation") else: console.print("[yellow]MFA:[/] No TOTP secret provided.") - console.print("[dim]If your account has MFA enabled, use --totp-secret or pacer auth setup-mfa[/]") + console.print( + "[dim]If your account has MFA enabled, use --totp-secret or " + "pacer auth setup-mfa[/]" + ) @auth.command("code") @@ -772,9 +796,10 @@ def auth_code(ctx, watch: bool): err_console.print("Run [cyan]pacer auth setup-mfa[/] to configure MFA.") sys.exit(1) - from .auth import generate_totp import time + from .auth import generate_totp + secret = config.totp_secret.get_secret_value() if watch: @@ -784,7 +809,9 @@ def auth_code(ctx, watch: bool): code = generate_totp(secret) # Calculate seconds until next code remaining = 30 - (int(time.time()) % 30) - console.print(f"\r[bold green]{code}[/] [dim]expires in {remaining:2d}s[/]", end="") + console.print( + f"\r[bold green]{code}[/] [dim]expires in {remaining:2d}s[/]", end="" + ) time.sleep(1) except KeyboardInterrupt: console.print("\n") @@ -841,7 +868,7 @@ def auth_setup_mfa(ctx, totp_secret: str): @auth.command("test") @click.option("--otp", "-o", default=None, help="Manual OTP code (if not using stored TOTP)") @click.pass_context -def auth_test(ctx, otp: Optional[str]): +def auth_test(ctx, otp: str | None): """Test authentication with PACER servers. Verifies credentials work and immediately logs out. @@ -1002,7 +1029,10 @@ def use_status(ctx): documents_dir = context.case_path / "documents" doc_count = len(list(documents_dir.glob("*.pdf"))) if documents_dir.exists() else 0 - table.add_row("Docket", "[green]downloaded[/]" if docket.exists() else "[dim]not downloaded[/]") + table.add_row( + "Docket", + "[green]downloaded[/]" if docket.exists() else "[dim]not downloaded[/]", + ) table.add_row("Doc manifest", "[green]cached[/]" if docs.exists() else "[dim]none[/]") if doc_count > 0: table.add_row("Documents", f"[green]{doc_count} PDF(s)[/]") @@ -1033,19 +1063,24 @@ def download(): @click.argument("case_number", required=False) @click.argument("court_id", required=False) @click.option("--verbose", "-v", is_flag=True, help="Enable verbose/trace logging") -@click.option("--case-link", "-l", default=None, help="Direct CM/ECF case link URL (bypasses PCL search)") +@click.option( + "--case-link", + "-l", + default=None, + help="Direct CM/ECF case link URL (bypasses PCL search)", +) @click.option("--legacy", is_flag=True, help="Use legacy flat archive structure") @matter_option @click.pass_context @under_spend_lock def download_docket_cmd( ctx, - case_number: Optional[str], - court_id: Optional[str], + case_number: str | None, + court_id: str | None, verbose: bool, case_link: str, legacy: bool, - client_code: Optional[str], + client_code: str | None, ): """Download a single case docket. @@ -1074,7 +1109,9 @@ def download_docket_cmd( context = ContextConfig.load() if not context.is_set: err_console.print("[red]Error:[/] No case specified and no context set.") - err_console.print("[dim]Either provide arguments or use:[/] pacer use case ") + err_console.print( + "[dim]Either provide arguments or use:[/] pacer use case " + ) sys.exit(1) case_number = case_number or context.case_number court_id = court_id or context.court @@ -1119,7 +1156,9 @@ def download_docket_cmd( TextColumn("[progress.description]{task.description}"), console=console, ) as progress: - progress.add_task(f"Downloading docket {case_number} from {court_normalized}...", total=None) + progress.add_task( + f"Downloading docket {case_number} from {court_normalized}...", total=None + ) downloader = DocketDownloader(config, verbose=verbose) @@ -1164,7 +1203,9 @@ def download_docket_cmd( @matter_option @click.pass_context @under_spend_lock -def download_document(ctx, doc_number: str, doc_link: Optional[str], verbose: bool, client_code: Optional[str]): +def download_document( + ctx, doc_number: str, doc_link: str | None, verbose: bool, client_code: str | None +): """Download a single document from a case. \b @@ -1193,7 +1234,9 @@ def download_document(ctx, doc_number: str, doc_link: Optional[str], verbose: bo # Need context to resolve the link if not context.is_set or not context.case_path: err_console.print("[red]Error:[/] No document link provided and no context set.") - err_console.print("[dim]Either provide a link or set context:[/] pacer use case ") + err_console.print( + "[dim]Either provide a link or set context:[/] pacer use case " + ) sys.exit(1) case_dir = context.case_path @@ -1269,7 +1312,7 @@ def download_document(ctx, doc_number: str, doc_link: Optional[str], verbose: bo @click.argument("case", required=False) @click.option("--json", "output_json", is_flag=True, help="Output as JSON") @click.pass_context -def list_documents(ctx, case: Optional[str], output_json: bool): +def list_documents(ctx, case: str | None, output_json: bool): """List documents from a downloaded docket's cache. Uses the docs.json file created during docket download. @@ -1335,7 +1378,8 @@ def list_documents(ctx, case: Optional[str], output_json: bool): ) console.print(table) - console.print(f"\n[dim]Total: {docs.get('document_count', len(docs.get('documents', [])))} documents with links[/]") + total_docs = docs.get("document_count", len(docs.get("documents", []))) + console.print(f"\n[dim]Total: {total_docs} documents with links[/]") console.print(f"[dim]Case: {docs.get('case_title', '')}[/]") @@ -1352,10 +1396,10 @@ def list_documents(ctx, case: Optional[str], output_json: bool): @click.pass_context def view_case( ctx, - case: Optional[str], + case: str | None, output_format: str, verbose: bool, - output: Optional[Path], + output: Path | None, ): """View a parsed docket from the local archive. @@ -1439,7 +1483,14 @@ def view_case( @matter_option @click.pass_context @under_spend_lock -def download_batch(ctx, csv_file: Path, column_court: str, column_case: str, verbose: bool, client_code: Optional[str]): +def download_batch( + ctx, + csv_file: Path, + column_court: str, + column_case: str, + verbose: bool, + client_code: str | None, +): """Download multiple dockets from a CSV file. The CSV should have columns for court ID and case number. @@ -1526,7 +1577,7 @@ def parse(): help="Input directory (default: local_docket_archive)", ) @click.pass_context -def parse_all(ctx, input_dir: Optional[Path]): +def parse_all(ctx, input_dir: Path | None): """Parse all dockets in the archive directory.""" config: PacerConfig = ctx.obj["config"] ensure_dirs(config) @@ -1591,7 +1642,7 @@ def parse_file(ctx, docket_file: Path, output_json: bool): @click.argument("docket_file", type=click.Path(exists=True, path_type=Path)) @click.option("--verbose", "-v", is_flag=True, help="Show all entries (default: key entries only)") @click.option("--output", "-o", type=click.Path(path_type=Path), help="Save to file") -def parse_text(docket_file: Path, verbose: bool, output: Optional[Path]): +def parse_text(docket_file: Path, verbose: bool, output: Path | None): """Extract plain text from docket HTML. Outputs clean, token-efficient text for analysis. @@ -1630,8 +1681,8 @@ def search_dockets( ctx, require: tuple, exclude: tuple, - within: Optional[int], - output: Optional[Path], + within: int | None, + output: Path | None, individual: bool, ): """Search parsed docket entries. @@ -2067,7 +2118,11 @@ def pcl_cases( response = client.search_cases(criteria, page=page) all_results = response.content page_info = response.page_info - total_fee = float(response.receipt.search_fee) if response.receipt and response.receipt.search_fee else 0.0 + total_fee = ( + float(response.receipt.search_fee) + if response.receipt and response.receipt.search_fee + else 0.0 + ) # Record actual spend from the receipt so caps converge on real cost. if total_fee > 0: @@ -2098,7 +2153,17 @@ def pcl_cases( elif output_csv: import csv as csv_module import io - fieldnames = ["court_id", "case_number_full", "case_type", "case_title", "date_filed", "effective_date_closed", "jurisdiction_type", "nature_of_suit", "case_link"] + fieldnames = [ + "court_id", + "case_number_full", + "case_type", + "case_title", + "date_filed", + "effective_date_closed", + "jurisdiction_type", + "nature_of_suit", + "case_link", + ] if output: with open(output, "w", newline="") as f: @@ -2142,7 +2207,10 @@ def pcl_cases( f"({page_info.total_elements} total cases)[/]" ) if len(all_results) > 50: - console.print(f"[dim]Showing first 50 of {len(all_results)}. Use --json or --csv for full output.[/]") + console.print( + f"[dim]Showing first 50 of {len(all_results)}. " + "Use --json or --csv for full output.[/]" + ) # Interactive selection if interactive and all_results: @@ -2202,7 +2270,9 @@ def _handle_case_action(ctx, config: PacerConfig, case, action: str): ctx.invoke(view_case, case=str(docket_path)) else: err_console.print("[yellow]Docket not downloaded yet.[/]") - err_console.print(f"[dim]Download with:[/] pacer download docket \"{case_number}\" {court}") + err_console.print( + f'[dim]Download with:[/] pacer download docket "{case_number}" {court}' + ) elif action == "s": # Set as context @@ -2383,7 +2453,11 @@ def pcl_parties( response = client.search_parties(criteria, page=page) all_results = response.content page_info = response.page_info - total_fee = float(response.receipt.search_fee) if response.receipt and response.receipt.search_fee else 0.0 + total_fee = ( + float(response.receipt.search_fee) + if response.receipt and response.receipt.search_fee + else 0.0 + ) # Record actual spend from the receipt so caps converge on real cost. if total_fee > 0: @@ -2412,7 +2486,16 @@ def pcl_parties( elif output_csv: import csv as csv_module import io - fieldnames = ["last_name", "first_name", "middle_name", "party_role", "court_id", "case_number_full", "case_title", "date_filed"] + fieldnames = [ + "last_name", + "first_name", + "middle_name", + "party_role", + "court_id", + "case_number_full", + "case_title", + "date_filed", + ] if output: with open(output, "w", newline="") as f: @@ -2456,7 +2539,10 @@ def pcl_parties( f"({page_info.total_elements} total parties)[/]" ) if len(all_results) > 50: - console.print(f"[dim]Showing first 50 of {len(all_results)}. Use --json or --csv for full output.[/]") + console.print( + f"[dim]Showing first 50 of {len(all_results)}. " + "Use --json or --csv for full output.[/]" + ) except PCLError as e: err_console.print(f"[red]Error:[/] {e}") @@ -2487,7 +2573,13 @@ def pcl_batch(): @pcl_batch.command("list") -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_list(ctx, search_type): """List all batch jobs.""" @@ -2531,7 +2623,13 @@ def batch_list(ctx, search_type): @pcl_batch.command("status") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_status(ctx, report_id, search_type): """Check status of a batch job.""" @@ -2567,8 +2665,20 @@ def batch_status(ctx, report_id, search_type): @pcl_batch.command("download") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") -@click.option("--output", "-o", type=click.Path(path_type=Path), required=True, help="Output file (JSON)") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) +@click.option( + "--output", + "-o", + type=click.Path(path_type=Path), + required=True, + help="Output file (JSON)", +) @click.pass_context def batch_download(ctx, report_id, search_type, output): """Download results from a completed batch job.""" @@ -2604,7 +2714,13 @@ def batch_download(ctx, report_id, search_type, output): @pcl_batch.command("delete") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_delete(ctx, report_id, search_type): """Delete a batch job and its results.""" diff --git a/src/pacer_cli/config.py b/src/pacer_cli/config.py index 461bf8a..a7dd1a2 100644 --- a/src/pacer_cli/config.py +++ b/src/pacer_cli/config.py @@ -4,7 +4,7 @@ import os from datetime import datetime, timezone from pathlib import Path -from typing import Literal, Optional +from typing import Literal from pydantic import BaseModel, SecretStr, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -38,11 +38,11 @@ class PacerConfig(BaseSettings): env_file_encoding="utf-8", ) - username: Optional[str] = None - password: Optional[SecretStr] = None - totp_secret: Optional[SecretStr] = None # Base32-encoded TOTP secret for MFA (production) - qa_totp_secret: Optional[SecretStr] = None # Base32-encoded TOTP secret for QA environment - client_code: Optional[str] = None # Optional client code for billing + username: str | None = None + password: SecretStr | None = None + totp_secret: SecretStr | None = None # Base32-encoded TOTP secret for MFA (production) + qa_totp_secret: SecretStr | None = None # Base32-encoded TOTP secret for QA environment + client_code: str | None = None # Optional client code for billing use_qa: bool = False # Use QA environment instead of production # Legacy paths (deprecated, kept for backward compatibility) @@ -98,7 +98,7 @@ def has_mfa(self) -> bool: return self.active_totp_secret is not None @property - def active_totp_secret(self) -> Optional[SecretStr]: + def active_totp_secret(self) -> SecretStr | None: """Get TOTP secret for current environment (QA or production). Falls back to totp_secret if qa_totp_secret is not set in QA mode. @@ -138,10 +138,10 @@ def get_documents_dir(self, court: str, case_number: str) -> Path: class ContextConfig(BaseModel): """Active working context for CLI commands.""" - court: Optional[str] = None - case_number: Optional[str] = None - case_path: Optional[Path] = None - updated_at: Optional[str] = None + court: str | None = None + case_number: str | None = None + case_path: Path | None = None + updated_at: str | None = None @classmethod def load(cls) -> "ContextConfig": @@ -257,9 +257,9 @@ def apply_policy_csv(cfg: PacerConfig) -> PacerConfig: def save_credentials( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, - vault_passphrase: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, + vault_passphrase: str | None = None, ) -> Path: """Save PACER credentials to encrypted vault. @@ -274,15 +274,17 @@ def save_credentials( Path to vault file """ if vault_passphrase: - return _save_credentials_vault(username, password, totp_secret, client_code, vault_passphrase) + return _save_credentials_vault( + username, password, totp_secret, client_code, vault_passphrase + ) return _save_credentials_legacy(username, password, totp_secret, client_code) def _save_credentials_legacy( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, ) -> Path: """Save credentials to plaintext config.env (legacy mode).""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) @@ -302,8 +304,8 @@ def _save_credentials_legacy( def _save_credentials_vault( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, passphrase: str = "", ) -> Path: """Save credentials to encrypted vault.""" @@ -401,7 +403,7 @@ def ensure_dirs(config: PacerConfig) -> None: config.parsed_dockets.mkdir(parents=True, exist_ok=True) -def check_legacy_archive() -> Optional[Path]: +def check_legacy_archive() -> Path | None: """Check for old-style flat archive that could be migrated. Returns: diff --git a/src/pacer_cli/courts.py b/src/pacer_cli/courts.py index 20f1844..7d4f305 100644 --- a/src/pacer_cli/courts.py +++ b/src/pacer_cli/courts.py @@ -10,7 +10,7 @@ import re from functools import lru_cache from pathlib import Path -from typing import Any, Optional +from typing import Any @lru_cache(maxsize=1) @@ -23,7 +23,7 @@ def _load_court_data() -> list[dict[str, Any]]: @lru_cache(maxsize=256) -def get_court_by_ecf_domain(ecf_domain: str) -> Optional[dict[str, Any]]: +def get_court_by_ecf_domain(ecf_domain: str) -> dict[str, Any] | None: """Look up court info by ECF domain name. Args: @@ -46,7 +46,7 @@ def get_court_by_ecf_domain(ecf_domain: str) -> Optional[dict[str, Any]]: @lru_cache(maxsize=256) -def get_court_by_id(court_id: str) -> Optional[dict[str, Any]]: +def get_court_by_id(court_id: str) -> dict[str, Any] | None: """Look up court info by court ID. Args: @@ -64,7 +64,7 @@ def get_court_by_id(court_id: str) -> Optional[dict[str, Any]]: return None -def get_cso_court_id(ecf_domain: str) -> Optional[str]: +def get_cso_court_id(ecf_domain: str) -> str | None: """Get CSO court ID from ECF domain. Args: @@ -83,7 +83,7 @@ def get_cso_court_id(ecf_domain: str) -> Optional[str]: return court.get("court_id") if court else None -def get_ecf_url(court_id: str) -> Optional[str]: +def get_ecf_url(court_id: str) -> str | None: """Get ECF login URL from court ID. Args: @@ -100,7 +100,7 @@ def get_ecf_url(court_id: str) -> Optional[str]: return court.get("login_url") if court else None -def get_ecf_domain_from_url(url: str) -> Optional[str]: +def get_ecf_domain_from_url(url: str) -> str | None: """Extract ECF domain from a URL. Args: @@ -117,7 +117,7 @@ def get_ecf_domain_from_url(url: str) -> Optional[str]: return match.group(1) if match else None -def get_court_name(court_id: str) -> Optional[str]: +def get_court_name(court_id: str) -> str | None: """Get human-readable court name. Args: @@ -130,7 +130,7 @@ def get_court_name(court_id: str) -> Optional[str]: return court.get("court_name") or court.get("title") if court else None -def get_court_type(court_id: str) -> Optional[str]: +def get_court_type(court_id: str) -> str | None: """Get court type (District, Bankruptcy, Appeals). Args: @@ -143,7 +143,7 @@ def get_court_type(court_id: str) -> Optional[str]: return court.get("type") if court else None -def list_courts(court_type: Optional[str] = None) -> list[dict[str, str]]: +def list_courts(court_type: str | None = None) -> list[dict[str, str]]: """List all courts, optionally filtered by type. Args: @@ -167,7 +167,7 @@ def list_courts(court_type: Optional[str] = None) -> list[dict[str, str]]: return result -def normalize_court_id(court_id: str) -> Optional[str]: +def normalize_court_id(court_id: str) -> str | None: """Normalize various court ID formats to CSO format. Handles: @@ -215,7 +215,7 @@ def normalize_court_id(court_id: str) -> Optional[str]: # reach). -def _courts_csv_path() -> "Path": +def _courts_csv_path() -> Path: # Imported lazily to avoid a courts <-> config import cycle at module load. from .config import PACER_ROOT @@ -255,7 +255,7 @@ def read_courts_scope() -> dict[str, bool]: return scope -def write_courts_scope(scope: dict[str, bool]) -> "Path": +def write_courts_scope(scope: dict[str, bool]) -> Path: """Write {court_id: enabled} to courts.csv (sorted), creating dirs as needed.""" path = _courts_csv_path() path.parent.mkdir(parents=True, exist_ok=True) @@ -269,7 +269,7 @@ def write_courts_scope(scope: dict[str, bool]) -> "Path": return path -def enabled_court_ids() -> Optional[list[str]]: +def enabled_court_ids() -> list[str] | None: """Enabled court IDs for scoping a search, or None for no scope (nationwide). Raw view used by ``pacer courts status``: None when courts.csv is absent or @@ -290,7 +290,7 @@ def enabled_court_ids() -> Optional[list[str]]: return enabled -def resolve_court_scope(explicit_courts) -> Optional[list[str]]: +def resolve_court_scope(explicit_courts) -> list[str] | None: """The single open/off switch for scoping a billable search. One source of truth so callers never re-implement the rule (which is how an diff --git a/src/pacer_cli/docket_types.py b/src/pacer_cli/docket_types.py index 67a5848..7690bc4 100644 --- a/src/pacer_cli/docket_types.py +++ b/src/pacer_cli/docket_types.py @@ -7,9 +7,7 @@ from __future__ import annotations import json -from dataclasses import dataclass, field, asdict -from typing import Optional - +from dataclasses import asdict, dataclass, field # Key filing terms for filtering significant entries KEY_TERMS = frozenset({ @@ -24,9 +22,9 @@ class Attorney: """Attorney information.""" name: str - firm: Optional[str] = None - email: Optional[str] = None - phone: Optional[str] = None + firm: str | None = None + email: str | None = None + phone: str | None = None pro_hac_vice: bool = False @@ -46,7 +44,9 @@ def to_compact(self) -> str: atty_names = ', '.join(a.name for a in self.attorneys[:2]) if len(self.attorneys) > 2: atty_names += f" +{len(self.attorneys) - 2}" - return f"{role_abbrev}: {self.name} | Atty: {atty_names}" if atty_names else f"{role_abbrev}: {self.name}" + if atty_names: + return f"{role_abbrev}: {self.name} | Atty: {atty_names}" + return f"{role_abbrev}: {self.name}" @dataclass @@ -54,8 +54,8 @@ class DocketEntry: """Single docket entry.""" seq: int date: str # ISO format YYYY-MM-DD - doc_num: Optional[str] = None - doc_url: Optional[str] = None + doc_num: str | None = None + doc_url: str | None = None text: str = "" has_attachments: bool = False attachment_count: int = 0 @@ -80,16 +80,16 @@ class DocketMeta: case_number: str # e.g., "1:18-cv-08434-VEC-SLC" case_title: str date_filed: str # ISO format - date_closed: Optional[str] = None + date_closed: str | None = None judge: str = "" - magistrate: Optional[str] = None + magistrate: str | None = None nature_of_suit: str = "" # Code like "442" nos_description: str = "" # "Civil Rights: Jobs" cause: str = "" jurisdiction: str = "" - jury_demand: Optional[str] = None - demand: Optional[str] = None - lead_case: Optional[str] = None + jury_demand: str | None = None + demand: str | None = None + lead_case: str | None = None member_cases: list[str] = field(default_factory=list) flags: list[str] = field(default_factory=list) # CLOSED, MDL, etc. @@ -100,7 +100,7 @@ class ParsedDocket: meta: DocketMeta entries: list[DocketEntry] parties: list[Party] = field(default_factory=list) - download_meta: Optional[dict] = None + download_meta: dict | None = None def key_entries(self, limit: int = 20) -> list[DocketEntry]: """Return most significant docket entries.""" @@ -165,13 +165,20 @@ def to_markdown(self, verbose: bool = False) -> str: if self.meta.judge: lines.append(f"**Judge:** {self.meta.judge} ") if self.meta.nature_of_suit: - lines.append(f"**Nature of Suit:** {self.meta.nature_of_suit} - {self.meta.nos_description} ") + lines.append( + f"**Nature of Suit:** {self.meta.nature_of_suit} - " + f"{self.meta.nos_description} " + ) # Parties if self.parties: lines.extend(["", "## Parties", ""]) for party in self.parties: - atty_list = ", ".join(a.name for a in party.attorneys) if party.attorneys else "(none)" + atty_list = ( + ", ".join(a.name for a in party.attorneys) + if party.attorneys + else "(none)" + ) lines.append(f"- **{party.role}:** {party.name}") if party.pro_se: lines.append(" - *Pro Se*") diff --git a/src/pacer_cli/downloader.py b/src/pacer_cli/downloader.py index eb9f09d..e27cca6 100644 --- a/src/pacer_cli/downloader.py +++ b/src/pacer_cli/downloader.py @@ -9,7 +9,6 @@ import time from dataclasses import dataclass from pathlib import Path -from typing import Optional from urllib.parse import urljoin, urlparse import requests @@ -24,9 +23,9 @@ class DownloadResult: """Result of a docket download attempt.""" success: bool - filepath: Optional[Path] = None - docs_filepath: Optional[Path] = None # Path to docs.json manifest - error: Optional[str] = None + filepath: Path | None = None + docs_filepath: Path | None = None # Path to docs.json manifest + error: str | None = None pages: int = 0 cost: float = 0.0 @@ -90,7 +89,7 @@ def extract_document_metadata( } -def load_cached_documents(case_dir: Path) -> Optional[dict]: +def load_cached_documents(case_dir: Path) -> dict | None: """Load cached document metadata for a case. Args: @@ -105,12 +104,12 @@ def load_cached_documents(case_dir: Path) -> Optional[dict]: if docs_path.exists(): try: return json.loads(docs_path.read_text(encoding="utf-8")) - except (json.JSONDecodeError, IOError): + except (OSError, json.JSONDecodeError): pass # unreadable docs.json cache -> treat as no cache return None -def get_document_by_number(case_dir: Path, doc_number: str) -> Optional[dict]: +def get_document_by_number(case_dir: Path, doc_number: str) -> dict | None: """Get document info by number from cached metadata. Args: @@ -135,7 +134,7 @@ def __init__(self, config: PacerConfig, verbose: bool = False): self.config = config self.verbose = verbose self.session = requests.Session() - self.token: Optional[str] = None + self.token: str | None = None def _log(self, msg: str): """Print trace message if verbose mode enabled.""" @@ -184,7 +183,10 @@ def _cso_login(self, court_id: str, app_url: str) -> bool: # Extract the ViewState (JSF CSRF token) - both javax and jakarta variants viewstate = None import re - vs_match = re.search(r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', resp.text) + vs_match = re.search( + r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', + resp.text, + ) if vs_match: viewstate = vs_match.group(1) self._log(f"Found ViewState: {viewstate[:50]}...") @@ -223,7 +225,10 @@ def _cso_login(self, court_id: str, app_url: str) -> bool: self._log("MFA step required, submitting OTP via PrimeFaces AJAX...") # Get new ViewState for MFA form - vs_match = re.search(r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', resp.text) + vs_match = re.search( + r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', + resp.text, + ) mfa_viewstate = vs_match.group(1) if vs_match else "" from .auth import generate_totp @@ -316,7 +321,7 @@ def _get_ecf_base_url(self, court_id: str) -> str: # District courts typically use format like nysd, cacd return f"https://ecf.{court_abbrev}.uscourts.gov" - def _get_case_id_from_link(self, case_link: str) -> Optional[str]: + def _get_case_id_from_link(self, case_link: str) -> str | None: """Extract case ID from a PCL case link URL. Args: @@ -332,7 +337,7 @@ def download_docket_by_link( self, case_link: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download docket using a PCL case link URL. @@ -413,7 +418,10 @@ def download_docket_by_link( # Retry the request after login resp = self.session.get(case_link, headers=headers, timeout=30) - self._log(f"After CSO login, response: {resp.status_code}, length: {len(resp.text)}") + self._log( + f"After CSO login, response: {resp.status_code}, " + f"length: {len(resp.text)}" + ) # Check again for JS redirect (login may have failed) if "location.assign" in resp.text and "csologin" in resp.text: @@ -456,7 +464,9 @@ def download_docket_by_link( # Extract the form action URL (contains session token) import re - form_action_match = re.search(r']*action="([^"]+)"', resp.text, re.IGNORECASE) + form_action_match = re.search( + r']*action="([^"]+)"', resp.text, re.IGNORECASE + ) if form_action_match: form_action = form_action_match.group(1) # Handle relative URLs @@ -469,7 +479,11 @@ def download_docket_by_link( self._log("No form action found, using docket URL") # Extract hidden form fields - hidden_fields = re.findall(r']*type="hidden"[^>]*name="([^"]*)"[^>]*value="([^"]*)"', resp.text, re.IGNORECASE) + hidden_fields = re.findall( + r']*type="hidden"[^>]*name="([^"]*)"[^>]*value="([^"]*)"', + resp.text, + re.IGNORECASE, + ) # Submit form with default options (all entries) form_data = { @@ -532,7 +546,10 @@ def download_docket_by_link( ) docs_filepath = output_dir / "docs.json" docs_filepath.write_text(json.dumps(docs_meta, indent=2), encoding="utf-8") - self._log(f"Saved docs.json to: {docs_filepath} ({docs_meta.get('document_count', 0)} docs)") + self._log( + f"Saved docs.json to: {docs_filepath} " + f"({docs_meta.get('document_count', 0)} docs)" + ) except Exception as e: self._log(f"Warning: Could not save docs.json: {e}") @@ -552,7 +569,7 @@ def download_docket_by_case_number( case_number: str, court_id: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download docket by case number and court ID. @@ -617,7 +634,7 @@ def download_docket( court_id: str, output_dir: Path, verbose: bool = False, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Convenience function to download a docket. @@ -644,7 +661,7 @@ class DocumentDownloader: def __init__(self, config: PacerConfig, verbose: bool = False): self.config = config self.verbose = verbose - self._docket_dl: Optional[DocketDownloader] = None + self._docket_dl: DocketDownloader | None = None self.authenticated_courts: set[str] = set() def _log(self, msg: str): @@ -713,7 +730,7 @@ def download_document( self, doc_url: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download a document from CM/ECF. @@ -738,7 +755,10 @@ def download_document( try: resp = self.session.get(doc_url, headers=headers, timeout=60, allow_redirects=True) - self._log(f"Response: {resp.status_code}, content-type: {resp.headers.get('content-type', 'unknown')}") + self._log( + f"Response: {resp.status_code}, " + f"content-type: {resp.headers.get('content-type', 'unknown')}" + ) # Check for login redirect early (before following any links) if self._is_login_redirect(resp): @@ -776,46 +796,79 @@ def download_document( if 'View Document' in resp.text and 'goDLS' in resp.text: self._log("PACER receipt page detected, extracting goDLS params...") - # Extract goDLS parameters: goDLS(path, caseid, de_seq, got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf) + # Extract goDLS parameters: goDLS(path, caseid, de_seq, + # got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf) godls_match = re.search( r"goDLS\('([^']+)','([^']+)','([^']+)','([^']*)','([^']*)','([^']*)','([^']*)','([^']*)','([^']*)'\)", resp.text ) if godls_match: - path, caseid, de_seq, got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf = godls_match.groups() + ( + path, + caseid, + de_seq, + got_receipt, + pdf_hdr, + pdf_toggle, + magic, + hdr, + psf, + ) = godls_match.groups() # Build POST data from goDLS params form_data = {} - if caseid: form_data['caseid'] = caseid - if de_seq: form_data['de_seq_num'] = de_seq - if got_receipt: form_data['got_receipt'] = got_receipt - if pdf_hdr: form_data['pdf_header'] = pdf_hdr - if pdf_toggle: form_data['pdf_toggle_possible'] = pdf_toggle - if magic: form_data['magic_num'] = magic - if hdr: form_data['hdr'] = hdr - if psf: form_data['psf_report'] = psf + if caseid: + form_data['caseid'] = caseid + if de_seq: + form_data['de_seq_num'] = de_seq + if got_receipt: + form_data['got_receipt'] = got_receipt + if pdf_hdr: + form_data['pdf_header'] = pdf_hdr + if pdf_toggle: + form_data['pdf_toggle_possible'] = pdf_toggle + if magic: + form_data['magic_num'] = magic + if hdr: + form_data['hdr'] = hdr + if psf: + form_data['psf_report'] = psf parsed = urlparse(doc_url) form_url = f"{parsed.scheme}://{parsed.netloc}{path}" self._log(f"POSTing to {form_url} with goDLS params: {form_data}") - resp = self.session.post(form_url, data=form_data, headers=headers, timeout=120) - self._log(f"goDLS POST response: {resp.status_code}, type: {resp.headers.get('content-type', 'unknown')}") + resp = self.session.post( + form_url, data=form_data, headers=headers, timeout=120 + ) + self._log( + f"goDLS POST response: {resp.status_code}, " + f"type: {resp.headers.get('content-type', 'unknown')}" + ) # PACER returns HTML with iframe containing PDF URL - if 'text/html' in resp.headers.get('content-type', '') and ']+src="([^"]+)"', resp.text) if iframe_match: pdf_path = iframe_match.group(1) pdf_url = f"{parsed.scheme}://{parsed.netloc}{pdf_path}" self._log(f"Following iframe to PDF: {pdf_url}") resp = self.session.get(pdf_url, headers=headers, timeout=120) - self._log(f"PDF response: {resp.status_code}, type: {resp.headers.get('content-type', 'unknown')}, size: {len(resp.content)}") + self._log( + f"PDF response: {resp.status_code}, " + f"type: {resp.headers.get('content-type', 'unknown')}, " + f"size: {len(resp.content)}" + ) # Fallback: check for other form types elif '