feat(destinations): add Google Cloud Storage destination (csv/json/jsonl/parquet + gzip) (closes #169)#623
Merged
Merged
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
5 tasks
Contributor
Author
|
@yodakanohoshi — small ping when you have a window 🙏 GCS destination, Phase B of the v0.8 cloud-storage trio (S3 ✅ / GCS this PR / Azure Blob #624). Thin shim on top of the shared This one's the bigger review out of the four. If timing's tight, prioritising #623 / #624 over the docs PRs (#620 / #621) is fine. |
This was referenced Jun 9, 2026
masukai
added a commit
that referenced
this pull request
Jun 9, 2026
…e to 100% codecov/patch on the prior commit hit 94.73% (target 86.72%) — the gate passed but not at 100%. Uncovered slice was the 3 branches not exercised by the happy-path tests: - **Lines 152-163** — MERGE-path staging INSERT failure handler (the per-row try/except inside the staging-table INSERT loop) - **Line 196** — mirror's ``failed_indices`` skip path inside the ``_mirror_keys`` accumulator (skip rows that didn't make it into the destination so they don't count as "observed in source" for the end-of-sync DELETE) - **Line 202** — the ``Unsupported mode`` defensive fallthrough ValueError (unreachable in normal flow because Pydantic Literal validates at config-load time, but tracked by coverage) 4 new tests: 1. `test_merge_staging_insert_failure_on_error_skip` — first staging INSERT fails, second succeeds; verifies result.failed=1 + row_errors recorded + MERGE still runs against whatever made it into staging. 2. `test_merge_staging_insert_failure_on_error_fail_raises` — same failure scenario but with on_error=fail; verifies the exception re-raises and the connection is still closed via try/finally. 3. `test_unsupported_mode_raises` — manually corrupts ``config.mode`` to "garbage" after Pydantic construction (bypasses Literal validation via ``object.__setattr__``) and verifies the defensive ValueError fires. 4. `test_mirror_skips_failed_keys_from_delete_observed_set` — mirror load with a staging failure on row 1, then finalize_sync; verifies the DELETE's NOT-IN list contains only the survivor's key (id=2), not the failed row's key (id=1). This catches the semantic bug where a row that failed to load would be deleted from the destination on next mirror run. drt/destinations/databricks.py file coverage: 94% → 100% (119/119 stmts). Coverage now matches the S3 / GCS / Azure Blob destinations from #613 / #623 / #624. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
masukai
added a commit
that referenced
this pull request
Jun 9, 2026
…ge/mirror) (closes #167) (#629) * feat(destinations): add Databricks Delta Lake destination (insert/merge/mirror) (closes #167) Third DWH destination alongside Snowflake (#353) and BigQuery (#584 in flight) — completes the major-DWH lineup. Supports the same three modes as Snowflake's leg: - INSERT (append, `config.mode: insert`) - MERGE (upsert via Delta Lake's native MERGE INTO, `config.mode: merge`) - sync.mode: mirror (#340 family — Databricks leg) — MERGE upsert + end-of-sync DELETE-missing Auth via Databricks SQL Connector: - `host_env` — workspace hostname (dbc-*.cloud.databricks.com) - `http_path_env` — SQL warehouse HTTP path (/sql/1.0/warehouses/*) - `token_env` — personal access token (PAT, dapi*) Unity Catalog three-part names (catalog.schema.table) are the default; legacy workspaces use `catalog: hive_metastore`. Merge implementation note: Databricks Delta Lake doesn't have session-local temp tables (no `CREATE TEMP TABLE` syntax), so the merge path creates a uniquely-named scratch Delta table `catalog.schema.__drt_staging_<table>` cloned from the target's schema, stages rows via per-row INSERT, executes MERGE INTO, and DROP TABLEs the staging at the end. The `__drt_staging_*` prefix makes it identifiable in audit logs. The token-bearing principal needs CREATE on the schema in addition to MODIFY on the target. Mirror semantics match the Snowflake leg of #340: - `sync.mode: mirror` forces the MERGE write path regardless of `config.mode` - End-of-sync issues `DELETE FROM <table> WHERE upsert_key NOT IN (observed)` - Composite keys use `WHERE (c1, c2) NOT IN ((v1a, v1b), ...)` form - Safety guard: skips DELETE entirely when no batch produced records 22 unit tests in tests/unit/test_databricks_destination.py cover: - Config validation (schema: YAML alias, three-part FQN in describe(), Hive Metastore catalog) - Empty-batch short-circuit (#595 contract) - databricks.sql.connect() kwargs shape — protects against silent template-copy drift from the Snowflake destination - INSERT happy path + on_error=skip / on_error=fail - MERGE happy path + upsert_key required + composite key ON clause + all-columns-are-key (no UPDATE clause) - Mirror invariants: upsert_key validation, MERGE-write-path forcing, single-column DELETE, composite-key DELETE tuple form, skip-when-no-records safety guard, no-op finalize_sync for non-mirror modes - test_connection round-trip databricks.sql is mocked via sys.modules injection — no real Databricks workspace or databricks-sql-connector install required. Requires `pip install drt-core[databricks]` (depends on databricks-sql-connector>=3.0, already in pyproject extras). New `docs/connectors/databricks.md` covers all three modes, auth flow with PAT generation steps, Unity Catalog vs Hive Metastore, the merge-path staging design (why Delta scratch table and not CREATE TEMP TABLE), and a sync-mode compatibility table. README destination table updated on both English and Japanese sides (Databricks Delta Lake row added after Snowflake, v0.7.9). i18n marker bump for README.ja.md follows the established post-merge housekeeping pattern (#618-style). Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com> * test(databricks): cover MERGE/mirror edge paths to push patch coverage to 100% codecov/patch on the prior commit hit 94.73% (target 86.72%) — the gate passed but not at 100%. Uncovered slice was the 3 branches not exercised by the happy-path tests: - **Lines 152-163** — MERGE-path staging INSERT failure handler (the per-row try/except inside the staging-table INSERT loop) - **Line 196** — mirror's ``failed_indices`` skip path inside the ``_mirror_keys`` accumulator (skip rows that didn't make it into the destination so they don't count as "observed in source" for the end-of-sync DELETE) - **Line 202** — the ``Unsupported mode`` defensive fallthrough ValueError (unreachable in normal flow because Pydantic Literal validates at config-load time, but tracked by coverage) 4 new tests: 1. `test_merge_staging_insert_failure_on_error_skip` — first staging INSERT fails, second succeeds; verifies result.failed=1 + row_errors recorded + MERGE still runs against whatever made it into staging. 2. `test_merge_staging_insert_failure_on_error_fail_raises` — same failure scenario but with on_error=fail; verifies the exception re-raises and the connection is still closed via try/finally. 3. `test_unsupported_mode_raises` — manually corrupts ``config.mode`` to "garbage" after Pydantic construction (bypasses Literal validation via ``object.__setattr__``) and verifies the defensive ValueError fires. 4. `test_mirror_skips_failed_keys_from_delete_observed_set` — mirror load with a staging failure on row 1, then finalize_sync; verifies the DELETE's NOT-IN list contains only the survivor's key (id=2), not the failed row's key (id=1). This catches the semantic bug where a row that failed to load would be deleted from the destination on next mirror run. drt/destinations/databricks.py file coverage: 94% → 100% (119/119 stmts). Coverage now matches the S3 / GCS / Azure Blob destinations from #613 / #623 / #624. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…onl/parquet + gzip) (closes #169) GCS destination — the natural pair for BigQuery users and the second of the v0.8 cloud-storage trio (S3 / GCS / Azure Blob). Same shape as the S3 destination from #613 — four formats (csv / json / jsonl / parquet) with optional gzip compression for the text formats (parquet keeps its own column-level compression via `parquet_compression`). Implementation is a thin `_client` + `blob.upload_from_string` shim on top of `drt/destinations/_blob_serializer.py` from #622 — the csv/json/jsonl/parquet + gzip + key-naming logic is shared with S3, not duplicated. Azure Blob (#170) lands next on the same module. Authentication: - Default: Application Default Credentials chain (GOOGLE_APPLICATION_CREDENTIALS env → `gcloud auth application-default login` → GCE/GKE/Cloud Run attached SA) - Override: `credentials_path` for non-GCP CI / cron environments - `project_id` optional (the keyfile usually carries one) Failure semantics: - [gcs] missing-extras → ImportError bubbles up (deployment mistake, surfaced once at the top by the engine) - [parquet] missing-extras → row failure with the `drt-core[parquet]` install hint preserved (matches S3) - Upload errors (network / auth / permissions) → row failures so other batches keep going - Empty batches short-circuit before any `google.cloud` import or GCS call (implicit "no driver was imported" contract from #595) 17 unit tests in `tests/unit/test_gcs_destination.py` cover config validation, the empty-batch short-circuit, `blob.upload_from_string` call shape per format, gzip → `blob.content_encoding = "gzip"` + `.gz` extension, ADC vs `project_id` vs `credentials_path` threading, key-template overrides, and the failure paths. The parquet orchestration test mocks `pandas` + `pyarrow` rather than calling them end-to-end — the S3 destination's parquet test already validates the real PAR1 binary, and double-registering the pyarrow type extension across two test classes raises `A type extension with name pandas.period already defined`. `docs/connectors/gcs.md` covers all four formats, the ADC / service-account auth flow, the object-naming convention, and notes on BigQuery external-table interop. README destination table updated on both English and Japanese sides. i18n marker bump for README.ja.md → README.md follow-up via the established post-merge housekeeping pattern (#618-style). Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…inventory Surfaced by `make check-drift` after rebasing onto main (the drift-audit infra from #630 landed while this PR was open). GCS was registered in the connector registry but missing from two surfaces the audit tracks: - `/drt-create-sync` skill destinations list (as "Google Cloud Storage (GCS)" — the "(GCS)" suffix is load-bearing for the drift matcher, which looks for the `gcs` type key as a substring) - `drt_list_connectors` MCP inventory `make check-drift` now exits 0 on this branch, so GCS lands already consistent across docs / skill / MCP rather than tripping the weekly audit after merge. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
540abe2 to
63fabc6
Compare
masukai
added a commit
that referenced
this pull request
Jun 11, 2026
…l/parquet + gzip) (closes #170) Completes the v0.8 cloud-storage trio (S3 #168 / GCS #169 / Azure Blob #170). Same shape as the S3 destination from #613 and the GCS destination from #623 — four formats (csv/json/jsonl/parquet) with optional gzip for the text formats. Implementation is a thin `_service_client` + `blob_client.upload_blob` shim on top of `drt/destinations/_blob_serializer.py` from #622 — the csv/json/jsonl/parquet + gzip + key-naming logic is shared with S3 and GCS, not duplicated. Authentication offers two paths (exactly one must be set; config errors raise immediately): 1. Connection string via `connection_string_env` pointing at an env var holding the storage-account connection string — the right shape for CI / cron / non-Azure deployments. 2. DefaultAzureCredential via `account_url` pointing at the storage-account blob endpoint — the right shape for Azure-hosted apps with managed identity (App Service / AKS / Container Apps / VMs / Functions), plus local dev via `az login`. Content-Type and Content-Encoding metadata are carried on the blob via Azure's `ContentSettings` (the Azure equivalent of S3's `ContentType` / `ContentEncoding` put_object kwargs). `upload_blob` is called with `overwrite=True` — irrelevant under the default timestamped key (a fresh blob name every run) and intentional for `key_template` users who set a fixed name like `latest.csv`. Failure semantics: - [azure] missing-extras → ImportError bubbles up (deployment mistake, surfaced once at the top by the engine) - [parquet] missing-extras → row failure with the `drt-core[parquet]` install hint preserved (matches S3 / GCS) - Upload errors (network / auth / permissions) → row failures so other batches keep going - Config errors (empty `connection_string_env`, neither auth path set) → ValueError raised immediately - Empty batches short-circuit before any `azure.storage.blob` import or Azure call (implicit "no driver was imported" contract from #595) 18 unit tests in `tests/unit/test_azure_blob_destination.py` cover config validation, the empty-batch short-circuit, `upload_blob` call shape per format, gzip → `ContentSettings(content_type=..., content_encoding="gzip")` + `.gz` extension, both auth paths (connection-string env-var resolution + DefaultAzureCredential threading), the negative auth paths, key-template overrides, and the failure paths. The parquet orchestration test mocks `pandas` + `pyarrow` rather than calling them end-to-end — same `A type extension with name pandas.period already defined` rationale as the GCS suite (#623). `docs/connectors/azure-blob.md` covers all four formats, both auth paths with example connection-string and managed-identity flows, the blob-naming convention, and Content-Encoding interop notes for Azure Data Factory / Synapse / Databricks. README destination table updated on both English and Japanese sides. `[azure]` extras: `azure-storage-blob>=12.0` + `azure-identity>=1.15`. i18n marker bump for README.ja.md → README.md follow-up via the established post-merge housekeeping pattern (#618-style). Conflict expected against #623 (GCS) on `drt/config/models.py` / `drt/connectors/registry.py` / README destination tables / CHANGELOG — all are adjacent-line conflicts that resolve trivially during rebase. The destinations themselves don't touch each other. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
GCS destination — the natural pair for BigQuery users and the second of the v0.8 cloud-storage trio (S3 / GCS / Azure Blob). Same shape as the S3 destination from #613.
Phase B of the v0.8.0 cloud-storage work:
_blob_serializer.pyrefactor (merged refactor(destinations): extract shared blob serialiser for S3/GCS/Azure (prep #169 #170) #622)Implementation
Thin
_client+blob.upload_from_stringshim on top ofdrt/destinations/_blob_serializer.py— the csv / json / jsonl / parquet + gzip + key-naming logic is shared with S3, not duplicated.Formats: csv / json / jsonl / parquet, optional gzip for text formats.
Auth:
GOOGLE_APPLICATION_CREDENTIALSenv →gcloud auth application-default login→ GCE/GKE/Cloud Run attached SA)credentials_pathfor non-GCP CI / cron environmentsproject_idoptional (the keyfile usually carries one)Failure semantics:
[gcs]missing-extras →ImportErrorbubbles up (deployment mistake, surfaced once at the top by the engine)[parquet]missing-extras → row failure withdrt-core[parquet]hint preserved (matches S3)google.cloudimport or GCS call (implicit "no driver was imported" contract from feat(tests): SQL destination contract tests (Step 2b of #364 follow-up) #595)Example
For BigQuery external-table interop:
Tests
17 unit tests in
tests/unit/test_gcs_destination.py:gs://scheme)google.cloudimport)blob.content_encoding = "gzip"+.jsonl.gzextension + body decompresses correctlyproject_idvscredentials_path[gcs]missing-extras → ImportError bubble,[parquet]missing-extras → row failure with hint, serialisation error → row failures (no upload)Why the parquet test mocks pandas/pyarrow
The S3 destination's
test_parquet_uploads_binary_body_with_octet_streamalready validates the real PAR1 binary end-to-end. If GCS did the same, the two test classes would both calldf.to_parquet(...)and pyarrow raisesA type extension with name pandas.period already definedon the second registration. Mocked-pandas pattern matches the existingtest_parquet_orchestration_with_mocked_pandas_runs_on_ciin the S3 suite.Test plan
pytest tests/unit/test_gcs_destination.py— 17 passedpytest tests/unit/test_gcs_destination.py tests/unit/test_s3_destination.py tests/unit/test_blob_serializer.py— 61 passed (no cross-test pollution)make lint— ruff + mypy all greenDocs
docs/connectors/gcs.md— full reference (formats, auth, naming, sync modes, notes)i18n marker bump for README.ja.md follows the established post-merge housekeeping pattern (same as #618 for #613 S3).
CHANGELOG
[Unreleased] → Addedentry above the S3 entry.Related
🤖 Generated with Claude Code