From 960f61fc1101d5999abf732afda49502b72b158c Mon Sep 17 00:00:00 2001
From: Lucas Obadia
Date: Mon, 17 Nov 2025 23:24:52 +0100
Subject: [PATCH 1/4] [BM] Fix CI

---
 .pre-commit-config.yaml                       |  5 +++-
 README.md                                     |  2 +-
 pyproject.toml                                | 11 ++++++--
 src/gcp_sales_pipeline/bq_client.py           |  5 +---
 src/gcp_sales_pipeline/cli.py                 | 25 ++++---------------
 src/gcp_sales_pipeline/filters.py             |  4 +--
 src/gcp_sales_pipeline/gcs_client.py          |  9 ++-----
 src/gcp_sales_pipeline/pipeline.py            | 21 ++++------------
 .../integration/test_pipeline_integration.py |  5 ++++
 9 files changed, 33 insertions(+), 54 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ca1a8cb..e8f52be 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -3,12 +3,13 @@ repos:
     rev: 24.4.2
     hooks:
       - id: black
-        language_version: python3
+        language_version: python3.10
 
   - repo: https://github.com/pycqa/flake8
     rev: 7.0.0
     hooks:
       - id: flake8
+        args: ["--max-line-length=120"]
 
   - repo: local
     hooks:
@@ -25,3 +26,5 @@ repos:
         args: ["tests/integration"]
         language: system
         pass_filenames: false
+
+
diff --git a/README.md b/README.md
index 6394c39..7ca9a74 100644
--- a/README.md
+++ b/README.md
@@ -87,7 +87,7 @@ The pipeline:
 Products are read from table:
 
 ```text
-test.products
+bm_mock_data.products
 ```
 
 Expected schema:
diff --git a/pyproject.toml b/pyproject.toml
index 17e3e78..60e1b70 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
     "google-cloud-storage>=2.13.0",
     "pyarrow>=15.0.0",
     "python-dotenv>=1.0.0",
+    "db-dtypes>=1.2.0",
 ]
 
 [project.optional-dependencies]
@@ -26,14 +27,20 @@ dev = [
     "pre-commit>=3.7.0",
 ]
 
+[tool.setuptools]
+package-dir = {"" = "src"}
+
+[tool.setuptools.packages.find]
+where = ["src"]
+
 [project.scripts]
 gcp-sales-pipeline = "gcp_sales_pipeline.cli:main"
 
 [tool.black]
-line-length = 88
+line-length = 120
 target-version = ["py310"]
 
 [tool.flake8]
-max-line-length = 88
+max-line-length = 120
 extend-ignore = ["E203", "W503"]
 
