73 changes: 26 additions & 47 deletions .github/workflows/ci.yml
@@ -6,64 +6,43 @@ on:
pull_request:

jobs:
lint:
black:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install ".[dev]"

- name: Run black
run: black --check src tests
python-version: "3.10"
- run: pip install pre-commit ".[dev]"
- run: pre-commit run black --all-files

- name: Run flake8
run: flake8 src tests
flake8:
needs: black
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.10"
- run: pip install pre-commit ".[dev]"
- run: pre-commit run flake8 --all-files

unit-tests:
needs: lint
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install ".[dev]"

- name: Run unit tests
run: pytest tests/unit
python-version: "3.10"
- run: pip install ".[dev]"
- run: pytest tests/unit

integration-tests:
needs: unit-tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install ".[dev]"

- name: Run integration tests
run: pytest tests/integration
python-version: "3.10"
- run: pip install ".[dev]"
- run: pytest tests/integration
5 changes: 4 additions & 1 deletion .pre-commit-config.yaml
@@ -3,12 +3,13 @@ repos:
rev: 24.4.2
hooks:
- id: black
language_version: python3
language_version: python3.10

- repo: https://github.com/pycqa/flake8
rev: 7.0.0
hooks:
- id: flake8
args: ["--max-line-length=120"]

- repo: local
hooks:
@@ -25,3 +26,5 @@ repos:
args: ["tests/integration"]
language: system
pass_filenames: false


2 changes: 1 addition & 1 deletion README.md
@@ -87,7 +87,7 @@ The pipeline:
Products are read from table:

```text
test.products
bm_mock_data.products
```

Expected schema:
11 changes: 9 additions & 2 deletions pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"google-cloud-storage>=2.13.0",
"pyarrow>=15.0.0",
"python-dotenv>=1.0.0",
"db-dtypes>=1.2.0",
]

[project.optional-dependencies]
@@ -26,14 +27,20 @@ dev = [
"pre-commit>=3.7.0",
]

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]

[project.scripts]
gcp-sales-pipeline = "gcp_sales_pipeline.cli:main"

[tool.black]
line-length = 88
line-length = 120
target-version = ["py310"]

[tool.flake8]
max-line-length = 88
max-line-length = 120
extend-ignore = ["E203", "W503"]

5 changes: 1 addition & 4 deletions src/gcp_sales_pipeline/bq_client.py
@@ -60,10 +60,7 @@ def load_products(
required_cols = {"product_id", "product_name", "category", "brand", "condition"}
missing = required_cols - set(df.columns)
if missing:
msg = (
f"Products table {table_full_name} is missing required columns: "
f"{', '.join(sorted(missing))}"
)
msg = f"Products table {table_full_name} is missing required columns: " f"{', '.join(sorted(missing))}"
logger.error(msg)
raise DataQualityError(msg)

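The reflowed message in `load_products` relies on implicit concatenation of two adjacent f-strings, which is behavior-equivalent to the old parenthesized multi-line form. A minimal, self-contained sketch of the same required-columns check (the column list here is hypothetical, not from the repo):

```python
required_cols = {"product_id", "product_name", "category", "brand", "condition"}
columns = ["product_id", "brand", "price"]  # hypothetical DataFrame columns

missing = required_cols - set(columns)
if missing:
    # Adjacent f-strings concatenate, exactly like the one-line message in bq_client.py.
    msg = "missing required columns: " f"{', '.join(sorted(missing))}"
    print(msg)  # -> missing required columns: category, condition, product_name
```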
25 changes: 5 additions & 20 deletions src/gcp_sales_pipeline/cli.py
@@ -22,17 +22,12 @@ def _valid_date(value: str):
try:
return datetime.strptime(value, DATE_FORMAT).date()
except ValueError as exc:
raise argparse.ArgumentTypeError(
f"Invalid date '{value}', expected YYYY-MM-DD."
) from exc
raise argparse.ArgumentTypeError(f"Invalid date '{value}', expected YYYY-MM-DD.") from exc


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Join products (BigQuery) and sales (GCS) filtered by date "
"and export as parquet/csv."
)
description=("Join products (BigQuery) and sales (GCS) filtered by date " "and export as parquet/csv.")
)

parser.add_argument(
@@ -42,7 +37,6 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
help="Reference date in YYYY-MM-DD format.",
)


parser.add_argument(
"--project-id",
default="bot-sandbox-interviews-eb7b",
@@ -59,10 +53,7 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
parser.add_argument(
"--brand",
action="append",
help=(
"Filter on one or more brands. "
"Can be specified multiple times (AND combined with other filters)."
),
help=("Filter on one or more brands. " "Can be specified multiple times (AND combined with other filters)."),
)

parser.add_argument(
@@ -110,10 +101,7 @@ def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
"--max-sales-files",
type=int,
default=500,
help=(
"Maximum number of daily sales files (days) to read from GCS before "
"failing (default: 500)."
),
help=("Maximum number of daily sales files (days) to read from GCS before " "failing (default: 500)."),
)

parser.add_argument(
Expand All @@ -139,10 +127,7 @@ def _check_credentials_env() -> None:
raise SystemExit(msg)

if not Path(creds_path).is_file():
msg = (
"Credentials file specified by GOOGLE_APPLICATION_CREDENTIALS "
f"does not exist: {creds_path}"
)
msg = "Credentials file specified by GOOGLE_APPLICATION_CREDENTIALS " f"does not exist: {creds_path}"
raise SystemExit(msg)


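For context, the reflowed `_valid_date` keeps the old behavior. A minimal sketch of how it behaves, assuming `DATE_FORMAT` is `"%Y-%m-%d"` (the constant is defined outside this hunk):

```python
import argparse
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d"  # assumption: matches the "YYYY-MM-DD" help text


def _valid_date(value: str):
    # Same shape as the helper in cli.py: parse, or raise an argparse-friendly error.
    try:
        return datetime.strptime(value, DATE_FORMAT).date()
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"Invalid date '{value}', expected YYYY-MM-DD.") from exc


print(_valid_date("2024-03-15"))   # datetime.date(2024, 3, 15)
# _valid_date("15/03/2024") raises argparse.ArgumentTypeError
```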
4 changes: 1 addition & 3 deletions src/gcp_sales_pipeline/filters.py
@@ -39,9 +39,7 @@ def filter_sales_by_date(
if granularity == TimeGranularity.DAY:
mask = df["sold_at"].dt.date == ref_date
elif granularity == TimeGranularity.MONTH:
mask = (df["sold_at"].dt.year == ref_date.year) & (
df["sold_at"].dt.month == ref_date.month
)
mask = (df["sold_at"].dt.year == ref_date.year) & (df["sold_at"].dt.month == ref_date.month)
elif granularity == TimeGranularity.QUARTER:
ref_quarter = (ref_date.month - 1) // 3 + 1
sold_quarter = (df["sold_at"].dt.month - 1) // 3 + 1
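The quarter filter applies the same integer arithmetic to the reference date and to the `sold_at` column. A quick standalone sketch of what `(month - 1) // 3 + 1` produces:

```python
# Map a calendar month (1-12) to its quarter (1-4), as filter_sales_by_date does.
for month in range(1, 13):
    quarter = (month - 1) // 3 + 1
    print(f"month {month:2d} -> Q{quarter}")
# months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4
```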
9 changes: 2 additions & 7 deletions src/gcp_sales_pipeline/gcs_client.py
@@ -76,7 +76,6 @@ def _debug_list_nearby_sales_files(
logger.info(" - gs://%s/%s", bucket.name, name)



def load_sales_parquet(
project_id: str,
bucket_name: str,
@@ -103,9 +102,7 @@ def load_sales_parquet(
- order_id (STRING)
"""
if end_date < start_date:
raise ValueError(
f"end_date {end_date} must be on or after start_date {start_date}"
)
raise ValueError(f"end_date {end_date} must be on or after start_date {start_date}")

num_days = (end_date - start_date).days + 1
if max_files is not None and num_days > max_files:
@@ -139,9 +136,7 @@ def load_sales_parquet(
try:
data = blob.download_as_bytes()
except NotFound:
logger.warning(
"Sales file not found for date %s: %s", current_date, full_uri
)
logger.warning("Sales file not found for date %s: %s", current_date, full_uri)
_debug_list_nearby_sales_files(bucket, current_date, max_results=10)
continue
except GoogleAPIError as exc:
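`load_sales_parquet` expects one parquet file per day and guards the request size with an inclusive day count. A small sketch of that arithmetic (names are local to the example; the real client raises its own exception type from `.exceptions` rather than `ValueError`):

```python
from datetime import date

start_date = date(2024, 1, 1)
end_date = date(2024, 1, 31)
max_files = 500

# Both endpoints are included, so January yields 31 daily files.
num_days = (end_date - start_date).days + 1
print(num_days)  # 31

if max_files is not None and num_days > max_files:
    # Sketch only: the module raises a dedicated exception here instead.
    raise ValueError(f"Requested {num_days} daily files, limit is {max_files}")
```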
21 changes: 5 additions & 16 deletions src/gcp_sales_pipeline/pipeline.py
@@ -9,7 +9,6 @@
import pandas as pd

from .bq_client import load_products
from .exceptions import DataLoadError, DataQualityError, TooManyFilesError
from .filters import TimeGranularity, filter_sales_by_date
from .gcs_client import load_sales_parquet

@@ -58,15 +57,11 @@ def _build_suffix(ref_date: date, granularity: TimeGranularity) -> str:
def _build_output_path(config: PipelineConfig) -> Path:
"""Build the full output path."""
suffix = _build_suffix(config.ref_date, config.granularity)
filename = (
f"sales_products_{config.granularity.value}_{suffix}.{config.output_format}"
)
filename = f"sales_products_{config.granularity.value}_{suffix}.{config.output_format}"
return config.output_dir / filename


def _compute_date_range(
ref_date: date, granularity: TimeGranularity
) -> tuple[date, date]:
def _compute_date_range(ref_date: date, granularity: TimeGranularity) -> tuple[date, date]:
"""
Compute [start_date, end_date] for the given reference date and granularity.
"""
@@ -201,8 +196,7 @@ def run_pipeline(config: PipelineConfig) -> Path:
null_count = products_df["product_id"].isna().sum()
if null_count > 0:
logger.warning(
"Dropping %d product rows with NULL product_id from products table "
"%s.%s.%s before joining.",
"Dropping %d product rows with NULL product_id from products table " "%s.%s.%s before joining.",
null_count,
config.project_id,
config.bq_dataset,
@@ -230,8 +224,7 @@ def run_pipeline(config: PipelineConfig) -> Path:

if sales_df.empty:
logger.warning(
"No sales data loaded from GCS for range %s to %s. "
"Exporting an empty dataset.",
"No sales data loaded from GCS for range %s to %s. " "Exporting an empty dataset.",
start_date,
end_date,
)
@@ -258,9 +251,7 @@ def run_pipeline(config: PipelineConfig) -> Path:
"Dropping %d sales rows with NULL product_id before joining.",
null_sales_count,
)
filtered_sales_df = filtered_sales_df[
filtered_sales_df["product_id"].notna()
].copy()
filtered_sales_df = filtered_sales_df[filtered_sales_df["product_id"].notna()].copy()

# 6. Pragmatic dedup: ensure product_id is unique in products_df
if not products_df["product_id"].is_unique:
@@ -320,5 +311,3 @@ def run_pipeline(config: PipelineConfig) -> Path:

logger.info("Pipeline finished successfully.")
return output_path


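`_compute_date_range` is only partially visible in this hunk; its docstring promises an inclusive `[start_date, end_date]` pair for a reference date and granularity. As a hypothetical illustration of that contract for month granularity (the helper below is an assumption for illustration, not the module's code):

```python
import calendar
from datetime import date


def month_range(ref_date: date) -> tuple[date, date]:
    # Hypothetical sketch: first and last day of the month containing ref_date.
    last_day = calendar.monthrange(ref_date.year, ref_date.month)[1]
    return date(ref_date.year, ref_date.month, 1), date(ref_date.year, ref_date.month, last_day)


print(month_range(date(2024, 2, 15)))
# (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))
```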
5 changes: 5 additions & 0 deletions tests/integration/test_pipeline_integration.py
@@ -68,5 +68,10 @@ def fake_load_sales_parquet(
"category",
"brand",
"condition",
"processed_at",
"sale_date",
"quarter",
"year",
"month",
}
assert len(df) == 2