From 4fc1b77cccc8b7f0a53a93083ed039ad5162aa57 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 04:02:48 +0000 Subject: [PATCH 1/4] Ruff cleanup (1/2): apply 299 safe auto-fixes ruff check --fix across src/: Optional[X]->X|None (UP045), List/Tuple->list/ tuple (UP006), import sorting (I001), pyupgrade (UP024/UP035/UP037), and whitespace. No behavior change; 302 passed, 9 skipped. --- src/pacer_cli/auth.py | 9 +- src/pacer_cli/cli.py | 118 +++++++++---------- src/pacer_cli/config.py | 38 +++---- src/pacer_cli/courts.py | 28 ++--- src/pacer_cli/docket_types.py | 26 ++--- src/pacer_cli/downloader.py | 27 +++-- src/pacer_cli/errors.py | 13 +-- src/pacer_cli/mcp_server.py | 26 ++--- src/pacer_cli/models.py | 206 +++++++++++++++++----------------- src/pacer_cli/parser.py | 2 +- src/pacer_cli/pcl.py | 9 +- src/pacer_cli/reader.py | 6 +- src/pacer_cli/security.py | 33 +++--- src/pacer_cli/selection.py | 13 +-- src/pacer_cli/vault.py | 9 +- 15 files changed, 279 insertions(+), 284 deletions(-) diff --git a/src/pacer_cli/auth.py b/src/pacer_cli/auth.py index 7583805..d93cda3 100644 --- a/src/pacer_cli/auth.py +++ b/src/pacer_cli/auth.py @@ -9,7 +9,6 @@ """ from dataclasses import dataclass -from typing import Optional import requests @@ -21,8 +20,8 @@ class AuthResult: """Result of PACER authentication attempt.""" success: bool - token: Optional[str] = None # nextGenCSO 128-byte token - error: Optional[str] = None + token: str | None = None # nextGenCSO 128-byte token + error: str | None = None login_result: str = "" @@ -41,7 +40,7 @@ def generate_totp(secret: str) -> str: def authenticate( config: PacerConfig, - otp_code: Optional[str] = None, + otp_code: str | None = None, ) -> AuthResult: """Authenticate with PACER and get session token. @@ -152,7 +151,7 @@ def logout(config: PacerConfig, token: str) -> bool: return False -def test_credentials(config: PacerConfig, otp_code: Optional[str] = None) -> AuthResult: +def test_credentials(config: PacerConfig, otp_code: str | None = None) -> AuthResult: """Test PACER credentials without keeping the session. Authenticates and immediately logs out to verify credentials work. diff --git a/src/pacer_cli/cli.py b/src/pacer_cli/cli.py index 728d603..5526504 100644 --- a/src/pacer_cli/cli.py +++ b/src/pacer_cli/cli.py @@ -6,12 +6,11 @@ import re import sys from pathlib import Path -from typing import Optional import click from rich.console import Console from rich.panel import Panel -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn +from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn from rich.table import Table from .config import ( @@ -54,7 +53,7 @@ "doc": ["download", "document"], "grep": ["search"], "find": ["search"], - + # Alternative PCL shortcuts "cases": ["pcl", "cases"], "parties": ["pcl", "parties"], @@ -67,21 +66,21 @@ class AliasGroup(click.Group): Allows users to use shorter command names that map to full paths. Example: 'pacer docket' -> 'pacer download docket' """ - - def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Command]: + + def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None: # Try exact match first rv = click.Group.get_command(self, ctx, cmd_name) if rv is not None: return rv - + # Check if it's an alias if cmd_name in COMMAND_ALIASES: alias_path = COMMAND_ALIASES[cmd_name] - + # For single-element paths, just look up directly if len(alias_path) == 1: return click.Group.get_command(self, ctx, alias_path[0]) - + # For multi-part paths (e.g., ["download", "docket"]), # navigate through subgroups current_group = self @@ -90,27 +89,27 @@ def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Comma if sub_cmd is None or not isinstance(sub_cmd, click.Group): return None current_group = sub_cmd - + return click.Group.get_command(current_group, ctx, alias_path[-1]) - + return None - + def resolve_command(self, ctx: click.Context, args: list) -> tuple: """Resolve command, handling multi-part aliases.""" cmd_name = args[0] if args else None - + if cmd_name and cmd_name in COMMAND_ALIASES: # Get the actual command cmd = self.get_command(ctx, cmd_name) if cmd: return cmd_name, cmd, args[1:] - + return super().resolve_command(ctx, args) - + def format_commands(self, ctx: click.Context, formatter: click.HelpFormatter) -> None: """Add aliases section to help output.""" super().format_commands(ctx, formatter) - + # Show aliases in help with formatter.section("Aliases"): alias_rows = [] @@ -129,7 +128,7 @@ def confirm_cost( ctx: click.Context, operation: str, estimated_cost: float, - details: Optional[str] = None, + details: str | None = None, ) -> bool: """Prompt user to confirm a billable operation. @@ -189,8 +188,8 @@ def enforce_spend( operation: str, estimated_cost: float, *, - client_code: Optional[str] = None, - details: Optional[str] = None, + client_code: str | None = None, + details: str | None = None, ) -> bool: """Preventive spend-cap gate. Replaces the raw confirm_cost call. @@ -347,7 +346,7 @@ def cli(ctx, yes: bool, agent: bool): ctx.obj["config"] = get_config() else: ctx.obj["config"] = get_config() - + # Check for legacy archive on first run (unless migrated) if not migration_marker_exists(): legacy_path = check_legacy_archive() @@ -374,7 +373,7 @@ def cli(ctx, yes: bool, agent: bool): @click.option("--dry-run", is_flag=True, help="Show what would be moved without moving") @click.option("--legacy-dir", type=click.Path(exists=True, path_type=Path), help="Legacy archive directory") @click.pass_context -def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): +def migrate_archive(ctx, dry_run: bool, legacy_dir: Path | None): """Migrate from legacy flat archive to new hierarchical structure. \b @@ -394,18 +393,18 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): pacer migrate --legacy-dir ./old # Migrate from custom location """ from .downloader import extract_document_metadata - + config: PacerConfig = ctx.obj["config"] - + # Find legacy files source_dir = legacy_dir or config.docket_archive if not source_dir.exists(): console.print(f"[yellow]No legacy archive found at:[/] {source_dir}") return - + # Pattern: {court}_{case}.html where case has + for : legacy_pattern = re.compile(r"^([a-z]{2,5}dce)_(.+)\.html$") - + files_to_migrate = [] for html_file in source_dir.glob("*.html"): match = legacy_pattern.match(html_file.name) @@ -421,38 +420,38 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): "case": case_normalized, "target_dir": config.archive_root / court_normalized / case_normalized, }) - + if not files_to_migrate: console.print("[yellow]No legacy docket files found to migrate.[/]") if not dry_run: mark_migration_complete() return - + console.print(f"[cyan]Found {len(files_to_migrate)} dockets to migrate[/]\n") - + if dry_run: table = Table(title="Migration Preview") table.add_column("Source", style="dim") table.add_column("Target", style="green") - + for item in files_to_migrate[:20]: table.add_row( item["source"].name, str(item["target_dir"] / "docket.html"), ) - + console.print(table) if len(files_to_migrate) > 20: console.print(f"[dim]... and {len(files_to_migrate) - 20} more[/]") - + console.print("\n[dim]Run without --dry-run to perform migration.[/]") return - + # Perform migration migrated = 0 skipped = 0 errors = 0 - + with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), @@ -461,26 +460,26 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): console=console, ) as progress: task = progress.add_task("Migrating dockets...", total=len(files_to_migrate)) - + for item in files_to_migrate: try: target_dir = item["target_dir"] target_html = target_dir / "docket.html" - + # Skip if already migrated if target_html.exists(): skipped += 1 progress.advance(task) continue - + # Create target directory target_dir.mkdir(parents=True, exist_ok=True) - + # Copy docket file source_html = item["source"] html_content = source_html.read_text(encoding="utf-8", errors="ignore") target_html.write_text(html_content, encoding="utf-8") - + # Generate docs.json from the docket try: import json @@ -500,27 +499,27 @@ def migrate_archive(ctx, dry_run: bool, legacy_dir: Optional[Path]): except Exception: # docs.json generation failed, but docket copy still succeeds pass - + migrated += 1 - + except Exception as e: err_console.print(f"[red]Error migrating {item['source'].name}:[/] {e}") errors += 1 - + progress.advance(task) - + console.print() console.print(f"[green]Migrated:[/] {migrated} dockets") if skipped: console.print(f"[dim]Skipped:[/] {skipped} (already exist)") if errors: console.print(f"[red]Errors:[/] {errors}") - + console.print(f"\n[dim]New archive location:[/] {config.archive_root}") - + # Mark migration complete mark_migration_complete() - + console.print() console.print(Panel( "[green]Migration complete![/]\n\n" @@ -699,7 +698,7 @@ def auth_init(ctx, qa: bool): help="Client billing code (optional)", ) @click.pass_context -def auth_login(ctx, username: Optional[str], password: Optional[str], totp_secret: Optional[str], client_code: Optional[str]): +def auth_login(ctx, username: str | None, password: str | None, totp_secret: str | None, client_code: str | None): """Store PACER credentials securely. \b @@ -772,9 +771,10 @@ def auth_code(ctx, watch: bool): err_console.print("Run [cyan]pacer auth setup-mfa[/] to configure MFA.") sys.exit(1) - from .auth import generate_totp import time + from .auth import generate_totp + secret = config.totp_secret.get_secret_value() if watch: @@ -841,7 +841,7 @@ def auth_setup_mfa(ctx, totp_secret: str): @auth.command("test") @click.option("--otp", "-o", default=None, help="Manual OTP code (if not using stored TOTP)") @click.pass_context -def auth_test(ctx, otp: Optional[str]): +def auth_test(ctx, otp: str | None): """Test authentication with PACER servers. Verifies credentials work and immediately logs out. @@ -1040,12 +1040,12 @@ def download(): @under_spend_lock def download_docket_cmd( ctx, - case_number: Optional[str], - court_id: Optional[str], + case_number: str | None, + court_id: str | None, verbose: bool, case_link: str, legacy: bool, - client_code: Optional[str], + client_code: str | None, ): """Download a single case docket. @@ -1164,7 +1164,7 @@ def download_docket_cmd( @matter_option @click.pass_context @under_spend_lock -def download_document(ctx, doc_number: str, doc_link: Optional[str], verbose: bool, client_code: Optional[str]): +def download_document(ctx, doc_number: str, doc_link: str | None, verbose: bool, client_code: str | None): """Download a single document from a case. \b @@ -1269,7 +1269,7 @@ def download_document(ctx, doc_number: str, doc_link: Optional[str], verbose: bo @click.argument("case", required=False) @click.option("--json", "output_json", is_flag=True, help="Output as JSON") @click.pass_context -def list_documents(ctx, case: Optional[str], output_json: bool): +def list_documents(ctx, case: str | None, output_json: bool): """List documents from a downloaded docket's cache. Uses the docs.json file created during docket download. @@ -1352,10 +1352,10 @@ def list_documents(ctx, case: Optional[str], output_json: bool): @click.pass_context def view_case( ctx, - case: Optional[str], + case: str | None, output_format: str, verbose: bool, - output: Optional[Path], + output: Path | None, ): """View a parsed docket from the local archive. @@ -1439,7 +1439,7 @@ def view_case( @matter_option @click.pass_context @under_spend_lock -def download_batch(ctx, csv_file: Path, column_court: str, column_case: str, verbose: bool, client_code: Optional[str]): +def download_batch(ctx, csv_file: Path, column_court: str, column_case: str, verbose: bool, client_code: str | None): """Download multiple dockets from a CSV file. The CSV should have columns for court ID and case number. @@ -1526,7 +1526,7 @@ def parse(): help="Input directory (default: local_docket_archive)", ) @click.pass_context -def parse_all(ctx, input_dir: Optional[Path]): +def parse_all(ctx, input_dir: Path | None): """Parse all dockets in the archive directory.""" config: PacerConfig = ctx.obj["config"] ensure_dirs(config) @@ -1591,7 +1591,7 @@ def parse_file(ctx, docket_file: Path, output_json: bool): @click.argument("docket_file", type=click.Path(exists=True, path_type=Path)) @click.option("--verbose", "-v", is_flag=True, help="Show all entries (default: key entries only)") @click.option("--output", "-o", type=click.Path(path_type=Path), help="Save to file") -def parse_text(docket_file: Path, verbose: bool, output: Optional[Path]): +def parse_text(docket_file: Path, verbose: bool, output: Path | None): """Extract plain text from docket HTML. Outputs clean, token-efficient text for analysis. @@ -1630,8 +1630,8 @@ def search_dockets( ctx, require: tuple, exclude: tuple, - within: Optional[int], - output: Optional[Path], + within: int | None, + output: Path | None, individual: bool, ): """Search parsed docket entries. diff --git a/src/pacer_cli/config.py b/src/pacer_cli/config.py index 461bf8a..f81ea6f 100644 --- a/src/pacer_cli/config.py +++ b/src/pacer_cli/config.py @@ -4,7 +4,7 @@ import os from datetime import datetime, timezone from pathlib import Path -from typing import Literal, Optional +from typing import Literal from pydantic import BaseModel, SecretStr, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -38,11 +38,11 @@ class PacerConfig(BaseSettings): env_file_encoding="utf-8", ) - username: Optional[str] = None - password: Optional[SecretStr] = None - totp_secret: Optional[SecretStr] = None # Base32-encoded TOTP secret for MFA (production) - qa_totp_secret: Optional[SecretStr] = None # Base32-encoded TOTP secret for QA environment - client_code: Optional[str] = None # Optional client code for billing + username: str | None = None + password: SecretStr | None = None + totp_secret: SecretStr | None = None # Base32-encoded TOTP secret for MFA (production) + qa_totp_secret: SecretStr | None = None # Base32-encoded TOTP secret for QA environment + client_code: str | None = None # Optional client code for billing use_qa: bool = False # Use QA environment instead of production # Legacy paths (deprecated, kept for backward compatibility) @@ -98,7 +98,7 @@ def has_mfa(self) -> bool: return self.active_totp_secret is not None @property - def active_totp_secret(self) -> Optional[SecretStr]: + def active_totp_secret(self) -> SecretStr | None: """Get TOTP secret for current environment (QA or production). Falls back to totp_secret if qa_totp_secret is not set in QA mode. @@ -138,10 +138,10 @@ def get_documents_dir(self, court: str, case_number: str) -> Path: class ContextConfig(BaseModel): """Active working context for CLI commands.""" - court: Optional[str] = None - case_number: Optional[str] = None - case_path: Optional[Path] = None - updated_at: Optional[str] = None + court: str | None = None + case_number: str | None = None + case_path: Path | None = None + updated_at: str | None = None @classmethod def load(cls) -> "ContextConfig": @@ -257,9 +257,9 @@ def apply_policy_csv(cfg: PacerConfig) -> PacerConfig: def save_credentials( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, - vault_passphrase: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, + vault_passphrase: str | None = None, ) -> Path: """Save PACER credentials to encrypted vault. @@ -281,8 +281,8 @@ def save_credentials( def _save_credentials_legacy( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, ) -> Path: """Save credentials to plaintext config.env (legacy mode).""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) @@ -302,8 +302,8 @@ def _save_credentials_legacy( def _save_credentials_vault( username: str, password: str, - totp_secret: Optional[str] = None, - client_code: Optional[str] = None, + totp_secret: str | None = None, + client_code: str | None = None, passphrase: str = "", ) -> Path: """Save credentials to encrypted vault.""" @@ -401,7 +401,7 @@ def ensure_dirs(config: PacerConfig) -> None: config.parsed_dockets.mkdir(parents=True, exist_ok=True) -def check_legacy_archive() -> Optional[Path]: +def check_legacy_archive() -> Path | None: """Check for old-style flat archive that could be migrated. Returns: diff --git a/src/pacer_cli/courts.py b/src/pacer_cli/courts.py index 20f1844..7d4f305 100644 --- a/src/pacer_cli/courts.py +++ b/src/pacer_cli/courts.py @@ -10,7 +10,7 @@ import re from functools import lru_cache from pathlib import Path -from typing import Any, Optional +from typing import Any @lru_cache(maxsize=1) @@ -23,7 +23,7 @@ def _load_court_data() -> list[dict[str, Any]]: @lru_cache(maxsize=256) -def get_court_by_ecf_domain(ecf_domain: str) -> Optional[dict[str, Any]]: +def get_court_by_ecf_domain(ecf_domain: str) -> dict[str, Any] | None: """Look up court info by ECF domain name. Args: @@ -46,7 +46,7 @@ def get_court_by_ecf_domain(ecf_domain: str) -> Optional[dict[str, Any]]: @lru_cache(maxsize=256) -def get_court_by_id(court_id: str) -> Optional[dict[str, Any]]: +def get_court_by_id(court_id: str) -> dict[str, Any] | None: """Look up court info by court ID. Args: @@ -64,7 +64,7 @@ def get_court_by_id(court_id: str) -> Optional[dict[str, Any]]: return None -def get_cso_court_id(ecf_domain: str) -> Optional[str]: +def get_cso_court_id(ecf_domain: str) -> str | None: """Get CSO court ID from ECF domain. Args: @@ -83,7 +83,7 @@ def get_cso_court_id(ecf_domain: str) -> Optional[str]: return court.get("court_id") if court else None -def get_ecf_url(court_id: str) -> Optional[str]: +def get_ecf_url(court_id: str) -> str | None: """Get ECF login URL from court ID. Args: @@ -100,7 +100,7 @@ def get_ecf_url(court_id: str) -> Optional[str]: return court.get("login_url") if court else None -def get_ecf_domain_from_url(url: str) -> Optional[str]: +def get_ecf_domain_from_url(url: str) -> str | None: """Extract ECF domain from a URL. Args: @@ -117,7 +117,7 @@ def get_ecf_domain_from_url(url: str) -> Optional[str]: return match.group(1) if match else None -def get_court_name(court_id: str) -> Optional[str]: +def get_court_name(court_id: str) -> str | None: """Get human-readable court name. Args: @@ -130,7 +130,7 @@ def get_court_name(court_id: str) -> Optional[str]: return court.get("court_name") or court.get("title") if court else None -def get_court_type(court_id: str) -> Optional[str]: +def get_court_type(court_id: str) -> str | None: """Get court type (District, Bankruptcy, Appeals). Args: @@ -143,7 +143,7 @@ def get_court_type(court_id: str) -> Optional[str]: return court.get("type") if court else None -def list_courts(court_type: Optional[str] = None) -> list[dict[str, str]]: +def list_courts(court_type: str | None = None) -> list[dict[str, str]]: """List all courts, optionally filtered by type. Args: @@ -167,7 +167,7 @@ def list_courts(court_type: Optional[str] = None) -> list[dict[str, str]]: return result -def normalize_court_id(court_id: str) -> Optional[str]: +def normalize_court_id(court_id: str) -> str | None: """Normalize various court ID formats to CSO format. Handles: @@ -215,7 +215,7 @@ def normalize_court_id(court_id: str) -> Optional[str]: # reach). -def _courts_csv_path() -> "Path": +def _courts_csv_path() -> Path: # Imported lazily to avoid a courts <-> config import cycle at module load. from .config import PACER_ROOT @@ -255,7 +255,7 @@ def read_courts_scope() -> dict[str, bool]: return scope -def write_courts_scope(scope: dict[str, bool]) -> "Path": +def write_courts_scope(scope: dict[str, bool]) -> Path: """Write {court_id: enabled} to courts.csv (sorted), creating dirs as needed.""" path = _courts_csv_path() path.parent.mkdir(parents=True, exist_ok=True) @@ -269,7 +269,7 @@ def write_courts_scope(scope: dict[str, bool]) -> "Path": return path -def enabled_court_ids() -> Optional[list[str]]: +def enabled_court_ids() -> list[str] | None: """Enabled court IDs for scoping a search, or None for no scope (nationwide). Raw view used by ``pacer courts status``: None when courts.csv is absent or @@ -290,7 +290,7 @@ def enabled_court_ids() -> Optional[list[str]]: return enabled -def resolve_court_scope(explicit_courts) -> Optional[list[str]]: +def resolve_court_scope(explicit_courts) -> list[str] | None: """The single open/off switch for scoping a billable search. One source of truth so callers never re-implement the rule (which is how an diff --git a/src/pacer_cli/docket_types.py b/src/pacer_cli/docket_types.py index 67a5848..317a903 100644 --- a/src/pacer_cli/docket_types.py +++ b/src/pacer_cli/docket_types.py @@ -7,9 +7,7 @@ from __future__ import annotations import json -from dataclasses import dataclass, field, asdict -from typing import Optional - +from dataclasses import asdict, dataclass, field # Key filing terms for filtering significant entries KEY_TERMS = frozenset({ @@ -24,9 +22,9 @@ class Attorney: """Attorney information.""" name: str - firm: Optional[str] = None - email: Optional[str] = None - phone: Optional[str] = None + firm: str | None = None + email: str | None = None + phone: str | None = None pro_hac_vice: bool = False @@ -54,8 +52,8 @@ class DocketEntry: """Single docket entry.""" seq: int date: str # ISO format YYYY-MM-DD - doc_num: Optional[str] = None - doc_url: Optional[str] = None + doc_num: str | None = None + doc_url: str | None = None text: str = "" has_attachments: bool = False attachment_count: int = 0 @@ -80,16 +78,16 @@ class DocketMeta: case_number: str # e.g., "1:18-cv-08434-VEC-SLC" case_title: str date_filed: str # ISO format - date_closed: Optional[str] = None + date_closed: str | None = None judge: str = "" - magistrate: Optional[str] = None + magistrate: str | None = None nature_of_suit: str = "" # Code like "442" nos_description: str = "" # "Civil Rights: Jobs" cause: str = "" jurisdiction: str = "" - jury_demand: Optional[str] = None - demand: Optional[str] = None - lead_case: Optional[str] = None + jury_demand: str | None = None + demand: str | None = None + lead_case: str | None = None member_cases: list[str] = field(default_factory=list) flags: list[str] = field(default_factory=list) # CLOSED, MDL, etc. @@ -100,7 +98,7 @@ class ParsedDocket: meta: DocketMeta entries: list[DocketEntry] parties: list[Party] = field(default_factory=list) - download_meta: Optional[dict] = None + download_meta: dict | None = None def key_entries(self, limit: int = 20) -> list[DocketEntry]: """Return most significant docket entries.""" diff --git a/src/pacer_cli/downloader.py b/src/pacer_cli/downloader.py index eb9f09d..a7eb4f8 100644 --- a/src/pacer_cli/downloader.py +++ b/src/pacer_cli/downloader.py @@ -9,7 +9,6 @@ import time from dataclasses import dataclass from pathlib import Path -from typing import Optional from urllib.parse import urljoin, urlparse import requests @@ -24,9 +23,9 @@ class DownloadResult: """Result of a docket download attempt.""" success: bool - filepath: Optional[Path] = None - docs_filepath: Optional[Path] = None # Path to docs.json manifest - error: Optional[str] = None + filepath: Path | None = None + docs_filepath: Path | None = None # Path to docs.json manifest + error: str | None = None pages: int = 0 cost: float = 0.0 @@ -90,7 +89,7 @@ def extract_document_metadata( } -def load_cached_documents(case_dir: Path) -> Optional[dict]: +def load_cached_documents(case_dir: Path) -> dict | None: """Load cached document metadata for a case. Args: @@ -105,12 +104,12 @@ def load_cached_documents(case_dir: Path) -> Optional[dict]: if docs_path.exists(): try: return json.loads(docs_path.read_text(encoding="utf-8")) - except (json.JSONDecodeError, IOError): + except (OSError, json.JSONDecodeError): pass # unreadable docs.json cache -> treat as no cache return None -def get_document_by_number(case_dir: Path, doc_number: str) -> Optional[dict]: +def get_document_by_number(case_dir: Path, doc_number: str) -> dict | None: """Get document info by number from cached metadata. Args: @@ -135,7 +134,7 @@ def __init__(self, config: PacerConfig, verbose: bool = False): self.config = config self.verbose = verbose self.session = requests.Session() - self.token: Optional[str] = None + self.token: str | None = None def _log(self, msg: str): """Print trace message if verbose mode enabled.""" @@ -316,7 +315,7 @@ def _get_ecf_base_url(self, court_id: str) -> str: # District courts typically use format like nysd, cacd return f"https://ecf.{court_abbrev}.uscourts.gov" - def _get_case_id_from_link(self, case_link: str) -> Optional[str]: + def _get_case_id_from_link(self, case_link: str) -> str | None: """Extract case ID from a PCL case link URL. Args: @@ -332,7 +331,7 @@ def download_docket_by_link( self, case_link: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download docket using a PCL case link URL. @@ -552,7 +551,7 @@ def download_docket_by_case_number( case_number: str, court_id: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download docket by case number and court ID. @@ -617,7 +616,7 @@ def download_docket( court_id: str, output_dir: Path, verbose: bool = False, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Convenience function to download a docket. @@ -644,7 +643,7 @@ class DocumentDownloader: def __init__(self, config: PacerConfig, verbose: bool = False): self.config = config self.verbose = verbose - self._docket_dl: Optional[DocketDownloader] = None + self._docket_dl: DocketDownloader | None = None self.authenticated_courts: set[str] = set() def _log(self, msg: str): @@ -713,7 +712,7 @@ def download_document( self, doc_url: str, output_dir: Path, - filename: Optional[str] = None, + filename: str | None = None, ) -> DownloadResult: """Download a document from CM/ECF. diff --git a/src/pacer_cli/errors.py b/src/pacer_cli/errors.py index 0e47c4e..b6d96a5 100644 --- a/src/pacer_cli/errors.py +++ b/src/pacer_cli/errors.py @@ -1,7 +1,6 @@ """User-friendly error handling with actionable next steps.""" from dataclasses import dataclass -from typing import List, Optional, Tuple from rich.console import Console from rich.panel import Panel @@ -15,8 +14,8 @@ class ErrorContext: title: str message: str - suggestions: List[str] - docs_link: Optional[str] = None + suggestions: list[str] + docs_link: str | None = None # Error catalog @@ -171,8 +170,8 @@ class ErrorContext: def show_error( error_key: str, - detail: Optional[str] = None, - extra_suggestions: Optional[List[str]] = None, + detail: str | None = None, + extra_suggestions: list[str] | None = None, ): """Display a rich error panel with suggestions. @@ -207,7 +206,7 @@ def show_error( ) -def classify_pcl_error(error: Exception) -> Tuple[str, str]: +def classify_pcl_error(error: Exception) -> tuple[str, str]: """Classify PCL exceptions to error keys. Args: @@ -237,7 +236,7 @@ def classify_pcl_error(error: Exception) -> Tuple[str, str]: return "network_error", error_str -def classify_download_error(error_message: str) -> Tuple[str, str]: +def classify_download_error(error_message: str) -> tuple[str, str]: """Classify download error messages to error keys. Args: diff --git a/src/pacer_cli/mcp_server.py b/src/pacer_cli/mcp_server.py index 135f21a..d7dcfdd 100644 --- a/src/pacer_cli/mcp_server.py +++ b/src/pacer_cli/mcp_server.py @@ -18,7 +18,7 @@ from __future__ import annotations -from typing import Any, Optional +from typing import Any from .config import PacerConfig, PolicyError, apply_policy_csv, get_config from .courts import resolve_court_scope @@ -40,7 +40,7 @@ # --------------------------------------------------------------------------- -def _load_config(client_code: Optional[str] = None) -> PacerConfig: +def _load_config(client_code: str | None = None) -> PacerConfig: """Config with the policy.csv overlay applied (human-provisioned creds only).""" cfg = get_config() # env / config.env; never prompts, never unlocks the vault cfg = apply_policy_csv(cfg) # may raise ValueError on a fat-fingered CSV (fail-closed) @@ -107,7 +107,7 @@ def spend_status() -> dict[str, Any]: refusing. """ cfg = get_config() # base config; never raises - policy_error: Optional[str] = None + policy_error: str | None = None try: apply_policy_csv(cfg) except PolicyError as exc: @@ -127,10 +127,10 @@ def spend_status() -> dict[str, Any]: def search_cases( - case_number: Optional[str] = None, - title: Optional[str] = None, - court: Optional[list[str]] = None, - client_code: Optional[str] = None, + case_number: str | None = None, + title: str | None = None, + court: list[str] | None = None, + client_code: str | None = None, ) -> dict[str, Any]: """Search the PACER Case Locator for cases (billable; gated by the spend cap).""" from .models import CaseSearchCriteria @@ -157,10 +157,10 @@ def search_cases( def search_parties( - last_name: Optional[str] = None, - first_name: Optional[str] = None, - court: Optional[list[str]] = None, - client_code: Optional[str] = None, + last_name: str | None = None, + first_name: str | None = None, + court: list[str] | None = None, + client_code: str | None = None, ) -> dict[str, Any]: """Search the PACER Case Locator for parties (billable; gated by the spend cap).""" from .models import CaseSearchCriteria, PartySearchCriteria @@ -190,7 +190,7 @@ def search_parties( def get_docket( case_number: str, court_id: str, - client_code: Optional[str] = None, + client_code: str | None = None, ) -> dict[str, Any]: """Download a case docket (billable; gated). Returns the saved path + page count.""" from .downloader import DocketDownloader @@ -216,7 +216,7 @@ def get_docket( def get_document( doc_link: str, doc_number: str = "0", - client_code: Optional[str] = None, + client_code: str | None = None, ) -> dict[str, Any]: """Download a single document by CM/ECF link (billable; gated).""" from .downloader import DocumentDownloader diff --git a/src/pacer_cli/models.py b/src/pacer_cli/models.py index 4df6cca..d94a033 100644 --- a/src/pacer_cli/models.py +++ b/src/pacer_cli/models.py @@ -30,16 +30,16 @@ def to_api_dict(self) -> dict: class Receipt(BaseModel): """Billing receipt from a PCL search.""" - transaction_date: Optional[str] = Field(None, alias="transactionDate") + transaction_date: str | None = Field(None, alias="transactionDate") billable_pages: int = Field(0, alias="billablePages") - login_id: Optional[str] = Field(None, alias="loginId") - client_code: Optional[str] = Field(None, alias="clientCode") - firm_id: Optional[str] = Field(None, alias="firmId") - search: Optional[str] = None - description: Optional[str] = None - cso_id: Optional[int] = Field(None, alias="csoId") - report_id: Optional[str] = Field(None, alias="reportId") - search_fee: Optional[str] = Field(None, alias="searchFee") + login_id: str | None = Field(None, alias="loginId") + client_code: str | None = Field(None, alias="clientCode") + firm_id: str | None = Field(None, alias="firmId") + search: str | None = None + description: str | None = None + cso_id: int | None = Field(None, alias="csoId") + report_id: str | None = Field(None, alias="reportId") + search_fee: str | None = Field(None, alias="searchFee") @property def fee_cents(self) -> int: @@ -67,26 +67,26 @@ class PageInfo(BaseModel): class CourtCase(BaseModel): """Court case information (nested in party results or standalone).""" - court_id: Optional[str] = Field(None, alias="courtId") - case_id: Optional[int] = Field(None, alias="caseId") - case_year: Optional[int] = Field(None, alias="caseYear") - case_number: Optional[int] = Field(None, alias="caseNumber") - case_office: Optional[str] = Field(None, alias="caseOffice") - case_type: Optional[str] = Field(None, alias="caseType") - case_title: Optional[str] = Field(None, alias="caseTitle") - date_filed: Optional[str] = Field(None, alias="dateFiled") - date_termed: Optional[str] = Field(None, alias="dateTermed") - date_dismissed: Optional[str] = Field(None, alias="dateDismissed") - date_discharged: Optional[str] = Field(None, alias="dateDischarged") - effective_date_closed: Optional[str] = Field(None, alias="effectiveDateClosed") - nature_of_suit: Optional[str] = Field(None, alias="natureOfSuit") - bankruptcy_chapter: Optional[str] = Field(None, alias="bankruptcyChapter") - disposition_method: Optional[str] = Field(None, alias="dispositionMethod") - joint_bankruptcy_flag: Optional[str] = Field(None, alias="jointBankruptcyFlag") - jurisdiction_type: Optional[str] = Field(None, alias="jurisdictionType") - case_link: Optional[str] = Field(None, alias="caseLink") - case_number_full: Optional[str] = Field(None, alias="caseNumberFull") - jpml_number: Optional[int] = Field(None, alias="jpmlNumber") + court_id: str | None = Field(None, alias="courtId") + case_id: int | None = Field(None, alias="caseId") + case_year: int | None = Field(None, alias="caseYear") + case_number: int | None = Field(None, alias="caseNumber") + case_office: str | None = Field(None, alias="caseOffice") + case_type: str | None = Field(None, alias="caseType") + case_title: str | None = Field(None, alias="caseTitle") + date_filed: str | None = Field(None, alias="dateFiled") + date_termed: str | None = Field(None, alias="dateTermed") + date_dismissed: str | None = Field(None, alias="dateDismissed") + date_discharged: str | None = Field(None, alias="dateDischarged") + effective_date_closed: str | None = Field(None, alias="effectiveDateClosed") + nature_of_suit: str | None = Field(None, alias="natureOfSuit") + bankruptcy_chapter: str | None = Field(None, alias="bankruptcyChapter") + disposition_method: str | None = Field(None, alias="dispositionMethod") + joint_bankruptcy_flag: str | None = Field(None, alias="jointBankruptcyFlag") + jurisdiction_type: str | None = Field(None, alias="jurisdictionType") + case_link: str | None = Field(None, alias="caseLink") + case_number_full: str | None = Field(None, alias="caseNumberFull") + jpml_number: int | None = Field(None, alias="jpmlNumber") @property def status(self) -> str: @@ -109,30 +109,30 @@ class CaseResult(CourtCase): class PartyResult(BaseModel): """Party search result.""" - court_id: Optional[str] = Field(None, alias="courtId") - case_id: Optional[int] = Field(None, alias="caseId") - case_year: Optional[int] = Field(None, alias="caseYear") - case_number: Optional[int] = Field(None, alias="caseNumber") - last_name: Optional[str] = Field(None, alias="lastName") - first_name: Optional[str] = Field(None, alias="firstName") - middle_name: Optional[str] = Field(None, alias="middleName") - generation: Optional[str] = None - party_type: Optional[str] = Field(None, alias="partyType") - party_role: Optional[str] = Field(None, alias="partyRole") - jurisdiction_type: Optional[str] = Field(None, alias="jurisdictionType") - court_case: Optional[CourtCase] = Field(None, alias="courtCase") + court_id: str | None = Field(None, alias="courtId") + case_id: int | None = Field(None, alias="caseId") + case_year: int | None = Field(None, alias="caseYear") + case_number: int | None = Field(None, alias="caseNumber") + last_name: str | None = Field(None, alias="lastName") + first_name: str | None = Field(None, alias="firstName") + middle_name: str | None = Field(None, alias="middleName") + generation: str | None = None + party_type: str | None = Field(None, alias="partyType") + party_role: str | None = Field(None, alias="partyRole") + jurisdiction_type: str | None = Field(None, alias="jurisdictionType") + court_case: CourtCase | None = Field(None, alias="courtCase") # Denormalized case fields (also present at party level) - date_filed: Optional[str] = Field(None, alias="dateFiled") - effective_date_closed: Optional[str] = Field(None, alias="effectiveDateClosed") - date_dismissed: Optional[str] = Field(None, alias="dateDismissed") - date_discharged: Optional[str] = Field(None, alias="dateDischarged") - nature_of_suit: Optional[str] = Field(None, alias="natureOfSuit") - bankruptcy_chapter: Optional[str] = Field(None, alias="bankruptcyChapter") - case_office: Optional[str] = Field(None, alias="caseOffice") - case_type: Optional[str] = Field(None, alias="caseType") - case_title: Optional[str] = Field(None, alias="caseTitle") - case_number_full: Optional[str] = Field(None, alias="caseNumberFull") - disposition: Optional[str] = None + date_filed: str | None = Field(None, alias="dateFiled") + effective_date_closed: str | None = Field(None, alias="effectiveDateClosed") + date_dismissed: str | None = Field(None, alias="dateDismissed") + date_discharged: str | None = Field(None, alias="dateDischarged") + nature_of_suit: str | None = Field(None, alias="natureOfSuit") + bankruptcy_chapter: str | None = Field(None, alias="bankruptcyChapter") + case_office: str | None = Field(None, alias="caseOffice") + case_type: str | None = Field(None, alias="caseType") + case_title: str | None = Field(None, alias="caseTitle") + case_number_full: str | None = Field(None, alias="caseNumberFull") + disposition: str | None = None @property def full_name(self) -> str: @@ -152,19 +152,19 @@ def full_name(self) -> str: class CaseSearchResponse(BaseModel): """Response from case search API.""" - receipt: Optional[Receipt] = None - page_info: Optional[PageInfo] = Field(None, alias="pageInfo") + receipt: Receipt | None = None + page_info: PageInfo | None = Field(None, alias="pageInfo") content: list[CaseResult] = [] - master_case: Optional[Any] = Field(None, alias="masterCase") + master_case: Any | None = Field(None, alias="masterCase") class PartySearchResponse(BaseModel): """Response from party search API.""" - receipt: Optional[Receipt] = None - page_info: Optional[PageInfo] = Field(None, alias="pageInfo") + receipt: Receipt | None = None + page_info: PageInfo | None = Field(None, alias="pageInfo") content: list[PartyResult] = [] - master_case: Optional[Any] = Field(None, alias="masterCase") + master_case: Any | None = Field(None, alias="masterCase") # ============================================================================= @@ -178,29 +178,29 @@ class CaseSearchCriteria(SearchCriteriaMixin, BaseModel): model_config = ConfigDict(populate_by_name=True) # All court types - jurisdiction_type: Optional[str] = Field(None, alias="jurisdictionType") - case_id: Optional[int] = Field(None, alias="caseId") - case_number_full: Optional[str] = Field(None, alias="caseNumberFull") - case_title: Optional[str] = Field(None, alias="caseTitle") - case_office: Optional[str] = Field(None, alias="caseOffice") - case_number: Optional[str] = Field(None, alias="caseNumber") - case_type: Optional[list[str]] = Field(None, alias="caseType") - case_year: Optional[str] = Field(None, alias="caseYear") - court_id: Optional[list[str]] = Field(None, alias="courtId") - date_filed_from: Optional[str] = Field(None, alias="dateFiledFrom") - date_filed_to: Optional[str] = Field(None, alias="dateFiledTo") - effective_date_closed_from: Optional[str] = Field(None, alias="effectiveDateClosedFrom") - effective_date_closed_to: Optional[str] = Field(None, alias="effectiveDateClosedTo") + jurisdiction_type: str | None = Field(None, alias="jurisdictionType") + case_id: int | None = Field(None, alias="caseId") + case_number_full: str | None = Field(None, alias="caseNumberFull") + case_title: str | None = Field(None, alias="caseTitle") + case_office: str | None = Field(None, alias="caseOffice") + case_number: str | None = Field(None, alias="caseNumber") + case_type: list[str] | None = Field(None, alias="caseType") + case_year: str | None = Field(None, alias="caseYear") + court_id: list[str] | None = Field(None, alias="courtId") + date_filed_from: str | None = Field(None, alias="dateFiledFrom") + date_filed_to: str | None = Field(None, alias="dateFiledTo") + effective_date_closed_from: str | None = Field(None, alias="effectiveDateClosedFrom") + effective_date_closed_to: str | None = Field(None, alias="effectiveDateClosedTo") # Bankruptcy only - federal_bankruptcy_chapter: Optional[list[str]] = Field(None, alias="federalBankruptcyChapter") - date_dismissed_from: Optional[str] = Field(None, alias="dateDismissedFrom") - date_dismissed_to: Optional[str] = Field(None, alias="dateDismissedTo") - date_discharged_from: Optional[str] = Field(None, alias="dateDischargedFrom") - date_discharged_to: Optional[str] = Field(None, alias="dateDischargedTo") + federal_bankruptcy_chapter: list[str] | None = Field(None, alias="federalBankruptcyChapter") + date_dismissed_from: str | None = Field(None, alias="dateDismissedFrom") + date_dismissed_to: str | None = Field(None, alias="dateDismissedTo") + date_discharged_from: str | None = Field(None, alias="dateDischargedFrom") + date_discharged_to: str | None = Field(None, alias="dateDischargedTo") # Civil/Appellate only - nature_of_suit: Optional[list[str]] = Field(None, alias="natureOfSuit") + nature_of_suit: list[str] | None = Field(None, alias="natureOfSuit") # JPML only - jpml_number: Optional[int] = Field(None, alias="jpmlNumber") + jpml_number: int | None = Field(None, alias="jpmlNumber") # Nested party search within case search party: Optional["PartyInCaseSearch"] = None @@ -208,10 +208,10 @@ class CaseSearchCriteria(SearchCriteriaMixin, BaseModel): class PartyInCaseSearch(BaseModel): """Party criteria when nested in a case search.""" - last_name: Optional[str] = Field(None, alias="lastName") - first_name: Optional[str] = Field(None, alias="firstName") - middle_name: Optional[str] = Field(None, alias="middleName") - role: Optional[list[str]] = None + last_name: str | None = Field(None, alias="lastName") + first_name: str | None = Field(None, alias="firstName") + middle_name: str | None = Field(None, alias="middleName") + role: list[str] | None = None class PartySearchCriteria(SearchCriteriaMixin, BaseModel): @@ -220,19 +220,19 @@ class PartySearchCriteria(SearchCriteriaMixin, BaseModel): model_config = ConfigDict(populate_by_name=True) # Party fields - last_name: Optional[str] = Field(None, alias="lastName") - first_name: Optional[str] = Field(None, alias="firstName") - middle_name: Optional[str] = Field(None, alias="middleName") - generation: Optional[str] = None - exact_name_match: Optional[bool] = Field(None, alias="exactNameMatch") - ssn: Optional[str] = None # Bankruptcy only - party_type: Optional[str] = Field(None, alias="partyType") - role: Optional[list[str]] = None + last_name: str | None = Field(None, alias="lastName") + first_name: str | None = Field(None, alias="firstName") + middle_name: str | None = Field(None, alias="middleName") + generation: str | None = None + exact_name_match: bool | None = Field(None, alias="exactNameMatch") + ssn: str | None = None # Bankruptcy only + party_type: str | None = Field(None, alias="partyType") + role: list[str] | None = None # Case year range - case_year_from: Optional[int] = Field(None, alias="caseYearFrom") - case_year_to: Optional[int] = Field(None, alias="caseYearTo") + case_year_from: int | None = Field(None, alias="caseYearFrom") + case_year_to: int | None = Field(None, alias="caseYearTo") # Nested case criteria - court_case: Optional[CaseSearchCriteria] = Field(None, alias="courtCase") + court_case: CaseSearchCriteria | None = Field(None, alias="courtCase") # ============================================================================= @@ -245,14 +245,14 @@ class BatchJobInfo(BaseModel): report_id: int = Field(..., alias="reportId") status: str # WAITING, RUNNING, COMPLETED - start_time: Optional[str] = Field(None, alias="startTime") - end_time: Optional[str] = Field(None, alias="endTime") - record_count: Optional[int] = Field(None, alias="recordCount") - unbilled_page_count: Optional[int] = Field(None, alias="unbilledPageCount") - download_fee: Optional[float] = Field(None, alias="downloadFee") - pages: Optional[int] = None - search_type: Optional[str] = Field(None, alias="searchType") - criteria: Optional[dict] = None + start_time: str | None = Field(None, alias="startTime") + end_time: str | None = Field(None, alias="endTime") + record_count: int | None = Field(None, alias="recordCount") + unbilled_page_count: int | None = Field(None, alias="unbilledPageCount") + download_fee: float | None = Field(None, alias="downloadFee") + pages: int | None = None + search_type: str | None = Field(None, alias="searchType") + criteria: dict | None = None @property def is_complete(self) -> bool: @@ -268,6 +268,6 @@ def is_running(self) -> bool: class BatchJobListResponse(BaseModel): """Response from batch job list API.""" - receipt: Optional[Receipt] = None - page_info: Optional[PageInfo] = Field(None, alias="pageInfo") + receipt: Receipt | None = None + page_info: PageInfo | None = Field(None, alias="pageInfo") content: list[BatchJobInfo] = [] diff --git a/src/pacer_cli/parser.py b/src/pacer_cli/parser.py index e6eb56a..1abf236 100644 --- a/src/pacer_cli/parser.py +++ b/src/pacer_cli/parser.py @@ -16,8 +16,8 @@ HAS_SELECTOLAX = False from .docket_types import ( - DocketMeta, DocketEntry, + DocketMeta, ParsedDocket, ) diff --git a/src/pacer_cli/pcl.py b/src/pacer_cli/pcl.py index 68faf76..10469f1 100644 --- a/src/pacer_cli/pcl.py +++ b/src/pacer_cli/pcl.py @@ -7,8 +7,9 @@ API Documentation: https://pacer.uscourts.gov/help/pacer/pacer-case-locator-api-user-guide """ +from collections.abc import Callable from dataclasses import dataclass, field -from typing import Callable, Optional, TypeVar +from typing import TypeVar import requests @@ -16,7 +17,6 @@ from .auth import authenticate from .config import PacerConfig -from .security import create_secure_session from .models import ( BatchJobInfo, BatchJobListResponse, @@ -25,6 +25,7 @@ PartySearchCriteria, PartySearchResponse, ) +from .security import create_secure_session class PCLError(Exception): @@ -64,7 +65,7 @@ class PCLClient: """ config: PacerConfig - _token: Optional[str] = field(default=None, repr=False) + _token: str | None = field(default=None, repr=False) _session: requests.Session = field(default_factory=requests.Session, repr=False) def __post_init__(self): @@ -93,7 +94,7 @@ def _make_request( self, method: str, endpoint: str, - payload: Optional[dict] = None, + payload: dict | None = None, retry_auth: bool = True, ) -> requests.Response: """Make an authenticated request to the PCL API. diff --git a/src/pacer_cli/reader.py b/src/pacer_cli/reader.py index 6272744..7a4a935 100644 --- a/src/pacer_cli/reader.py +++ b/src/pacer_cli/reader.py @@ -642,7 +642,7 @@ def search_docket( ] docket_path = self.processed_path / docket - with open(docket_path, "r", newline="", encoding="utf-8") as f: + with open(docket_path, newline="", encoding="utf-8") as f: reader = csv.reader(f, dialect="excel") for row in reader: if not header_passed: @@ -723,7 +723,7 @@ def write_all_matches(self, suffix: str, overwrite_flag: bool = False) -> None: output_file = self.output_path / f"all_match__{suffix}.csv" if not overwrite_flag and output_file.exists(): - raise IOError( + raise OSError( f'A .csv with the suffix "{suffix}" already exists. ' "Choose new suffix or specify overwrite_flag." ) @@ -763,7 +763,7 @@ def write_individual_matches(self, suffix: str, overwrite_flag: bool = False) -> for f in result_path.iterdir(): f.unlink() else: - raise IOError( + raise OSError( f'.csv files with the suffix "{suffix}" already exist. ' "Choose new suffix or specify overwrite_flag." ) diff --git a/src/pacer_cli/security.py b/src/pacer_cli/security.py index a0e4d24..f803fa0 100644 --- a/src/pacer_cli/security.py +++ b/src/pacer_cli/security.py @@ -21,11 +21,12 @@ import ssl import threading import time +from collections.abc import Iterator from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import TYPE_CHECKING, Iterator, Literal, Optional +from typing import TYPE_CHECKING, Literal if TYPE_CHECKING: from .config import PacerConfig @@ -132,7 +133,7 @@ def reset(self) -> None: # Module-level rate limiter (resettable for tests) -_rate_limiter: Optional[RateLimiter] = None +_rate_limiter: RateLimiter | None = None _rate_limiter_lock = threading.Lock() @@ -212,10 +213,10 @@ def show_peak_hours_warning(entry_count: int = 0) -> None: class AuditLogger: """Append-only audit log for PACER operations.""" - def __init__(self, log_dir: Optional[Path] = None): + def __init__(self, log_dir: Path | None = None): # Resolve LOG_DIR at call time (not import) so tests can redirect it. self.log_dir = log_dir if log_dir is not None else LOG_DIR - self._logger: Optional[logging.Logger] = None + self._logger: logging.Logger | None = None self._init_lock = threading.Lock() def _ensure_logger(self) -> logging.Logger: @@ -248,10 +249,10 @@ def log_request( self, method: str, url: str, - status_code: Optional[int] = None, + status_code: int | None = None, cost: float = 0.0, - error: Optional[str] = None, - client_code: Optional[str] = None, + error: str | None = None, + client_code: str | None = None, ) -> None: logger = self._ensure_logger() parts = [f"{method} {url}"] @@ -272,7 +273,7 @@ def log_download( size_bytes: int, pages: int = 0, cost: float = 0.0, - client_code: Optional[str] = None, + client_code: str | None = None, ) -> None: logger = self._ensure_logger() line = ( @@ -284,7 +285,7 @@ def log_download( logger.info(line) -_audit_logger: Optional[AuditLogger] = None +_audit_logger: AuditLogger | None = None _audit_logger_lock = threading.Lock() @@ -367,7 +368,7 @@ def is_valid_matter_code(code: str) -> bool: return bool(_MATTER_CODE_RE.match(code)) -def spend_today(client_code: Optional[str] = None) -> float: +def spend_today(client_code: str | None = None) -> float: """Sum today's billed cost (UTC calendar day) from the current audit log. Reads the same append-only log AuditLogger writes - no separate store. Uses @@ -397,11 +398,11 @@ def spend_today(client_code: Optional[str] = None) -> float: def check_spend( - config: "PacerConfig", + config: PacerConfig, estimated_cost: float, *, prior_spend: float, - client_code: Optional[str] = None, + client_code: str | None = None, ) -> None: """Pure, side-effect-free preventive cap check. @@ -457,7 +458,7 @@ class SpendLockTimeout(GovernanceError): @contextmanager -def spend_lock(timeout: float = 30.0) -> "Iterator[None]": +def spend_lock(timeout: float = 30.0) -> Iterator[None]: """Hold the cross-process + in-process spend lock for a billable op. Reentrant within a process: the cross-process flock is taken only on the @@ -549,7 +550,7 @@ def close(self) -> None: pass # best-effort cleanup; socket already broken is harmless self._closed = True - def __enter__(self) -> "StreamingDownload": + def __enter__(self) -> StreamingDownload: return self def __exit__(self, *exc) -> None: @@ -560,7 +561,7 @@ def __exit__(self, *exc) -> None: def streaming_download( session: requests.Session, url: str, - headers: Optional[dict] = None, + headers: dict | None = None, timeout: int = 120, max_size: int = MAX_MEMORY_RESPONSE_SIZE, ) -> Iterator[StreamingDownload]: @@ -641,7 +642,7 @@ def request_with_retry( **kwargs, ) -> requests.Response: """Make an HTTP request with exponential backoff on retryable errors.""" - last_exc: Optional[Exception] = None + last_exc: Exception | None = None for attempt in range(max_retries + 1): try: resp = session.request(method, url, **kwargs) diff --git a/src/pacer_cli/selection.py b/src/pacer_cli/selection.py index b39f941..bb6e61a 100644 --- a/src/pacer_cli/selection.py +++ b/src/pacer_cli/selection.py @@ -1,6 +1,5 @@ """Interactive selection utilities for CLI.""" -from typing import List, Optional, Tuple from rich.console import Console from rich.panel import Panel @@ -15,7 +14,7 @@ class CaseSelector: """Interactive case selection from search results.""" - def __init__(self, results: List[CaseResult], page_size: int = 20): + def __init__(self, results: list[CaseResult], page_size: int = 20): self.results = results self.page_size = page_size self.current_page = 0 @@ -51,7 +50,7 @@ def display_page(self) -> None: console.print(table) console.print(f"[dim]{len(self.results)} total results[/]") - def prompt_selection(self) -> Optional[CaseResult]: + def prompt_selection(self) -> CaseResult | None: """Prompt user for single case selection.""" self.display_page() @@ -83,7 +82,7 @@ def prompt_selection(self) -> Optional[CaseResult]: except ValueError: console.print("[yellow]Enter a number or n/p/q[/]") - def prompt_multi_selection(self) -> List[CaseResult]: + def prompt_multi_selection(self) -> list[CaseResult]: """Select multiple cases with '1,3,5-10' syntax.""" self.display_page() @@ -114,7 +113,7 @@ def prompt_multi_selection(self) -> List[CaseResult]: return selected - def prompt_action(self, case: CaseResult) -> Optional[str]: + def prompt_action(self, case: CaseResult) -> str | None: """Prompt for action on selected case.""" console.print( Panel( @@ -138,7 +137,7 @@ def prompt_action(self, case: CaseResult) -> Optional[str]: return action -def interactive_case_select(results: List[CaseResult]) -> Optional[Tuple[CaseResult, str]]: +def interactive_case_select(results: list[CaseResult]) -> tuple[CaseResult, str] | None: """Run interactive selection loop. Returns: @@ -164,7 +163,7 @@ def interactive_case_select(results: List[CaseResult]) -> Optional[Tuple[CaseRes return (case, action) -def interactive_multi_select(results: List[CaseResult]) -> List[CaseResult]: +def interactive_multi_select(results: list[CaseResult]) -> list[CaseResult]: """Run interactive multi-selection. Returns: diff --git a/src/pacer_cli/vault.py b/src/pacer_cli/vault.py index 5782e8d..a6e3e41 100644 --- a/src/pacer_cli/vault.py +++ b/src/pacer_cli/vault.py @@ -16,7 +16,6 @@ import secrets from base64 import b64decode, b64encode from pathlib import Path -from typing import Optional from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.kdf.scrypt import Scrypt @@ -72,9 +71,9 @@ class PacerVault: def __init__(self, path: Path = VAULT_PATH): self.path = path - self._key: Optional[bytes] = None + self._key: bytes | None = None self._data: dict = {} - self._salt: Optional[bytes] = None + self._salt: bytes | None = None @property def is_locked(self) -> bool: @@ -125,7 +124,7 @@ def unlock(self, passphrase: str) -> None: if not self.exists: raise VaultError(f"No vault at {self.path}, run init() first") - with open(self.path, "r") as f: + with open(self.path) as f: vault_data = json.load(f) self._salt = b64decode(vault_data["salt"]) @@ -153,7 +152,7 @@ def lock(self) -> None: self._key = None self._data = {} - def get(self, name: str) -> Optional[str]: + def get(self, name: str) -> str | None: """Get a decrypted secret by name.""" if self.is_locked: raise VaultLocked("Vault is locked") From dddd96fbdc7628d593744bdc9cfcfc782ec0a09f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 04:04:20 +0000 Subject: [PATCH 2/4] Ruff cleanup (2/2 part 1): fix E402/E701/E741/N818 + trailing whitespace - pcl.py: move .auth/.config/.models/.security imports above TypeVar (E402) - downloader.py: split one-line if-statements onto two lines (E701) - parser.py: rename ambiguous 'l' to 'ln' (E741) - security.py/vault.py: add # noqa: N818 to intentionally-named exceptions - cli.py: strip trailing/blank-line whitespace (W291/W293) https://claude.ai/code/session_01NNvhsYRVWhjfcdgaSmU5bt --- src/pacer_cli/cli.py | 4 ++-- src/pacer_cli/downloader.py | 24 ++++++++++++++++-------- src/pacer_cli/parser.py | 2 +- src/pacer_cli/pcl.py | 4 ++-- src/pacer_cli/security.py | 6 +++--- src/pacer_cli/vault.py | 2 +- 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/pacer_cli/cli.py b/src/pacer_cli/cli.py index 5526504..bed2984 100644 --- a/src/pacer_cli/cli.py +++ b/src/pacer_cli/cli.py @@ -62,7 +62,7 @@ class AliasGroup(click.Group): """Click Group that supports command aliases. - + Allows users to use shorter command names that map to full paths. Example: 'pacer docket' -> 'pacer download docket' """ @@ -315,7 +315,7 @@ def cli(ctx, yes: bool, agent: bool): \b Short aliases: pacer docket → pacer download docket - pacer doc → pacer download document + pacer doc → pacer download document pacer grep → pacer search pacer cases → pacer pcl cases pacer parties → pacer pcl parties diff --git a/src/pacer_cli/downloader.py b/src/pacer_cli/downloader.py index a7eb4f8..82cfbb3 100644 --- a/src/pacer_cli/downloader.py +++ b/src/pacer_cli/downloader.py @@ -785,14 +785,22 @@ def download_document( # Build POST data from goDLS params form_data = {} - if caseid: form_data['caseid'] = caseid - if de_seq: form_data['de_seq_num'] = de_seq - if got_receipt: form_data['got_receipt'] = got_receipt - if pdf_hdr: form_data['pdf_header'] = pdf_hdr - if pdf_toggle: form_data['pdf_toggle_possible'] = pdf_toggle - if magic: form_data['magic_num'] = magic - if hdr: form_data['hdr'] = hdr - if psf: form_data['psf_report'] = psf + if caseid: + form_data['caseid'] = caseid + if de_seq: + form_data['de_seq_num'] = de_seq + if got_receipt: + form_data['got_receipt'] = got_receipt + if pdf_hdr: + form_data['pdf_header'] = pdf_hdr + if pdf_toggle: + form_data['pdf_toggle_possible'] = pdf_toggle + if magic: + form_data['magic_num'] = magic + if hdr: + form_data['hdr'] = hdr + if psf: + form_data['psf_report'] = psf parsed = urlparse(doc_url) form_url = f"{parsed.scheme}://{parsed.netloc}{path}" diff --git a/src/pacer_cli/parser.py b/src/pacer_cli/parser.py index 1abf236..864e2ac 100644 --- a/src/pacer_cli/parser.py +++ b/src/pacer_cli/parser.py @@ -86,7 +86,7 @@ def parse_docket_selectolax(html: str) -> ParsedDocket: # Left column (60%) if 'v.' in cell_text or 'v ' in cell_text: # First line is case title - lines = [l.strip() for l in cell_text.split('\n') if l.strip()] + lines = [ln.strip() for ln in cell_text.split('\n') if ln.strip()] if lines: case_title = lines[0] diff --git a/src/pacer_cli/pcl.py b/src/pacer_cli/pcl.py index 10469f1..34fdeee 100644 --- a/src/pacer_cli/pcl.py +++ b/src/pacer_cli/pcl.py @@ -13,8 +13,6 @@ import requests -T = TypeVar("T") - from .auth import authenticate from .config import PacerConfig from .models import ( @@ -27,6 +25,8 @@ ) from .security import create_secure_session +T = TypeVar("T") + class PCLError(Exception): """Base exception for PCL API errors.""" diff --git a/src/pacer_cli/security.py b/src/pacer_cli/security.py index f803fa0..a847960 100644 --- a/src/pacer_cli/security.py +++ b/src/pacer_cli/security.py @@ -334,7 +334,7 @@ class BudgetError(GovernanceError): error_key = "budget_exceeded" -class MatterRequired(GovernanceError): +class MatterRequired(GovernanceError): # noqa: N818 """policy.csv requires a client/matter code for billable operations.""" error_key = "matter_required" @@ -347,7 +347,7 @@ class ScopeError(GovernanceError): error_key = "scope_empty" -class MatterInvalid(GovernanceError): +class MatterInvalid(GovernanceError): # noqa: N818 """The client/matter code contains unsafe characters or is too long. The code is written verbatim into the audit line that doubles as the spend @@ -438,7 +438,7 @@ def check_spend( ) -class SpendLockTimeout(GovernanceError): +class SpendLockTimeout(GovernanceError): # noqa: N818 """Could not acquire the spend lock in time — refuse rather than risk a concurrent overspend (fail-closed).""" diff --git a/src/pacer_cli/vault.py b/src/pacer_cli/vault.py index a6e3e41..48b4c9a 100644 --- a/src/pacer_cli/vault.py +++ b/src/pacer_cli/vault.py @@ -57,7 +57,7 @@ class VaultError(Exception): pass -class VaultLocked(VaultError): +class VaultLocked(VaultError): # noqa: N818 """Vault is locked, call unlock() first.""" pass From cc64dfcc819d964b3b3aacb49f59d34cfd7aadbf Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Jun 2026 04:09:07 +0000 Subject: [PATCH 3/4] Ruff cleanup (2/2): wrap remaining E501 long lines to <=100 Wrap all remaining long lines without changing runtime behavior or the line-length config (kept at 100). Breaks long function calls/defs at commas, long strings via implicit concatenation (content preserved), and long boolean/conditional expressions with parentheses. ruff check src/ now reports zero findings; pyflakes and tools/audit.py stay clean; pytest stays at 302 passed, 9 skipped. https://claude.ai/code/session_01NNvhsYRVWhjfcdgaSmU5bt --- src/pacer_cli/cli.py | 172 ++++++++++++++++++++++++++++------ src/pacer_cli/config.py | 4 +- src/pacer_cli/docket_types.py | 15 ++- src/pacer_cli/downloader.py | 78 ++++++++++++--- src/pacer_cli/errors.py | 15 ++- src/pacer_cli/mcp_server.py | 20 +++- src/pacer_cli/reader.py | 16 +++- src/pacer_cli/security.py | 4 +- 8 files changed, 267 insertions(+), 57 deletions(-) diff --git a/src/pacer_cli/cli.py b/src/pacer_cli/cli.py index bed2984..60f8639 100644 --- a/src/pacer_cli/cli.py +++ b/src/pacer_cli/cli.py @@ -296,7 +296,11 @@ def wrapper(*args, **kwargs): @click.group(cls=AliasGroup) @click.version_option() @click.option("--yes", "-y", is_flag=True, help="Skip cost confirmation prompts") -@click.option("--agent", is_flag=True, help="Non-interactive mode for AI agents (JSON errors, exit 3 on cap).") +@click.option( + "--agent", + is_flag=True, + help="Non-interactive mode for AI agents (JSON errors, exit 3 on cap).", +) @click.pass_context def cli(ctx, yes: bool, agent: bool): """PACER CLI - Download, parse, and search federal court documents. @@ -371,7 +375,11 @@ def cli(ctx, yes: bool, agent: bool): @cli.command("migrate") @click.option("--dry-run", is_flag=True, help="Show what would be moved without moving") -@click.option("--legacy-dir", type=click.Path(exists=True, path_type=Path), help="Legacy archive directory") +@click.option( + "--legacy-dir", + type=click.Path(exists=True, path_type=Path), + help="Legacy archive directory", +) @click.pass_context def migrate_archive(ctx, dry_run: bool, legacy_dir: Path | None): """Migrate from legacy flat archive to new hierarchical structure. @@ -600,14 +608,20 @@ def auth_init(ctx, qa: bool): try: code = generate_totp(totp_secret) console.print(f" [green]Valid![/] Current code: [bold]{code}[/]") - console.print(" [dim]This code should match what you'd see in Authy/Google Authenticator.[/]") + console.print( + " [dim]This code should match what you'd see in " + "Authy/Google Authenticator.[/]" + ) if Confirm.ask(" Does this code look correct?", default=True): break else: console.print(" [yellow]Let's try again.[/]") except Exception as e: console.print(f" [red]Invalid secret:[/] {e}") - console.print(" [dim]The secret should be a Base32 string (letters A-Z and digits 2-7).[/]") + console.print( + " [dim]The secret should be a Base32 string " + "(letters A-Z and digits 2-7).[/]" + ) if not Confirm.ask(" Try again?", default=True): totp_secret = None break @@ -698,7 +712,13 @@ def auth_init(ctx, qa: bool): help="Client billing code (optional)", ) @click.pass_context -def auth_login(ctx, username: str | None, password: str | None, totp_secret: str | None, client_code: str | None): +def auth_login( + ctx, + username: str | None, + password: str | None, + totp_secret: str | None, + client_code: str | None, +): """Store PACER credentials securely. \b @@ -741,7 +761,9 @@ def auth_login(ctx, username: str | None, password: str | None, totp_secret: str continue break - config_path = save_credentials(username, password, totp_secret, client_code, vault_passphrase=passphrase) + config_path = save_credentials( + username, password, totp_secret, client_code, vault_passphrase=passphrase + ) console.print(f"[green]Credentials saved to encrypted vault:[/] {config_path}") console.print("[dim]File permissions set to 600 (owner read/write only)[/]") @@ -749,7 +771,10 @@ def auth_login(ctx, username: str | None, password: str | None, totp_secret: str console.print("[green]MFA:[/] TOTP secret configured for automatic code generation") else: console.print("[yellow]MFA:[/] No TOTP secret provided.") - console.print("[dim]If your account has MFA enabled, use --totp-secret or pacer auth setup-mfa[/]") + console.print( + "[dim]If your account has MFA enabled, use --totp-secret or " + "pacer auth setup-mfa[/]" + ) @auth.command("code") @@ -784,7 +809,9 @@ def auth_code(ctx, watch: bool): code = generate_totp(secret) # Calculate seconds until next code remaining = 30 - (int(time.time()) % 30) - console.print(f"\r[bold green]{code}[/] [dim]expires in {remaining:2d}s[/]", end="") + console.print( + f"\r[bold green]{code}[/] [dim]expires in {remaining:2d}s[/]", end="" + ) time.sleep(1) except KeyboardInterrupt: console.print("\n") @@ -1002,7 +1029,10 @@ def use_status(ctx): documents_dir = context.case_path / "documents" doc_count = len(list(documents_dir.glob("*.pdf"))) if documents_dir.exists() else 0 - table.add_row("Docket", "[green]downloaded[/]" if docket.exists() else "[dim]not downloaded[/]") + table.add_row( + "Docket", + "[green]downloaded[/]" if docket.exists() else "[dim]not downloaded[/]", + ) table.add_row("Doc manifest", "[green]cached[/]" if docs.exists() else "[dim]none[/]") if doc_count > 0: table.add_row("Documents", f"[green]{doc_count} PDF(s)[/]") @@ -1033,7 +1063,12 @@ def download(): @click.argument("case_number", required=False) @click.argument("court_id", required=False) @click.option("--verbose", "-v", is_flag=True, help="Enable verbose/trace logging") -@click.option("--case-link", "-l", default=None, help="Direct CM/ECF case link URL (bypasses PCL search)") +@click.option( + "--case-link", + "-l", + default=None, + help="Direct CM/ECF case link URL (bypasses PCL search)", +) @click.option("--legacy", is_flag=True, help="Use legacy flat archive structure") @matter_option @click.pass_context @@ -1074,7 +1109,9 @@ def download_docket_cmd( context = ContextConfig.load() if not context.is_set: err_console.print("[red]Error:[/] No case specified and no context set.") - err_console.print("[dim]Either provide arguments or use:[/] pacer use case ") + err_console.print( + "[dim]Either provide arguments or use:[/] pacer use case " + ) sys.exit(1) case_number = case_number or context.case_number court_id = court_id or context.court @@ -1119,7 +1156,9 @@ def download_docket_cmd( TextColumn("[progress.description]{task.description}"), console=console, ) as progress: - progress.add_task(f"Downloading docket {case_number} from {court_normalized}...", total=None) + progress.add_task( + f"Downloading docket {case_number} from {court_normalized}...", total=None + ) downloader = DocketDownloader(config, verbose=verbose) @@ -1164,7 +1203,9 @@ def download_docket_cmd( @matter_option @click.pass_context @under_spend_lock -def download_document(ctx, doc_number: str, doc_link: str | None, verbose: bool, client_code: str | None): +def download_document( + ctx, doc_number: str, doc_link: str | None, verbose: bool, client_code: str | None +): """Download a single document from a case. \b @@ -1193,7 +1234,9 @@ def download_document(ctx, doc_number: str, doc_link: str | None, verbose: bool, # Need context to resolve the link if not context.is_set or not context.case_path: err_console.print("[red]Error:[/] No document link provided and no context set.") - err_console.print("[dim]Either provide a link or set context:[/] pacer use case ") + err_console.print( + "[dim]Either provide a link or set context:[/] pacer use case " + ) sys.exit(1) case_dir = context.case_path @@ -1335,7 +1378,8 @@ def list_documents(ctx, case: str | None, output_json: bool): ) console.print(table) - console.print(f"\n[dim]Total: {docs.get('document_count', len(docs.get('documents', [])))} documents with links[/]") + total_docs = docs.get("document_count", len(docs.get("documents", []))) + console.print(f"\n[dim]Total: {total_docs} documents with links[/]") console.print(f"[dim]Case: {docs.get('case_title', '')}[/]") @@ -1439,7 +1483,14 @@ def view_case( @matter_option @click.pass_context @under_spend_lock -def download_batch(ctx, csv_file: Path, column_court: str, column_case: str, verbose: bool, client_code: str | None): +def download_batch( + ctx, + csv_file: Path, + column_court: str, + column_case: str, + verbose: bool, + client_code: str | None, +): """Download multiple dockets from a CSV file. The CSV should have columns for court ID and case number. @@ -2067,7 +2118,11 @@ def pcl_cases( response = client.search_cases(criteria, page=page) all_results = response.content page_info = response.page_info - total_fee = float(response.receipt.search_fee) if response.receipt and response.receipt.search_fee else 0.0 + total_fee = ( + float(response.receipt.search_fee) + if response.receipt and response.receipt.search_fee + else 0.0 + ) # Record actual spend from the receipt so caps converge on real cost. if total_fee > 0: @@ -2098,7 +2153,17 @@ def pcl_cases( elif output_csv: import csv as csv_module import io - fieldnames = ["court_id", "case_number_full", "case_type", "case_title", "date_filed", "effective_date_closed", "jurisdiction_type", "nature_of_suit", "case_link"] + fieldnames = [ + "court_id", + "case_number_full", + "case_type", + "case_title", + "date_filed", + "effective_date_closed", + "jurisdiction_type", + "nature_of_suit", + "case_link", + ] if output: with open(output, "w", newline="") as f: @@ -2142,7 +2207,10 @@ def pcl_cases( f"({page_info.total_elements} total cases)[/]" ) if len(all_results) > 50: - console.print(f"[dim]Showing first 50 of {len(all_results)}. Use --json or --csv for full output.[/]") + console.print( + f"[dim]Showing first 50 of {len(all_results)}. " + "Use --json or --csv for full output.[/]" + ) # Interactive selection if interactive and all_results: @@ -2202,7 +2270,9 @@ def _handle_case_action(ctx, config: PacerConfig, case, action: str): ctx.invoke(view_case, case=str(docket_path)) else: err_console.print("[yellow]Docket not downloaded yet.[/]") - err_console.print(f"[dim]Download with:[/] pacer download docket \"{case_number}\" {court}") + err_console.print( + f'[dim]Download with:[/] pacer download docket "{case_number}" {court}' + ) elif action == "s": # Set as context @@ -2383,7 +2453,11 @@ def pcl_parties( response = client.search_parties(criteria, page=page) all_results = response.content page_info = response.page_info - total_fee = float(response.receipt.search_fee) if response.receipt and response.receipt.search_fee else 0.0 + total_fee = ( + float(response.receipt.search_fee) + if response.receipt and response.receipt.search_fee + else 0.0 + ) # Record actual spend from the receipt so caps converge on real cost. if total_fee > 0: @@ -2412,7 +2486,16 @@ def pcl_parties( elif output_csv: import csv as csv_module import io - fieldnames = ["last_name", "first_name", "middle_name", "party_role", "court_id", "case_number_full", "case_title", "date_filed"] + fieldnames = [ + "last_name", + "first_name", + "middle_name", + "party_role", + "court_id", + "case_number_full", + "case_title", + "date_filed", + ] if output: with open(output, "w", newline="") as f: @@ -2456,7 +2539,10 @@ def pcl_parties( f"({page_info.total_elements} total parties)[/]" ) if len(all_results) > 50: - console.print(f"[dim]Showing first 50 of {len(all_results)}. Use --json or --csv for full output.[/]") + console.print( + f"[dim]Showing first 50 of {len(all_results)}. " + "Use --json or --csv for full output.[/]" + ) except PCLError as e: err_console.print(f"[red]Error:[/] {e}") @@ -2487,7 +2573,13 @@ def pcl_batch(): @pcl_batch.command("list") -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_list(ctx, search_type): """List all batch jobs.""" @@ -2531,7 +2623,13 @@ def batch_list(ctx, search_type): @pcl_batch.command("status") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_status(ctx, report_id, search_type): """Check status of a batch job.""" @@ -2567,8 +2665,20 @@ def batch_status(ctx, report_id, search_type): @pcl_batch.command("download") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") -@click.option("--output", "-o", type=click.Path(path_type=Path), required=True, help="Output file (JSON)") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) +@click.option( + "--output", + "-o", + type=click.Path(path_type=Path), + required=True, + help="Output file (JSON)", +) @click.pass_context def batch_download(ctx, report_id, search_type, output): """Download results from a completed batch job.""" @@ -2604,7 +2714,13 @@ def batch_download(ctx, report_id, search_type, output): @pcl_batch.command("delete") @click.argument("report_id", type=int) -@click.option("--type", "search_type", type=click.Choice(["cases", "parties"]), default="cases", help="Search type") +@click.option( + "--type", + "search_type", + type=click.Choice(["cases", "parties"]), + default="cases", + help="Search type", +) @click.pass_context def batch_delete(ctx, report_id, search_type): """Delete a batch job and its results.""" diff --git a/src/pacer_cli/config.py b/src/pacer_cli/config.py index f81ea6f..a7dd1a2 100644 --- a/src/pacer_cli/config.py +++ b/src/pacer_cli/config.py @@ -274,7 +274,9 @@ def save_credentials( Path to vault file """ if vault_passphrase: - return _save_credentials_vault(username, password, totp_secret, client_code, vault_passphrase) + return _save_credentials_vault( + username, password, totp_secret, client_code, vault_passphrase + ) return _save_credentials_legacy(username, password, totp_secret, client_code) diff --git a/src/pacer_cli/docket_types.py b/src/pacer_cli/docket_types.py index 317a903..7690bc4 100644 --- a/src/pacer_cli/docket_types.py +++ b/src/pacer_cli/docket_types.py @@ -44,7 +44,9 @@ def to_compact(self) -> str: atty_names = ', '.join(a.name for a in self.attorneys[:2]) if len(self.attorneys) > 2: atty_names += f" +{len(self.attorneys) - 2}" - return f"{role_abbrev}: {self.name} | Atty: {atty_names}" if atty_names else f"{role_abbrev}: {self.name}" + if atty_names: + return f"{role_abbrev}: {self.name} | Atty: {atty_names}" + return f"{role_abbrev}: {self.name}" @dataclass @@ -163,13 +165,20 @@ def to_markdown(self, verbose: bool = False) -> str: if self.meta.judge: lines.append(f"**Judge:** {self.meta.judge} ") if self.meta.nature_of_suit: - lines.append(f"**Nature of Suit:** {self.meta.nature_of_suit} - {self.meta.nos_description} ") + lines.append( + f"**Nature of Suit:** {self.meta.nature_of_suit} - " + f"{self.meta.nos_description} " + ) # Parties if self.parties: lines.extend(["", "## Parties", ""]) for party in self.parties: - atty_list = ", ".join(a.name for a in party.attorneys) if party.attorneys else "(none)" + atty_list = ( + ", ".join(a.name for a in party.attorneys) + if party.attorneys + else "(none)" + ) lines.append(f"- **{party.role}:** {party.name}") if party.pro_se: lines.append(" - *Pro Se*") diff --git a/src/pacer_cli/downloader.py b/src/pacer_cli/downloader.py index 82cfbb3..e27cca6 100644 --- a/src/pacer_cli/downloader.py +++ b/src/pacer_cli/downloader.py @@ -183,7 +183,10 @@ def _cso_login(self, court_id: str, app_url: str) -> bool: # Extract the ViewState (JSF CSRF token) - both javax and jakarta variants viewstate = None import re - vs_match = re.search(r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', resp.text) + vs_match = re.search( + r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', + resp.text, + ) if vs_match: viewstate = vs_match.group(1) self._log(f"Found ViewState: {viewstate[:50]}...") @@ -222,7 +225,10 @@ def _cso_login(self, court_id: str, app_url: str) -> bool: self._log("MFA step required, submitting OTP via PrimeFaces AJAX...") # Get new ViewState for MFA form - vs_match = re.search(r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', resp.text) + vs_match = re.search( + r'name="(?:javax|jakarta)\.faces\.ViewState"[^>]*value="([^"]*)"', + resp.text, + ) mfa_viewstate = vs_match.group(1) if vs_match else "" from .auth import generate_totp @@ -412,7 +418,10 @@ def download_docket_by_link( # Retry the request after login resp = self.session.get(case_link, headers=headers, timeout=30) - self._log(f"After CSO login, response: {resp.status_code}, length: {len(resp.text)}") + self._log( + f"After CSO login, response: {resp.status_code}, " + f"length: {len(resp.text)}" + ) # Check again for JS redirect (login may have failed) if "location.assign" in resp.text and "csologin" in resp.text: @@ -455,7 +464,9 @@ def download_docket_by_link( # Extract the form action URL (contains session token) import re - form_action_match = re.search(r']*action="([^"]+)"', resp.text, re.IGNORECASE) + form_action_match = re.search( + r']*action="([^"]+)"', resp.text, re.IGNORECASE + ) if form_action_match: form_action = form_action_match.group(1) # Handle relative URLs @@ -468,7 +479,11 @@ def download_docket_by_link( self._log("No form action found, using docket URL") # Extract hidden form fields - hidden_fields = re.findall(r']*type="hidden"[^>]*name="([^"]*)"[^>]*value="([^"]*)"', resp.text, re.IGNORECASE) + hidden_fields = re.findall( + r']*type="hidden"[^>]*name="([^"]*)"[^>]*value="([^"]*)"', + resp.text, + re.IGNORECASE, + ) # Submit form with default options (all entries) form_data = { @@ -531,7 +546,10 @@ def download_docket_by_link( ) docs_filepath = output_dir / "docs.json" docs_filepath.write_text(json.dumps(docs_meta, indent=2), encoding="utf-8") - self._log(f"Saved docs.json to: {docs_filepath} ({docs_meta.get('document_count', 0)} docs)") + self._log( + f"Saved docs.json to: {docs_filepath} " + f"({docs_meta.get('document_count', 0)} docs)" + ) except Exception as e: self._log(f"Warning: Could not save docs.json: {e}") @@ -737,7 +755,10 @@ def download_document( try: resp = self.session.get(doc_url, headers=headers, timeout=60, allow_redirects=True) - self._log(f"Response: {resp.status_code}, content-type: {resp.headers.get('content-type', 'unknown')}") + self._log( + f"Response: {resp.status_code}, " + f"content-type: {resp.headers.get('content-type', 'unknown')}" + ) # Check for login redirect early (before following any links) if self._is_login_redirect(resp): @@ -775,13 +796,24 @@ def download_document( if 'View Document' in resp.text and 'goDLS' in resp.text: self._log("PACER receipt page detected, extracting goDLS params...") - # Extract goDLS parameters: goDLS(path, caseid, de_seq, got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf) + # Extract goDLS parameters: goDLS(path, caseid, de_seq, + # got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf) godls_match = re.search( r"goDLS\('([^']+)','([^']+)','([^']+)','([^']*)','([^']*)','([^']*)','([^']*)','([^']*)','([^']*)'\)", resp.text ) if godls_match: - path, caseid, de_seq, got_receipt, pdf_hdr, pdf_toggle, magic, hdr, psf = godls_match.groups() + ( + path, + caseid, + de_seq, + got_receipt, + pdf_hdr, + pdf_toggle, + magic, + hdr, + psf, + ) = godls_match.groups() # Build POST data from goDLS params form_data = {} @@ -806,23 +838,37 @@ def download_document( form_url = f"{parsed.scheme}://{parsed.netloc}{path}" self._log(f"POSTing to {form_url} with goDLS params: {form_data}") - resp = self.session.post(form_url, data=form_data, headers=headers, timeout=120) - self._log(f"goDLS POST response: {resp.status_code}, type: {resp.headers.get('content-type', 'unknown')}") + resp = self.session.post( + form_url, data=form_data, headers=headers, timeout=120 + ) + self._log( + f"goDLS POST response: {resp.status_code}, " + f"type: {resp.headers.get('content-type', 'unknown')}" + ) # PACER returns HTML with iframe containing PDF URL - if 'text/html' in resp.headers.get('content-type', '') and ']+src="([^"]+)"', resp.text) if iframe_match: pdf_path = iframe_match.group(1) pdf_url = f"{parsed.scheme}://{parsed.netloc}{pdf_path}" self._log(f"Following iframe to PDF: {pdf_url}") resp = self.session.get(pdf_url, headers=headers, timeout=120) - self._log(f"PDF response: {resp.status_code}, type: {resp.headers.get('content-type', 'unknown')}, size: {len(resp.content)}") + self._log( + f"PDF response: {resp.status_code}, " + f"type: {resp.headers.get('content-type', 'unknown')}, " + f"size: {len(resp.content)}" + ) # Fallback: check for other form types elif '