diff --git a/src/gcp_sales_pipeline/bq_client.py b/src/gcp_sales_pipeline/bq_client.py
index 858efd4..e77f956 100644
--- a/src/gcp_sales_pipeline/bq_client.py
+++ b/src/gcp_sales_pipeline/bq_client.py
@@ -60,10 +60,7 @@ def load_products(
     required_cols = {"product_id", "product_name", "category", "brand", "condition"}
     missing = required_cols - set(df.columns)
     if missing:
-        msg = (
-            f"Products table {table_full_name} is missing required columns: "
-            f"{', '.join(sorted(missing))}"
-        )
+        msg = f"Products table {table_full_name} is missing required columns: " f"{', '.join(sorted(missing))}"
         logger.error(msg)
         raise DataQualityError(msg)
 
diff --git a/src/gcp_sales_pipeline/cli.py b/src/gcp_sales_pipeline/cli.py
index 87af42e..b2fadbd 100644
--- a/src/gcp_sales_pipeline/cli.py
+++ b/src/gcp_sales_pipeline/cli.py
@@ -22,17 +22,12 @@ def _valid_date(value: str):
     try:
         return datetime.strptime(value, DATE_FORMAT).date()
     except ValueError as exc:
-        raise argparse.ArgumentTypeError(
-            f"Invalid date '{value}', expected YYYY-MM-DD."
-        ) from exc
+        raise argparse.ArgumentTypeError(f"Invalid date '{value}', expected YYYY-MM-DD.") from exc
 
 
 def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
     parser = argparse.ArgumentParser(
-        description=(
-            "Join products (BigQuery) and sales (GCS) filtered by date "
-            "and export as parquet/csv."
-        )
+        description=("Join products (BigQuery) and sales (GCS) filtered by date " "and export as parquet/csv.")
     )
 
     parser.add_argument(
@@ -42,7 +37,6 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
         help="Reference date in YYYY-MM-DD format.",
     )
 
-
     parser.add_argument(
         "--project-id",
         default="bot-sandbox-interviews-eb7b",
@@ -59,10 +53,7 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
     parser.add_argument(
         "--brand",
         action="append",
-        help=(
-            "Filter on one or more brands. "
-            "Can be specified multiple times (AND combined with other filters)."
-        ),
+        help=("Filter on one or more brands. " "Can be specified multiple times (AND combined with other filters)."),
     )
 
     parser.add_argument(
@@ -110,10 +101,7 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
         "--max-sales-files",
         type=int,
         default=500,
-        help=(
-            "Maximum number of daily sales files (days) to read from GCS before "
-            "failing (default: 500)."
-        ),
+        help=("Maximum number of daily sales files (days) to read from GCS before " "failing (default: 500)."),
     )
 
     parser.add_argument(
@@ -139,10 +127,7 @@ def _check_credentials_env() -> None:
         raise SystemExit(msg)
 
     if not Path(creds_path).is_file():
-        msg = (
-            "Credentials file specified by GOOGLE_APPLICATION_CREDENTIALS "
-            f"does not exist: {creds_path}"
-        )
+        msg = "Credentials file specified by GOOGLE_APPLICATION_CREDENTIALS " f"does not exist: {creds_path}"
         raise SystemExit(msg)
 
 
diff --git a/src/gcp_sales_pipeline/filters.py b/src/gcp_sales_pipeline/filters.py
index 323d9b0..fab607f 100644
--- a/src/gcp_sales_pipeline/filters.py
+++ b/src/gcp_sales_pipeline/filters.py
@@ -39,9 +39,7 @@ def filter_sales_by_date(
     if granularity == TimeGranularity.DAY:
         mask = df["sold_at"].dt.date == ref_date
     elif granularity == TimeGranularity.MONTH:
-        mask = (df["sold_at"].dt.year == ref_date.year) & (
-            df["sold_at"].dt.month == ref_date.month
-        )
+        mask = (df["sold_at"].dt.year == ref_date.year) & (df["sold_at"].dt.month == ref_date.month)
     elif granularity == TimeGranularity.QUARTER:
         ref_quarter = (ref_date.month - 1) // 3 + 1
         sold_quarter = (df["sold_at"].dt.month - 1) // 3 + 1
diff --git a/src/gcp_sales_pipeline/gcs_client.py b/src/gcp_sales_pipeline/gcs_client.py
index ec26771..de38aed 100644
--- a/src/gcp_sales_pipeline/gcs_client.py
+++ b/src/gcp_sales_pipeline/gcs_client.py
@@ -76,7 +76,6 @@ def _debug_list_nearby_sales_files(
         logger.info("  - gs://%s/%s", bucket.name, name)
 
 
-
 def load_sales_parquet(
     project_id: str,
     bucket_name: str,
@@ -103,9 +102,7 @@ def load_sales_parquet(
     - order_id (STRING)
     """
     if end_date < start_date:
-        raise ValueError(
-            f"end_date {end_date} must be on or after start_date {start_date}"
-        )
+        raise ValueError(f"end_date {end_date} must be on or after start_date {start_date}")
 
     num_days = (end_date - start_date).days + 1
     if max_files is not None and num_days > max_files:
@@ -139,9 +136,7 @@ def load_sales_parquet(
         try:
             data = blob.download_as_bytes()
         except NotFound:
-            logger.warning(
-                "Sales file not found for date %s: %s", current_date, full_uri
-            )
+            logger.warning("Sales file not found for date %s: %s", current_date, full_uri)
             _debug_list_nearby_sales_files(bucket, current_date, max_results=10)
             continue
         except GoogleAPIError as exc:
diff --git a/src/gcp_sales_pipeline/pipeline.py b/src/gcp_sales_pipeline/pipeline.py
index 147171a..9c2c2a8 100644
--- a/src/gcp_sales_pipeline/pipeline.py
+++ b/src/gcp_sales_pipeline/pipeline.py
@@ -9,7 +9,6 @@ import pandas as pd
 
 from .bq_client import load_products
-from .exceptions import DataLoadError, DataQualityError, TooManyFilesError
 from .filters import TimeGranularity, filter_sales_by_date
 from .gcs_client import load_sales_parquet
 
 
@@ -58,15 +57,11 @@ def _build_suffix(ref_date: date, granularity: TimeGranularity) -> str:
 def _build_output_path(config: PipelineConfig) -> Path:
     """Build the full output path."""
     suffix = _build_suffix(config.ref_date, config.granularity)
-    filename = (
-        f"sales_products_{config.granularity.value}_{suffix}.{config.output_format}"
-    )
+    filename = f"sales_products_{config.granularity.value}_{suffix}.{config.output_format}"
     return config.output_dir / filename
 
 
-def _compute_date_range(
-    ref_date: date, granularity: TimeGranularity
-) -> tuple[date, date]:
+def _compute_date_range(ref_date: date, granularity: TimeGranularity) -> tuple[date, date]:
     """
     Compute [start_date, end_date] for the given reference date and granularity.
     """
@@ -201,8 +196,7 @@ def run_pipeline(config: PipelineConfig) -> Path:
     null_count = products_df["product_id"].isna().sum()
     if null_count > 0:
         logger.warning(
-            "Dropping %d product rows with NULL product_id from products table "
-            "%s.%s.%s before joining.",
+            "Dropping %d product rows with NULL product_id from products table " "%s.%s.%s before joining.",
             null_count,
             config.project_id,
             config.bq_dataset,
@@ -230,8 +224,7 @@ def run_pipeline(config: PipelineConfig) -> Path:
 
     if sales_df.empty:
         logger.warning(
-            "No sales data loaded from GCS for range %s to %s. "
-            "Exporting an empty dataset.",
+            "No sales data loaded from GCS for range %s to %s. " "Exporting an empty dataset.",
             start_date,
             end_date,
         )
@@ -258,9 +251,7 @@ def run_pipeline(config: PipelineConfig) -> Path:
             "Dropping %d sales rows with NULL product_id before joining.",
             null_sales_count,
         )
-        filtered_sales_df = filtered_sales_df[
-            filtered_sales_df["product_id"].notna()
-        ].copy()
+        filtered_sales_df = filtered_sales_df[filtered_sales_df["product_id"].notna()].copy()
 
     # 6. Pragmatic dedup: ensure product_id is unique in products_df
     if not products_df["product_id"].is_unique:
@@ -320,5 +311,3 @@ def run_pipeline(config: PipelineConfig) -> Path:
 
     logger.info("Pipeline finished successfully.")
     return output_path
-
-
diff --git a/tests/integration/test_pipeline_integration.py b/tests/integration/test_pipeline_integration.py
index 834d92f..141b369 100644
--- a/tests/integration/test_pipeline_integration.py
+++ b/tests/integration/test_pipeline_integration.py
@@ -68,5 +68,10 @@ def fake_load_sales_parquet(
         "category",
         "brand",
         "condition",
+        "processed_at",
+        "sale_date",
+        "quarter",
+        "year",
+        "month",
     }
     assert len(df) == 2

From 08fa967ab3d1e00d8ca08cc14253ba2924ef9548 Mon Sep 17 00:00:00 2001
From: Lucas Obadia
Date: Mon, 17 Nov 2025 23:32:13 +0100
Subject: [PATCH 2/4] update CI

---
 .github/workflows/ci.yml | 73 +++++++++++++++-------------------------
 1 file changed, 27 insertions(+), 46 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cd3bbd4..129a389 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -6,64 +6,45 @@ on:
   pull_request:
 
 jobs:
-  lint:
+  black:
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
         with:
-          python-version: "3.11"
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install ".[dev]"
-
-      - name: Run black
-        run: black --check src tests
+          python-version: "3.10"
+      - run: pip install pre-commit ".[dev]"
+      - run: pre-commit run black --all-files
 
-      - name: Run flake8
-        run: flake8 src tests
+  flake8:
+    needs: black
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
+      - run: pip install pre-commit ".[dev]"
+      - run: pre-commit run flake8 --all-files
 
   unit-tests:
-    needs: lint
+    needs: flake8
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
         with:
-          python-version: "3.11"
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install ".[dev]"
-
-      - name: Run unit tests
-        run: pytest tests/unit
+          python-version: "3.10"
+      - run: pip install ".[dev]"
+      - run: pytest tests/unit
 
   integration-tests:
     needs: unit-tests
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
         with:
-          python-version: "3.11"
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install ".[dev]"
-
-      - name: Run integration tests
-        run: pytest tests/integration
+          python-version: "3.10"
+      - run: pip install ".[dev]"
+      - run: pytest tests/integration

From d3f4a3e5f02f94d9613ebc2634791bf0b7b6aa6b Mon Sep 17 00:00:00 2001
From: Lucas Obadia
Date: Mon, 17 Nov 2025 23:35:15 +0100
Subject: [PATCH 3/4] update CI

---
 .github/workflows/ci.yml | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 129a389..21ec1ff 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,7 +17,6 @@ jobs:
       - run: pre-commit run black --all-files
 
   flake8:
-    needs: black
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
@@ -28,7 +27,6 @@ jobs:
       - run: pre-commit run flake8 --all-files
 
   unit-tests:
-    needs: flake8
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
@@ -39,7 +37,6 @@ jobs:
       - run: pytest tests/unit
 
   integration-tests:
-    needs: unit-tests
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4

From 4dca84e415336399ed868ebef12dcdc26a30cf7d Mon Sep 17 00:00:00 2001
From: Lucas Obadia
Date: Mon, 17 Nov 2025 23:36:19 +0100
Subject: [PATCH 4/4] update CI

---
 .github/workflows/ci.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 21ec1ff..7f939cc 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,6 +17,7 @@ jobs:
       - run: pre-commit run black --all-files
 
   flake8:
+    needs: black
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4