From 713203ad753345d3f8851c1cae889187678ee5a7 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Fri, 21 Nov 2025 12:42:31 +0100 Subject: [PATCH 1/3] Snapshot fixes + use managed tables for spark examples + added unittests --- Makefile.dev | 2 +- docs/Data_Quality_Tests.md | 2 +- docs/examples/Incremental_Demo.md | 266 ++++++------------ docs/examples/Snapshot_Demo.md | 24 +- docs/index.md | 16 +- examples/api_demo/.env.dev_databricks | 6 +- examples/api_demo/project.yml | 15 +- examples/cache_demo/project.yml | 13 +- examples/dq_demo/project.yml | 12 +- .../.env.dev_databricks_delta | 7 +- .../.env.dev_databricks_iceberg | 7 +- .../.env.dev_databricks_parquet | 14 + examples/incremental_demo/Makefile | 4 +- examples/incremental_demo/README.md | 7 +- examples/incremental_demo/profiles.yml | 24 +- examples/incremental_demo/project.yml | 23 +- examples/incremental_demo/site/dag/index.html | 3 + examples/macros_demo/project.yml | 15 +- examples/materializations_demo/project.yml | 13 +- examples/snapshot_demo/.env.dev_databricks | 16 -- .../snapshot_demo/.env.dev_databricks_delta | 9 +- .../snapshot_demo/.env.dev_databricks_iceberg | 4 +- .../snapshot_demo/.env.dev_databricks_parquet | 14 + examples/snapshot_demo/Makefile | 2 +- examples/snapshot_demo/README.md | 6 +- examples/snapshot_demo/profiles.yml | 30 +- examples/snapshot_demo/project.yml | 15 +- pyproject.toml | 2 +- pytest.ini | 2 +- .../executors/databricks_spark.py | 31 +- .../executors/snowflake_snowpark.py | 2 + .../table_formats/spark_delta.py | 136 ++++++++- .../table_formats/spark_iceberg.py | 101 ++++++- tests/common/fixtures.py | 200 ++++++++++--- tests/common/mock/bigquery.py | 76 ++++- tests/common/mock/snowflake_snowpark.py | 53 ++++ tests/common/snapshot_helpers.py | 258 +++++++++++++++++ tests/integration/examples/config.py | 4 +- .../test_snapshots_bigquery_integration.py | 144 ++++++++++ ..._snapshots_databricks_spark_integration.py | 111 ++++++++ .../test_snapshots_duckdb_integration.py | 97 +++++++ .../test_snapshots_postgres_integration.py | 125 ++++++++ ...napshots_snowflake_snowpark_integration.py | 116 ++++++++ .../executors/test_snowflake_snowpark_exec.py | 62 ++-- uv.lock | 2 +- 45 files changed, 1644 insertions(+), 447 deletions(-) create mode 100644 examples/incremental_demo/.env.dev_databricks_parquet delete mode 100644 examples/snapshot_demo/.env.dev_databricks create mode 100644 examples/snapshot_demo/.env.dev_databricks_parquet create mode 100644 tests/common/mock/snowflake_snowpark.py create mode 100644 tests/common/snapshot_helpers.py create mode 100644 tests/integration/executors/bigquery/test_snapshots_bigquery_integration.py create mode 100644 tests/integration/executors/databricks_spark/test_snapshots_databricks_spark_integration.py create mode 100644 tests/integration/executors/duckdb/test_snapshots_duckdb_integration.py create mode 100644 tests/integration/executors/postgres/test_snapshots_postgres_integration.py create mode 100644 tests/integration/executors/snowflake_snowpark/test_snapshots_snowflake_snowpark_integration.py diff --git a/Makefile.dev b/Makefile.dev index 95c2918..89f9cda 100644 --- a/Makefile.dev +++ b/Makefile.dev @@ -31,7 +31,7 @@ utest-duckdb: # Lint & format helpers fmt: - $(UV) run ruff format src tests + $(UV) run ruff format lint: $(UV) run ruff check src tests --no-cache diff --git a/docs/Data_Quality_Tests.md b/docs/Data_Quality_Tests.md index a92fa45..393ee5c 100644 --- a/docs/Data_Quality_Tests.md +++ b/docs/Data_Quality_Tests.md @@ -20,7 +20,7 @@ The following values are currently supported for `type`: In addition, you can register **custom tests** (Python or SQL) with any logical name (e.g. `min_positive_share`, `no_future_orders`) and use that name in -`project.yml β†’ tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python--sql). +`project.yml β†’ tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python-sql). ## Usage Overview diff --git a/docs/examples/Incremental_Demo.md b/docs/examples/Incremental_Demo.md index 2182460..90891da 100644 --- a/docs/examples/Incremental_Demo.md +++ b/docs/examples/Incremental_Demo.md @@ -22,6 +22,7 @@ incremental_demo/ .env .env.dev_duckdb .env.dev_postgres + .env.dev_databricks_parquet .env.dev_databricks_delta .env.dev_databricks_iceberg .env.dev_bigquery_pandas @@ -383,151 +384,69 @@ The executor uses the `meta.incremental` / `meta.unique_key` / `meta.updated_at` ## Delta & Iceberg variants (Databricks / Spark) -In addition to the β€œregular” incremental models, the demo also includes **Delta Lake** and **Iceberg** variants -that shows how to: +The Databricks/Spark executor can materialize the same incremental models as **managed parquet**, +**Delta Lake**, or **Iceberg** tables. Instead of wiring storage paths in `project.yml`, each table +format gets its own Databricks profile with a dedicated database and warehouse. -- route a model to **Delta tables** via `project.yml` -- reuse the same incremental pattern, but with a **Delta-backed** table on Databricks/Spark -- keep Parquet and Delta models side-by-side in the same project +### Managed storage per format -This is optional and only relevant for the `databricks_spark` engine. - ---- - -### Storage configuration for the Delta / Iceberg models - -In `project.yml`, the Delta variant gets its own storage entry, separate from the Parquet fact table: +`profiles.yml` ships with three out-of-the-box profiles: ```yaml -models: - storage: - # Existing Parquet fact table - fct_events_sql_inline: - path: ".local/spark/fct_events_sql_inline" - format: parquet - - # πŸ”Ή Delta-based fact table (Spark/Databricks only) - fct_events_sql_inline_delta: - path: ".local/spark_delta/fct_events_sql_inline" - format: delta - - # ❄️ Iceberg-based fact table (Spark 4 / Databricks only) - fct_events_sql_inline_iceberg: - # Points into the Iceberg warehouse; must match your Iceberg catalog config - path: ".local/iceberg_warehouse/incremental_demo/fct_events_sql_inline" - format: iceberg -```` - -Notes: - -* The key `fct_events_sql_inline_delta` must match the **model name**. -* `format: delta` tells the Databricks/Spark executor to create `USING DELTA LOCATION ...`. -* The path is different from the Parquet path so artifacts don’t clash. - ---- - -### Delta fact model - -The Delta fact model is a close sibling of `fct_events_sql_inline.ff.sql`, but: - -* is tagged only for the Databricks/Spark engine -* is configured for incremental **merge** with a `unique_key` + `updated_at` column - -Example (conceptual) model: - -```sql --- models/common/fct_events_sql_inline_delta.ff.sql - -{{ config( - materialized='table', - tags=[ - 'example:incremental_demo', - 'kind:incremental', - 'engine:databricks_spark', - ], - meta={ - 'incremental': True, - 'unique_key': ['event_id'], - 'updated_at': 'updated_at', - 'delta': { - 'sql': " - with base as ( - select event_id, updated_at, value - from {{ ref('events_base.ff') }} - ) - select - event_id, - updated_at, - value - from base - where updated_at > ( - select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00') - from {{ this }} - ) - " - }, - }, -) }} - --- canonical full-select (used for docs / full-refresh) -select - event_id, - updated_at, - value -from {{ ref('events_base.ff') }}; +dev_databricks_parquet: + engine: databricks_spark + databricks_spark: + database: incremental_demo_parquet + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_parquet" + +dev_databricks_delta: + engine: databricks_spark + databricks_spark: + database: incremental_demo_delta + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_delta" + +dev_databricks_iceberg: + engine: databricks_spark + databricks_spark: + database: incremental_demo_iceberg + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg" + table_format: iceberg + extra_conf: + spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog + spark.sql.catalog.iceberg.type: hadoop + spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse" ``` -What happens: - -* On the **first run**, the engine sees no existing table and does a full materialization - (a Delta table at `.local/spark_delta/fct_events_sql_inline`). -* On **subsequent runs**, the executor uses the `delta.sql` query as the **incremental delta** and: - - * attempts a `MERGE INTO` for Delta tables, or - * falls back to a full-refresh strategy if MERGE is not supported. +Because every format has its own schema/warehouse, you can flip `FF_DBR_TABLE_FORMAT` or swap +profiles without worrying about leftover Parquet metadata interfering with a Delta run (and vice +versa). ---- - -### Running the Delta variant +### Running the Spark formats -Once your Databricks/Spark profile is configured (e.g. `dev_databricks` in `profiles.yml` and `.env.dev_databricks`), -you can run the Delta model like any other: +Pick the `.env.dev_databricks_*` file that matches your target format, export it, and run the demo: ```bash # From the repo root cd examples/incremental_demo -# Seed -FFT_ACTIVE_ENV=dev_databricks fft seed . - -# Run only the Delta variant -FFT_ACTIVE_ENV=dev_databricks fft run . \ - --select fct_events_sql_inline_delta.ff \ - --select tag:engine:databricks_spark - -# Or include it in the general incremental demo selection -FFT_ACTIVE_ENV=dev_databricks fft run . \ - --select tag:example:incremental_demo \ - --select tag:engine:databricks_spark -``` - -Optionally, you can add a small `not_null` test to `project.yml` to verify the Delta model: - -```yaml -tests: - - type: not_null - table: fct_events_sql_inline_delta - column: event_id - tags: [batch, delta] -``` +# Parquet tables (default) +set -a; source .env.dev_databricks_parquet; set +a +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \ + --select tag:example:incremental_demo --select tag:engine:databricks_spark -Then run: +# Delta Lake tables +set -a; source .env.dev_databricks_delta; set +a +FFT_ACTIVE_ENV=dev_databricks_delta FF_DBR_TABLE_FORMAT=delta fft run . \ + --select tag:example:incremental_demo --select tag:engine:databricks_spark -```bash -FFT_ACTIVE_ENV=dev_databricks fft test . --select tag:delta +# Iceberg tables +set -a; source .env.dev_databricks_iceberg; set +a +FFT_ACTIVE_ENV=dev_databricks_iceberg FF_DBR_TABLE_FORMAT=iceberg fft run . \ + --select tag:example:incremental_demo --select tag:engine:databricks_spark ``` -to validate the Delta-backed incremental table specifically. +`FF_DBR_TABLE_FORMAT` is optional when the profile already sets `databricks_spark.table_format`, but +passing it explicitly makes the intent clear in logs and CI. --- @@ -605,10 +524,10 @@ Make sure `.env.dev_snowflake` sets the required `FF_SF_*` variables and install ### Databricks Spark ```bash -FFT_ACTIVE_ENV=dev_databricks fft seed . -FFT_ACTIVE_ENV=dev_databricks fft run . \ +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed . +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \ --select tag:example:incremental_demo --select tag:engine:databricks_spark -FFT_ACTIVE_ENV=dev_databricks fft test . \ +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \ --select tag:example:incremental_demo ```` @@ -616,77 +535,52 @@ FFT_ACTIVE_ENV=dev_databricks fft test . \ You can run the incremental demo on Databricks/Spark against either **parquet** or **Delta** tables. -FFT reads the desired table format from the `FF_DBR_TABLE_FORMAT` environment variable, which overrides -`databricks_spark.table_format` from `profiles.yml`. - -When `FF_DBR_TABLE_FORMAT=delta`, the Databricks/Spark executor automatically wires Delta Lake into the -SparkSession (downloads the Maven artifact via `delta-spark`, adds -`spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension`, and sets -`spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog` unless you already -overrode those settings). No extra `spark-submit --conf` flags are neededβ€”just ensure the -`delta-spark >= 4.0` Python package is installed. - -From the repo root: - -```bash -cd examples/incremental_demo -```` +Each format has its own profile/database (`dev_databricks_parquet` β†’ `incremental_demo_parquet`, +`dev_databricks_delta` β†’ `incremental_demo_delta`), so cleanup and reruns never reuse stale metadata. +`FF_DBR_TABLE_FORMAT` still overrides `databricks_spark.table_format` if you want to switch formats +without changing profiles. Run with **parquet** tables (default): ```bash -FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft seed . -FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft run . \ +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed . +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \ --select tag:example:incremental_demo --select tag:engine:databricks_spark -FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft test . \ +FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \ --select tag:example:incremental_demo ``` Run with **Delta** tables: ```bash -FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft seed . -FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft run . \ +FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft seed . +FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft run . \ --select tag:example:incremental_demo --select tag:engine:databricks_spark -FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft test . \ +FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft test . \ --select tag:example:incremental_demo ``` -This way you can switch between parquet and Delta just by changing the `FF_DBR_TABLE_FORMAT` -environment variable, without touching the models or project.yml. - -Adjust environment names to match your `profiles.yml`. +`delta-spark >= 4.0` must be installed locally so the executor can wire Delta extensions into +SparkSession automatically (the profile/CLI already sets the required Spark configs). ### Databricks Spark (Iceberg / Spark 4+) If you are on Spark 4 / Databricks with Iceberg support, you can also run the incremental demo -purely against Iceberg tables using a dedicated profile (for example `dev_databricks_iceberg`). - -That profile typically: - -* uses `engine: databricks_spark` -* sets `databricks_spark.table_format: iceberg` -* configures an Iceberg catalog via `extra_conf`, for example: - - models: - storage: - # Example warehouse location, adjust as needed - fct_events_sql_inline_iceberg: - path: ".local/iceberg_warehouse/incremental_demo/fct_events_sql_inline" - format: iceberg - -and in the profile (profiles.yml) something like: - - dev_databricks_iceberg: - engine: databricks_spark - databricks_spark: - master: "local[*]" - app_name: "incremental_demo" - warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse" - extra_conf: - spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog - spark.sql.catalog.iceberg.type: hadoop - spark.sql.catalog.iceberg.warehouse: "file:///{{ project_dir() }}/.local/iceberg_warehouse" +purely against Iceberg tables using `dev_databricks_iceberg`. That profile: + +```yaml +dev_databricks_iceberg: + engine: databricks_spark + databricks_spark: + database: incremental_demo_iceberg + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg" + table_format: iceberg + extra_conf: + spark.jars.packages: org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0 + spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog + spark.sql.catalog.iceberg.type: hadoop + spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse" +``` From the repo root: @@ -694,12 +588,12 @@ From the repo root: Run seeds and models against Iceberg: - FFT_ACTIVE_ENV=dev_databricks_iceberg fft seed . + FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft seed . - FFT_ACTIVE_ENV=dev_databricks_iceberg fft run . \ + FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft run . \ --select tag:example:incremental_demo --select tag:engine:databricks_spark - FFT_ACTIVE_ENV=dev_databricks_iceberg fft test . \ + FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft test . \ --select tag:example:incremental_demo Under this profile, all `ref()` / `source()` calls in Spark SQL and Python models are resolved diff --git a/docs/examples/Snapshot_Demo.md b/docs/examples/Snapshot_Demo.md index 912f684..d05ac0b 100644 --- a/docs/examples/Snapshot_Demo.md +++ b/docs/examples/Snapshot_Demo.md @@ -21,8 +21,8 @@ The snapshot demo is intentionally tiny and mirrors the basic demo structure: | `models/staging/users_clean.ff.sql` | Same as in the basic demo: cleans emails, casts types, derives `email_domain`. | | `models/marts/mart_users_by_domain.ff.sql` | Same as in the basic demo: aggregates users per email domain. | | `models/snapshots/users_clean_snapshot.ff.sql` | **New:** snapshot model that captures slowly changing history of `users_clean.ff`. | -| `profiles.yml` | Reused from the basic demo: defines `dev_duckdb`, `dev_postgres`, `dev_databricks_delta`, `dev_databricks_iceberg`, `dev_bigquery`. | -| `.env.dev_*` | Engine-specific environment files (`.env.dev_duckdb`, `.env.dev_postgres`, `.env.dev_databricks_delta`, `.env.dev_databricks_iceberg`). | +| `profiles.yml` | Reused from the basic demo: defines `dev_duckdb`, `dev_postgres`, `dev_databricks_parquet`, `dev_databricks_delta`, `dev_databricks_iceberg`, `dev_bigquery`. | +| `.env.dev_*` | Engine-specific environment files (`.env.dev_duckdb`, `.env.dev_postgres`, `.env.dev_databricks_parquet`, `.env.dev_databricks_delta`, `.env.dev_databricks_iceberg`). | | `Makefile` | Adds snapshot-aware targets on top of the usual `seed` / `run` / `test` / `dag`. | ### The snapshot model @@ -96,8 +96,9 @@ Assuming you’ve already wired `examples/snapshot_demo/Makefile` similarly to t # set -a; source .env.dev_postgres; set +a # Or Databricks - # Delta/parquet: set -a; source .env.dev_databricks_delta; set +a - # Iceberg: set -a; source .env.dev_databricks_iceberg; set +a + # Parquet: set -a; source .env.dev_databricks_parquet; set +a + # Delta: set -a; source .env.dev_databricks_delta; set +a + # Iceberg: set -a; source .env.dev_databricks_iceberg; set +a # (optionally export FF_DBR_TABLE_FORMAT=delta|iceberg to override the table format) # Or BigQuery (requires GCP setup) @@ -128,15 +129,20 @@ Assuming you’ve already wired `examples/snapshot_demo/Makefile` similarly to t Just like the incremental demo, the snapshot project lets you flip Spark table formats without editing models. Pass `DBR_TABLE_FORMAT=parquet|delta|iceberg` to `make snapshot_demo` or export -`FF_DBR_TABLE_FORMAT` when invoking `fft` directly. The `dev_databricks_delta` profile uses the same -Hive-compatible metastore as before, while `dev_databricks_iceberg` wires in an Iceberg catalog -(`spark.jars.packages` / `spark.sql.catalog.iceberg.*`). When running locally you still need the -matching Python packages (for example `pip install delta-spark` for Delta Lake and the Iceberg -runtime JARs bundled via the profile). +`FF_DBR_TABLE_FORMAT` when invoking `fft` directly. `dev_databricks_parquet`, +`dev_databricks_delta`, and `dev_databricks_iceberg` each point to their own managed database / +warehouse (`snapshot_demo_parquet`, `snapshot_demo_delta`, `snapshot_demo_iceberg`), so switching +formats never reuses stale Hive metadata. The Iceberg profile wires in the catalog via +`spark.sql.catalog.iceberg.*`; Delta still requires the `delta-spark` package. Manual CLI examples: ```bash +# Parquet snapshots +FF_DBR_TABLE_FORMAT=parquet \ + FFT_ACTIVE_ENV=dev_databricks_parquet FF_ENGINE=databricks_spark \ + fft snapshot run . --select tag:example:snapshot_demo --select tag:engine:databricks_spark + # Delta Lake snapshots FF_DBR_TABLE_FORMAT=delta \ FFT_ACTIVE_ENV=dev_databricks_delta FF_ENGINE=databricks_spark \ diff --git a/docs/index.md b/docs/index.md index 20034b2..e0c2003 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,13 +9,13 @@ Use this page as the front door into the docs: start with the orientation sectio ## Table of Contents - [Quick Orientation](#quick-orientation) -- [Build & Run Projects](#build--run-projects) -- [Modeling & Configuration](#modeling--configuration) -- [Execution & State Management](#execution--state-management) -- [Testing & Data Quality](#testing--data-quality) -- [Docs, Debugging & Operations](#docs-debugging--operations) -- [Examples & Tutorials](#examples--tutorials) -- [Reference & Contribution](#reference--contribution) +- [Build & Run Projects](#build-run-projects) +- [Modeling & Configuration](#modeling-configuration) +- [Execution & State Management](#execution-state-management) +- [Testing & Data Quality](#testing-data-quality) +- [Docs, Debugging & Operations](#docs-debugging-operations) +- [Examples & Tutorials](#examples-tutorials) +- [Reference & Contribution](#reference-contribution) - [Need Help?](#need-help) --- @@ -85,7 +85,7 @@ All demos live in the top-level `examples/` directory and ship with Makefiles pl ## Reference & Contribution -- **API reference:** Browse the generated [API Reference](reference/) (MkDocStrings) for public functions, classes, and executors under `src/fastflowtransform`. +- **API reference:** Browse the generated [API Reference](reference/index.md) (MkDocStrings) for public functions, classes, and executors under `src/fastflowtransform`. - **Architecture internals:** The [Technical Overview](Technical_Overview.md#part-ii-architecture-internals) dives into registries, DAG building, validation, and engine abstractions. - **Contributing:** Follow [Contributing.md](Contributing.md) for dev environment setup (`uv`, `pyproject.toml`), coding standards, tests, and PR expectations. - **License:** Apache 2.0 β€” see [License.md](License.md). diff --git a/examples/api_demo/.env.dev_databricks b/examples/api_demo/.env.dev_databricks index 1d8876d..724e570 100644 --- a/examples/api_demo/.env.dev_databricks +++ b/examples/api_demo/.env.dev_databricks @@ -8,7 +8,5 @@ FF_DBR_DATABASE=api_demo # Uncomment to switch to Delta Lake (requires delta-spark dependency) # FF_DBR_TABLE_FORMAT=delta -# Prefer an existing JAVA_HOME (e.g., in CI); fall back to the macOS brew path for local use. -if [ -z "${JAVA_HOME:-}" ] && [ -d "/opt/homebrew/opt/openjdk@17" ]; then - JAVA_HOME=/opt/homebrew/opt/openjdk@17 -fi +# Configure Java for local Spark sessions when needed +# JAVA_HOME=/opt/homebrew/opt/openjdk@17 diff --git a/examples/api_demo/project.yml b/examples/api_demo/project.yml index 2aa9492..13f8276 100644 --- a/examples/api_demo/project.yml +++ b/examples/api_demo/project.yml @@ -3,20 +3,7 @@ version: "0.1" vars: {} -models: - storage: - users: - path: ".local/spark/users" - format: parquet - api_users_http: - path: ".local/spark/api_users_http" - format: parquet - api_users_requests: - path: ".local/spark/api_users_requests" - format: parquet - mart_users_join.ff: - path: ".local/spark/mart_users_join" - format: parquet +models: {} seeds: storage: diff --git a/examples/cache_demo/project.yml b/examples/cache_demo/project.yml index fda1476..77a0e83 100644 --- a/examples/cache_demo/project.yml +++ b/examples/cache_demo/project.yml @@ -3,18 +3,7 @@ version: "0.1" vars: {} -models: - storage: - stg_users.ff: - path: ".local/spark/stg_users" - stg_orders.ff: - path: ".local/spark/stg_orders" - mart_user_orders.ff: - path: ".local/spark/mart_user_orders" - py_constants: - path: ".local/spark/py_constants" - http_users: - path: ".local/spark/http_users" +models: {} seeds: storage: diff --git a/examples/dq_demo/project.yml b/examples/dq_demo/project.yml index 750e755..8806dd4 100644 --- a/examples/dq_demo/project.yml +++ b/examples/dq_demo/project.yml @@ -3,17 +3,7 @@ version: "0.1" vars: {} -models: - storage: - customers.ff: - path: ".local/spark/customers" - format: parquet - orders.ff: - path: ".local/spark/orders" - format: parquet - mart_orders_agg.ff: - path: ".local/spark/mart_orders_agg" - format: parquet +models: {} seeds: storage: diff --git a/examples/incremental_demo/.env.dev_databricks_delta b/examples/incremental_demo/.env.dev_databricks_delta index 03fa4fb..30fefca 100644 --- a/examples/incremental_demo/.env.dev_databricks_delta +++ b/examples/incremental_demo/.env.dev_databricks_delta @@ -1,11 +1,10 @@ # Databricks Spark profile defaults for incremental demo FF_SPARK_MASTER=local[*] -FF_SPARK_APP_NAME=incremental_demo +FF_SPARK_APP_NAME=incremental_demo_delta # Managed table configuration (Hive-compatible Spark session) FF_DBR_ENABLE_HIVE=1 -FF_DBR_DATABASE=incremental_demo -# Optional: Delta Lake -# FF_DBR_TABLE_FORMAT=delta +FF_DBR_DATABASE=incremental_demo_delta +FF_DBR_TABLE_FORMAT=delta JAVA_HOME=/opt/homebrew/opt/openjdk@17 diff --git a/examples/incremental_demo/.env.dev_databricks_iceberg b/examples/incremental_demo/.env.dev_databricks_iceberg index 03fa4fb..5311d13 100644 --- a/examples/incremental_demo/.env.dev_databricks_iceberg +++ b/examples/incremental_demo/.env.dev_databricks_iceberg @@ -1,11 +1,10 @@ # Databricks Spark profile defaults for incremental demo FF_SPARK_MASTER=local[*] -FF_SPARK_APP_NAME=incremental_demo +FF_SPARK_APP_NAME=incremental_demo_iceberg # Managed table configuration (Hive-compatible Spark session) FF_DBR_ENABLE_HIVE=1 -FF_DBR_DATABASE=incremental_demo -# Optional: Delta Lake -# FF_DBR_TABLE_FORMAT=delta +FF_DBR_DATABASE=incremental_demo_iceberg +FF_DBR_TABLE_FORMAT=iceberg JAVA_HOME=/opt/homebrew/opt/openjdk@17 diff --git a/examples/incremental_demo/.env.dev_databricks_parquet b/examples/incremental_demo/.env.dev_databricks_parquet new file mode 100644 index 0000000..143fbd8 --- /dev/null +++ b/examples/incremental_demo/.env.dev_databricks_parquet @@ -0,0 +1,14 @@ +# Databricks Spark (parquet) defaults for the incremental demo +FF_SPARK_MASTER=local[*] +FF_SPARK_APP_NAME=incremental_demo_parquet + +# Managed Hive-compatible metastore/database for parquet tables +FF_DBR_ENABLE_HIVE=1 +FF_DBR_DATABASE=incremental_demo_parquet +# Optional: Unity Catalog overrides +# FF_DBR_CATALOG=hive_metastore + +# Configure Java for local Spark sessions when needed +# JAVA_HOME=/opt/homebrew/opt/openjdk@17 + +# Leave FF_DBR_TABLE_FORMAT unset (defaults to parquet unless overridden) diff --git a/examples/incremental_demo/Makefile b/examples/incremental_demo/Makefile index efe8e10..09629af 100644 --- a/examples/incremental_demo/Makefile +++ b/examples/incremental_demo/Makefile @@ -33,15 +33,15 @@ endif ifeq ($(ENGINE),databricks_spark) ENGINE_TAG = engine:databricks_spark # Choose profile based on table format so we can have separate configs: + # - dev_databricks_parquet (managed parquet) # - dev_databricks_delta (Delta Lake) # - dev_databricks_iceberg (Iceberg) - # - dev_databricks (generic / parquet) ifeq ($(DBR_TABLE_FORMAT),delta) PROFILE_ENV = dev_databricks_delta else ifeq ($(DBR_TABLE_FORMAT),iceberg) PROFILE_ENV = dev_databricks_iceberg else - PROFILE_ENV = dev_databricks_delta + PROFILE_ENV = dev_databricks_parquet endif endif ifeq ($(ENGINE),bigquery) diff --git a/examples/incremental_demo/README.md b/examples/incremental_demo/README.md index 400823f..6b24c75 100644 --- a/examples/incremental_demo/README.md +++ b/examples/incremental_demo/README.md @@ -4,7 +4,12 @@ Small FFT example that showcases incremental models and Delta/Iceberg-style merg across DuckDB, Postgres, Databricks Spark, BigQuery (pandas or BigFrames), and Snowflake Snowpark. ## How to use -- Fill an `.env.dev_*` for your engine (DuckDB/Postgres/Databricks/BigQuery/Snowflake). For BigQuery use `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes`; for Snowflake use `.env.dev_snowflake`. +- Fill an `.env.dev_*` for your engine (DuckDB/Postgres/Databricks/BigQuery/Snowflake). For Databricks pick the format-specific files: + - `.env.dev_databricks_parquet` + - `.env.dev_databricks_delta` + - `.env.dev_databricks_iceberg` + Each profile uses its own managed database/warehouse so switching formats never reuses stale tables. + For BigQuery use `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes`; for Snowflake use `.env.dev_snowflake`. - From this directory run `make demo ENGINE=` (set `BQ_FRAME` for BigQuery, `DBR_TABLE_FORMAT` for Spark). - Artifacts: DAG HTML in `site/dag/index.html`, FFT metadata in `.fastflowtransform/target/`. - See `docs/examples/Incremental_Demo.md` for a full walkthrough of the models and incremental configs. diff --git a/examples/incremental_demo/profiles.yml b/examples/incremental_demo/profiles.yml index 692e7b5..cf6e309 100644 --- a/examples/incremental_demo/profiles.yml +++ b/examples/incremental_demo/profiles.yml @@ -9,12 +9,27 @@ dev_postgres: dsn: "{{ env('FF_PG_DSN') }}" db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" +dev_databricks_parquet: &incremental_databricks_parquet + engine: databricks_spark + databricks_spark: + master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" + app_name: "{{ env('FF_SPARK_APP_NAME', 'incremental_demo_parquet') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_parquet" + database: "{{ env('FF_DBR_DATABASE', 'incremental_demo_parquet') }}" + extra_conf: + spark.hadoop.javax.jdo.option.ConnectionURL: "jdbc:derby:{{ project_dir() }}/.local/metastore_db;create=true" + spark.hadoop.datanucleus.rdbms.datastoreAdapterClassName: "org.datanucleus.store.rdbms.adapter.DerbyAdapter" + spark.hadoop.datanucleus.schema.autoCreateAll: "true" + spark.hadoop.javax.jdo.option.ConnectionDriverName: "org.apache.derby.jdbc.EmbeddedDriver" + spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + dev_databricks_delta: engine: databricks_spark databricks_spark: master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" - app_name: "{{ env('FF_SPARK_APP_NAME', 'incremental_demo') }}" - warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse" + app_name: "{{ env('FF_SPARK_APP_NAME', 'incremental_demo_delta') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_delta" + database: "{{ env('FF_DBR_DATABASE', 'incremental_demo_delta') }}" extra_conf: spark.hadoop.javax.jdo.option.ConnectionURL: "jdbc:derby:{{ project_dir() }}/.local/metastore_db;create=true" spark.hadoop.datanucleus.rdbms.datastoreAdapterClassName: "org.datanucleus.store.rdbms.adapter.DerbyAdapter" @@ -28,6 +43,7 @@ dev_databricks_iceberg: master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" app_name: "{{ env('FF_SPARK_APP_NAME', 'incremental_demo_iceberg') }}" warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg" + database: "{{ env('FF_DBR_DATABASE', 'incremental_demo_iceberg') }}" table_format: "iceberg" extra_conf: spark.jars.packages: "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0" @@ -95,3 +111,7 @@ dev_snowflake: schema: "{{ env('FF_SF_SCHEMA', 'INCREMENTAL_DEMO') }}" role: "{{ env('FF_SF_ROLE', '') }}" allow_create_schema: true + +# Backwards-compatible alias for scripts referencing dev_databricks +dev_databricks: + <<: *incremental_databricks_parquet diff --git a/examples/incremental_demo/project.yml b/examples/incremental_demo/project.yml index aaa5a8b..e29c791 100644 --- a/examples/incremental_demo/project.yml +++ b/examples/incremental_demo/project.yml @@ -4,23 +4,6 @@ version: "0.1" vars: {} models: - storage: - events_base.ff: - path: ".local/spark/events_base" - # format: parquet - - fct_events_sql_inline.ff: - path: ".local/spark/fct_events_sql_inline" - # format: parquet - - fct_events_sql_yaml.ff: - path: ".local/spark/fct_events_sql_yaml" - # format: parquet - - fct_events_py_incremental.ff: - path: ".local/spark/fct_events_py_incremental" - # format: parquet - incremental: fct_events_sql_inline.ff: unique_key: "event_id" @@ -37,11 +20,7 @@ models: fct_events_sql_inline_delta.ff: unique_key: "event_id" -seeds: - storage: - seed_events: - path: ".local/spark/seed_events" - # format: parquet +seeds: {} tests: - type: not_null diff --git a/examples/incremental_demo/site/dag/index.html b/examples/incremental_demo/site/dag/index.html index 6dda26b..6fc455c 100644 --- a/examples/incremental_demo/site/dag/index.html +++ b/examples/incremental_demo/site/dag/index.html @@ -73,6 +73,7 @@ .badge-table { background:#eef7ff; color:#0a3a77; border-color:#bcd8fb; } .badge-view { background:#eefcf4; color:#0b5d2a; border-color:#bdebcf; } .badge-ephemeral { background:#fff7e8; color:#7a4a00; border-color:#f6db9b; } + .badge-snapshot { background:#f3e8ff; color:#5b21b6; border-color:#d8b4fe; } .badge-sql { background: var(--chip-sql-bg); color: var(--chip-sql-fg); } .badge-py { background: var(--chip-py-bg); color: var(--chip-py-fg); } .subline { display:block; margin-top:2px; font-size:12px; color: var(--muted); line-height:1.35; } @@ -120,6 +121,8 @@

DAG

incremental + snapshot +
flowchart TD classDef sql fill:#e8f1ff,stroke:#5b8def,color:#0a1f44; diff --git a/examples/macros_demo/project.yml b/examples/macros_demo/project.yml index 16c6435..3d0a371 100644 --- a/examples/macros_demo/project.yml +++ b/examples/macros_demo/project.yml @@ -5,20 +5,7 @@ vars: # used by macros and examples default_country: "DE" -models: - storage: - stg_users: - path: ".local/spark/stg_users" - format: parquet - stg_orders: - path: ".local/spark/stg_orders" - format: parquet - dim_users.ff: - path: ".local/spark/dim_users" - format: parquet - fct_user_sales.ff: - path: ".local/spark/fct_user_sales" - format: parquet +models: {} seeds: storage: diff --git a/examples/materializations_demo/project.yml b/examples/materializations_demo/project.yml index a94c8f4..7acb2dd 100644 --- a/examples/materializations_demo/project.yml +++ b/examples/materializations_demo/project.yml @@ -3,18 +3,7 @@ version: "0.1" vars: {} -models: - storage: - # Optionally persist specific relations to predictable paths (Spark/Delta etc.) - dim_customers.ff: - path: ".local/spark/dim_customers" - format: parquet - fct_orders_inc.ff: - path: ".local/spark/fct_orders_inc" - format: parquet - mart_order_summary.ff: - path: ".local/spark/mart_order_summary" - format: parquet +models: {} seeds: storage: diff --git a/examples/snapshot_demo/.env.dev_databricks b/examples/snapshot_demo/.env.dev_databricks deleted file mode 100644 index a8d5dc9..0000000 --- a/examples/snapshot_demo/.env.dev_databricks +++ /dev/null @@ -1,16 +0,0 @@ -# Databricks (or local Spark) defaults for the snapshot demo -FF_SPARK_MASTER=local[*] -FF_SPARK_APP_NAME=snapshot_demo - -# Optional overrides for Databricks SQL warehouses or Unity Catalog -FF_DBR_DATABASE=snapshot_demo -# FF_DBR_CATALOG=hive_metastore - -# Enable a local Hive-compatible metastore (required for snapshots when running Spark standalone) -FF_DBR_ENABLE_HIVE=1 - -# Configure Java for local Spark sessions when needed -# JAVA_HOME=/opt/homebrew/opt/openjdk@17 - -# To target Delta Lake tables explicitly, set: -# FF_DBR_TABLE_FORMAT=delta diff --git a/examples/snapshot_demo/.env.dev_databricks_delta b/examples/snapshot_demo/.env.dev_databricks_delta index 0cfccaf..bea46a6 100644 --- a/examples/snapshot_demo/.env.dev_databricks_delta +++ b/examples/snapshot_demo/.env.dev_databricks_delta @@ -1,16 +1,15 @@ # Databricks Spark (Delta) profile defaults for the snapshot demo FF_SPARK_MASTER=local[*] -FF_SPARK_APP_NAME=snapshot_demo +FF_SPARK_APP_NAME=snapshot_demo_delta # Managed metastore/database when running Spark locally FF_DBR_ENABLE_HIVE=1 -FF_DBR_DATABASE=snapshot_demo +FF_DBR_DATABASE=snapshot_demo_delta # Optional: Unity Catalog # FF_DBR_CATALOG=hive_metastore -# Switch the managed table format (parquet|delta|iceberg) -# Defaults to parquet unless Makefile/CLI overrides FF_DBR_TABLE_FORMAT -# FF_DBR_TABLE_FORMAT=delta +# Force Delta tables unless Makefile overrides +FF_DBR_TABLE_FORMAT=delta # Configure Java for local Spark sessions when needed # JAVA_HOME=/opt/homebrew/opt/openjdk@17 diff --git a/examples/snapshot_demo/.env.dev_databricks_iceberg b/examples/snapshot_demo/.env.dev_databricks_iceberg index d82e3c4..a384f17 100644 --- a/examples/snapshot_demo/.env.dev_databricks_iceberg +++ b/examples/snapshot_demo/.env.dev_databricks_iceberg @@ -1,10 +1,10 @@ # Databricks Spark (Iceberg) profile defaults for the snapshot demo FF_SPARK_MASTER=local[*] -FF_SPARK_APP_NAME=snapshot_demo +FF_SPARK_APP_NAME=snapshot_demo_iceberg # Managed Iceberg catalog metadata lives under .local/iceberg_warehouse_snapshot # The profile wires the Iceberg catalog; just ensure the directory is writable. -FF_DBR_DATABASE=snapshot_demo +FF_DBR_DATABASE=snapshot_demo_iceberg # Tell FFT/Spark to request Iceberg tables (Makefile also injects this) FF_DBR_TABLE_FORMAT=iceberg diff --git a/examples/snapshot_demo/.env.dev_databricks_parquet b/examples/snapshot_demo/.env.dev_databricks_parquet new file mode 100644 index 0000000..5936f51 --- /dev/null +++ b/examples/snapshot_demo/.env.dev_databricks_parquet @@ -0,0 +1,14 @@ +# Databricks Spark (parquet) profile defaults for the snapshot demo +FF_SPARK_MASTER=local[*] +FF_SPARK_APP_NAME=snapshot_demo_parquet + +# Managed metastore/database for parquet tables +FF_DBR_ENABLE_HIVE=1 +FF_DBR_DATABASE=snapshot_demo_parquet +# Optional: Unity Catalog overrides +# FF_DBR_CATALOG=hive_metastore + +# Configure Java for local Spark sessions when needed +# JAVA_HOME=/opt/homebrew/opt/openjdk@17 + +# Leave FF_DBR_TABLE_FORMAT unset (defaults to parquet unless Makefile overrides) diff --git a/examples/snapshot_demo/Makefile b/examples/snapshot_demo/Makefile index 7389e16..0ee8b27 100644 --- a/examples/snapshot_demo/Makefile +++ b/examples/snapshot_demo/Makefile @@ -31,7 +31,7 @@ ifeq ($(ENGINE),databricks_spark) else ifeq ($(DBR_TABLE_FORMAT),iceberg) PROFILE_ENV = dev_databricks_iceberg else - PROFILE_ENV = dev_databricks_delta + PROFILE_ENV = dev_databricks_parquet endif endif ifeq ($(ENGINE),bigquery) diff --git a/examples/snapshot_demo/README.md b/examples/snapshot_demo/README.md index 61a7ab6..bf6f788 100644 --- a/examples/snapshot_demo/README.md +++ b/examples/snapshot_demo/README.md @@ -12,9 +12,13 @@ Copy one of the `.env.dev_*` files and export it before running `make`: | --- | --- | | `.env.dev_duckdb` | Local DuckDB file for the demo | | `.env.dev_postgres` | Postgres DSN/schema | -| `.env.dev_databricks_delta` | Local Spark or Databricks defaults for parquet/Delta tables | +| `.env.dev_databricks_parquet` | Local Spark defaults for managed parquet tables | +| `.env.dev_databricks_delta` | Local Spark defaults for Delta Lake tables | | `.env.dev_databricks_iceberg` | Spark 4+/Databricks configuration with the Iceberg catalog wired in | +Each Databricks profile uses its own managed database/warehouse (`snapshot_demo_parquet`, +`snapshot_demo_delta`, `snapshot_demo_iceberg`) so switching formats never reuses stale metadata. + `FF_DBR_TABLE_FORMAT` can always override the physical Spark table format (`parquet`, `delta`, `iceberg`) even if the profile defaults differ. diff --git a/examples/snapshot_demo/profiles.yml b/examples/snapshot_demo/profiles.yml index c23c3bd..5fbca70 100644 --- a/examples/snapshot_demo/profiles.yml +++ b/examples/snapshot_demo/profiles.yml @@ -11,12 +11,27 @@ dev_postgres: dsn: "{{ env('FF_PG_DSN') }}" db_schema: "{{ env('FF_PG_SCHEMA', 'snapshot_demo') }}" +dev_databricks_parquet: &snapshot_demo_databricks_parquet + engine: databricks_spark + databricks_spark: + master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" + app_name: "{{ env('FF_SPARK_APP_NAME', 'snapshot_demo_parquet') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_parquet" + database: "{{ env('FF_DBR_DATABASE', 'snapshot_demo_parquet') }}" + extra_conf: + spark.hadoop.javax.jdo.option.ConnectionURL: "jdbc:derby:{{ project_dir() }}/.local/metastore_db;create=true" + spark.hadoop.datanucleus.rdbms.datastoreAdapterClassName: "org.datanucleus.store.rdbms.adapter.DerbyAdapter" + spark.hadoop.datanucleus.schema.autoCreateAll: "true" + spark.hadoop.javax.jdo.option.ConnectionDriverName: "org.apache.derby.jdbc.EmbeddedDriver" + spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + dev_databricks_delta: engine: databricks_spark databricks_spark: master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" - app_name: "{{ env('FF_SPARK_APP_NAME', 'snapshot_demo') }}" - warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse" + app_name: "{{ env('FF_SPARK_APP_NAME', 'snapshot_demo_delta') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_delta" + database: "{{ env('FF_DBR_DATABASE', 'snapshot_demo_delta') }}" extra_conf: spark.hadoop.javax.jdo.option.ConnectionURL: "jdbc:derby:{{ project_dir() }}/.local/metastore_db;create=true" spark.hadoop.datanucleus.rdbms.datastoreAdapterClassName: "org.datanucleus.store.rdbms.adapter.DerbyAdapter" @@ -28,17 +43,22 @@ dev_databricks_iceberg: engine: databricks_spark databricks_spark: master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" - app_name: "{{ env('FF_SPARK_APP_NAME', 'snapshot_demo') }}" - warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse" + app_name: "{{ env('FF_SPARK_APP_NAME', 'snapshot_demo_iceberg') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg" + database: "{{ env('FF_DBR_DATABASE', 'snapshot_demo_iceberg') }}" table_format: "iceberg" extra_conf: spark.jars.packages: "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0" spark.sql.catalog.iceberg: "org.apache.iceberg.spark.SparkCatalog" spark.sql.catalog.iceberg.type: "hadoop" - spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse" + spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse_snapshot" spark.sql.catalog.iceberg.write.metadata.version-hint.enabled: "false" spark.sql.catalog.iceberg.read.metadata.version-hint.enabled: "false" +# Backwards-compatible alias (defaults to parquet) +dev_databricks: + <<: *snapshot_demo_databricks_parquet + dev_bigquery_bigframes: engine: bigquery bigquery: diff --git a/examples/snapshot_demo/project.yml b/examples/snapshot_demo/project.yml index 6ce1031..16e8674 100644 --- a/examples/snapshot_demo/project.yml +++ b/examples/snapshot_demo/project.yml @@ -7,20 +7,7 @@ docs: vars: {} -models: - storage: - mart_users_by_domain.ff: - path: ".local/spark/events_base" - # format: parquet - users_clean.ff: - path: ".local/spark/users_clean" - # format: parquet - users_clean_snapshot.ff: - path: ".local/spark/users_clean_snapshot" - # format: parquet - mart_users_by_domain_snapshot.ff: - path: ".local/spark/mart_users_by_domain_snapshot" - # format: parquet +models: {} tests: # -------------------------------------------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index a8eb2aa..04d43d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.2" +version = "0.6.4" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/pytest.ini b/pytest.ini index 24520a0..d3d73bc 100644 --- a/pytest.ini +++ b/pytest.ini @@ -5,7 +5,7 @@ markers = postgres: marks tests that require Postgres databricks_spark: marks tests that require Spark bigquery: marks tests that require BigQuery - snowflake: marks tests that require Snowflake + snowflake_snowpark: marks tests that require Snowflake http: marks tests that exercise the HTTP client/API streaming: marks tests that exercise streaming functionality unit: marks unit tests diff --git a/src/fastflowtransform/executors/databricks_spark.py b/src/fastflowtransform/executors/databricks_spark.py index 0bede06..fc06218 100644 --- a/src/fastflowtransform/executors/databricks_spark.py +++ b/src/fastflowtransform/executors/databricks_spark.py @@ -957,6 +957,13 @@ def _snapshot_incremental_run( if "__ff_new_hash" in snapshot_df.columns: snapshot_df = snapshot_df.drop("__ff_new_hash") + # Break lineage so Spark doesn't see this as "read from and overwrite the same table" + try: + snapshot_df = snapshot_df.localCheckpoint(eager=True) + except Exception: + snapshot_df = snapshot_df.cache() + snapshot_df.count() + storage_meta = self._storage_meta(node, rel_name) self._save_df_as_table(rel_name, snapshot_df, storage=storage_meta) @@ -1003,7 +1010,29 @@ def snapshot_prune( return pruned = ranked.filter(F.col("__ff_rn") <= int(keep_last)).drop("__ff_rn") - self._save_df_as_table(relation, pruned) + + # Materialize before overwrite to avoid Spark's + # [UNSUPPORTED_OVERWRITE.TABLE] "target that is also being read from". + materialized: list[SDF] = [] + + def _materialize(df: SDF) -> SDF: + try: + cp = df.localCheckpoint(eager=True) + materialized.append(cp) + return cp + except Exception: + cached = df.cache() + cached.count() + materialized.append(cached) + return cached + + try: + out = _materialize(pruned) + self._save_df_as_table(relation, out) + finally: + for handle in materialized: + with suppress(Exception): + handle.unpersist() # ────────────────────────── local helpers / shim ────────────────────────── diff --git a/src/fastflowtransform/executors/snowflake_snowpark.py b/src/fastflowtransform/executors/snowflake_snowpark.py index 3195a85..d60fd8a 100644 --- a/src/fastflowtransform/executors/snowflake_snowpark.py +++ b/src/fastflowtransform/executors/snowflake_snowpark.py @@ -342,6 +342,7 @@ def run_snapshot_sql(self, node: Node, env: Environment) -> None: raise ValueError( "strategy='timestamp' snapshot requires a non-null updated_at column." ) + create_sql = f""" CREATE OR REPLACE TABLE {target} AS SELECT @@ -361,6 +362,7 @@ def run_snapshot_sql(self, node: Node, env: Environment) -> None: upd_expr = ( f"s.{cfg.updated_at}" if cfg.updated_at is not None else "CURRENT_TIMESTAMP()" ) + create_sql = f""" CREATE OR REPLACE TABLE {target} AS SELECT diff --git a/src/fastflowtransform/table_formats/spark_delta.py b/src/fastflowtransform/table_formats/spark_delta.py index 756a2e5..b858b8f 100644 --- a/src/fastflowtransform/table_formats/spark_delta.py +++ b/src/fastflowtransform/table_formats/spark_delta.py @@ -1,6 +1,7 @@ # fastflowtransform/table_formats/spark_delta.py from __future__ import annotations +from contextlib import suppress from typing import TYPE_CHECKING, Any from fastflowtransform.table_formats.base import SparkFormatHandler @@ -59,22 +60,141 @@ def _delta_table_for(self, table_name: str) -> DeltaTable: f"or is not registered as a Delta table: {exc}" ) from exc + @staticmethod + def _quote_identifier(name: str) -> str: + parts = [p for p in name.split(".") if p] + if not parts: + esc = name.replace("`", "``") + return f"`{esc}`" + return ".".join(f"`{part.replace('`', '``')}`" for part in parts) + + @staticmethod + def _sql_literal(value: str) -> str: + return "'" + value.replace("'", "''") + "'" + + def _restore_table_metadata( + self, + table_ident: str, + *, + table_comment: str | None, + column_comments: dict[str, str], + table_properties: dict[str, Any], + ) -> None: + if table_comment: + with suppress(Exception): + self.spark.sql( + f"COMMENT ON TABLE {table_ident} IS {self._sql_literal(table_comment)}" + ) + + if table_properties: + assignments = [] + for key, value in table_properties.items(): + if value is None: + continue + key_str = str(key) + if key_str.lower() in {"transient_lastddltime"}: + continue + assignments.append(f"{self._sql_literal(key_str)}={self._sql_literal(str(value))}") + if assignments: + props_sql = ", ".join(assignments) + with suppress(Exception): + self.spark.sql(f"ALTER TABLE {table_ident} SET TBLPROPERTIES ({props_sql})") + + for col, comment in column_comments.items(): + if not comment: + continue + col_ident = f"{table_ident}.{self._quote_identifier(col)}" + with suppress(Exception): + self.spark.sql(f"COMMENT ON COLUMN {col_ident} IS {self._sql_literal(comment)}") + # ---------- Required API ---------- def save_df_as_table(self, table_name: str, df: SDF) -> None: """ Save DataFrame as a managed Delta table. - Overwrites the table content: - - writer.format("delta") - - writer.mode("overwrite") - - options from self.table_options + For existing tables we bypass Hive's ALTER TABLE path by overwriting the + physical Delta location directly (with schema overwrite) and refreshing + the table metadata. New tables go through saveAsTable so they are + registered in the metastore. """ - writer = df.write.format("delta").mode("overwrite") - if self.table_options: - writer = writer.options(**self.table_options) + def _writer() -> Any: + w = df.write.format("delta").mode("overwrite") + if self.table_options: + w = w.options(**self.table_options) + return w + + exists = False + try: + exists = self.spark.catalog.tableExists(table_name) + except Exception: + exists = False + + if not exists: + _writer().saveAsTable(table_name) + return + + table_comment: str | None = None + table_properties: dict[str, Any] = {} + column_comments: dict[str, str] = {} + table_ident = self._quote_identifier(table_name) + + try: + info = self.spark.catalog.getTable(table_name) + table_comment = getattr(info, "description", None) + props = getattr(info, "properties", None) + if isinstance(props, dict): + table_properties = dict(props) + except Exception: + pass + + try: + cols = self.spark.catalog.listColumns(table_name) + column_comments = {} + for col in cols: + comment = getattr(col, "comment", None) + if comment: + column_comments[col.name] = comment + except Exception: + column_comments = {} + + location: str | None = None + try: + detail = self._delta_table_for(table_name).detail().collect() + if detail: + location = detail[0].get("location") or detail[0].get("path") + except Exception: + location = None + + if not location: + try: + info = self.spark.catalog.getTable(table_name) + location = getattr(info, "location", None) + except Exception: + location = None + + if not location: + # Fallback: drop and recreate if we can't resolve the location. + with suppress(Exception): + self.spark.sql(f"DROP TABLE IF EXISTS {table_ident}") + _writer().saveAsTable(table_name) + self._restore_table_metadata( + table_ident, + table_comment=table_comment, + column_comments=column_comments, + table_properties=table_properties, + ) + return - writer.saveAsTable(table_name) + _writer().option("overwriteSchema", "true").save(location) + with suppress(Exception): + self.spark.catalog.refreshTable(table_name) + self._restore_table_metadata( + table_ident, + table_comment=table_comment, + column_comments=column_comments, + table_properties=table_properties, + ) # ---------- Incremental API ---------- # incremental_insert: base implementation is fine: diff --git a/src/fastflowtransform/table_formats/spark_iceberg.py b/src/fastflowtransform/table_formats/spark_iceberg.py index 9175139..0f8f66f 100644 --- a/src/fastflowtransform/table_formats/spark_iceberg.py +++ b/src/fastflowtransform/table_formats/spark_iceberg.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import suppress from typing import Any from fastflowtransform.table_formats.base import SparkFormatHandler @@ -24,8 +25,10 @@ def __init__( *, table_options: dict[str, Any] | None = None, ) -> None: - super().__init__(spark, table_format="iceberg", table_options=table_options or {}) - self.catalog_name = "iceberg" + options = dict(table_options or {}) + catalog = options.pop("catalog_name", None) or options.pop("__catalog_name__", None) + self.catalog_name = str(catalog) if catalog else "iceberg" + super().__init__(spark, table_format="iceberg", table_options=options) # ---------- Core helpers ---------- def _qualify_table_name(self, table_name: str, database: str | None = None) -> str: @@ -60,11 +63,58 @@ def allows_unmanaged_paths(self) -> bool: def relation_exists(self, table_name: str, *, database: str | None = None) -> bool: ident = self.qualify_identifier(table_name, database=database) try: - self.spark.table(ident) - return True + return bool(self.spark.catalog.tableExists(ident)) except Exception: return False + @staticmethod + def _quote_part(value: str) -> str: + return f"`{value.replace('`', '``')}`" + + def _sql_identifier(self, table_name: str, *, database: str | None = None) -> str: + qualified = self._qualify_table_name(table_name, database=database) + parts = [p for p in qualified.split(".") if p] + return ".".join(self._quote_part(part) for part in parts) + + @staticmethod + def _sql_literal(value: str) -> str: + return "'" + value.replace("'", "''") + "'" + + def _restore_table_metadata( + self, + table_ident: str, + *, + table_comment: str | None, + column_comments: dict[str, str], + table_properties: dict[str, Any], + ) -> None: + if table_comment: + with suppress(Exception): + self.spark.sql( + f"COMMENT ON TABLE {table_ident} IS {self._sql_literal(table_comment)}" + ) + + if table_properties: + assignments = [] + for key, value in table_properties.items(): + if value is None: + continue + key_str = str(key) + if key_str.lower() in {"transient_lastddltime"}: + continue + assignments.append(f"{self._sql_literal(key_str)}={self._sql_literal(str(value))}") + if assignments: + props = ", ".join(assignments) + with suppress(Exception): + self.spark.sql(f"ALTER TABLE {table_ident} SET TBLPROPERTIES ({props})") + + for name, comment in column_comments.items(): + if not comment: + continue + col_ident = f"{table_ident}.{self._quote_part(name)}" + with suppress(Exception): + self.spark.sql(f"COMMENT ON COLUMN {col_ident} IS {self._sql_literal(comment)}") + # ---------- Required API ---------- def save_df_as_table(self, table_name: str, df: SDF) -> None: """ @@ -75,21 +125,58 @@ def save_df_as_table(self, table_name: str, df: SDF) -> None: df.writeTo("iceberg.db.table").using("iceberg").createOrReplace() """ full_name = self._qualify_table_name(table_name) - writer = df.writeTo(full_name).using("iceberg") for k, v in self.table_options.items(): writer = writer.tableProperty(str(k), str(v)) + existed = False + table_comment: str | None = None + table_properties: dict[str, Any] = {} + column_comments: dict[str, str] = {} + table_ident = self._sql_identifier(table_name) + + try: + existed = bool(self.spark.catalog.tableExists(full_name)) + except Exception: + existed = False + + if existed: + try: + info = self.spark.catalog.getTable(full_name) + table_comment = getattr(info, "description", None) + props = getattr(info, "properties", None) + if isinstance(props, dict): + table_properties = dict(props) + except Exception: + pass + + try: + cols = self.spark.catalog.listColumns(full_name) + for col in cols: + comment = getattr(col, "comment", None) + if comment: + column_comments[col.name] = comment + except Exception: + column_comments = {} + # Upsert semantics for seeds / full-refresh writer.createOrReplace() + if existed: + self._restore_table_metadata( + table_ident, + table_comment=table_comment, + column_comments=column_comments, + table_properties=table_properties, + ) + # ---------- Incremental API ---------- def incremental_insert(self, table_name: str, select_body_sql: str) -> None: body = select_body_sql.strip().rstrip(";\n\t ") if not body.lower().startswith("select"): raise ValueError(f"incremental_insert expects SELECT body, got: {body[:40]!r}") - full_name = self._qualify_table_name(table_name) + full_name = self._sql_identifier(table_name) self.spark.sql(f"INSERT INTO {full_name} {body}") def incremental_merge( @@ -112,7 +199,7 @@ def incremental_merge( self.incremental_insert(table_name, body) return - full_name = self._qualify_table_name(table_name) + full_name = self._sql_identifier(table_name) pred = " AND ".join([f"t.`{k}` = s.`{k}`" for k in unique_key]) self.spark.sql( diff --git a/tests/common/fixtures.py b/tests/common/fixtures.py index a3021f3..1f29b4e 100644 --- a/tests/common/fixtures.py +++ b/tests/common/fixtures.py @@ -2,7 +2,7 @@ from __future__ import annotations import os -from contextlib import suppress +import types from pathlib import Path from types import SimpleNamespace from typing import TYPE_CHECKING, Any @@ -12,9 +12,16 @@ import pytest import sqlalchemy as sa from dotenv import load_dotenv -from jinja2 import Environment, FileSystemLoader, select_autoescape +from jinja2 import DictLoader, Environment, FileSystemLoader, select_autoescape from sqlalchemy import text +import fastflowtransform.executors.bigquery.base as bq_base +import fastflowtransform.executors.bigquery.pandas as bq_pandas +import fastflowtransform.typing as fft_typing +from fastflowtransform.executors.bigquery.pandas import BigQueryExecutor +from tests.common.mock.bigquery import install_fake_bigquery +from tests.common.mock.snowflake_snowpark import FakeSnowflakeSession + if TYPE_CHECKING: # pragma: no cover - typing only import psycopg from psycopg import sql @@ -39,7 +46,7 @@ from fastflowtransform import utest from fastflowtransform.core import REGISTRY -from tests.common.utils import ROOT, run +from tests.common.utils import ROOT try: # Optional: Spark deps may not be installed in core runs from fastflowtransform.executors.databricks_spark import DatabricksSparkExecutor @@ -47,6 +54,22 @@ DatabricksSparkExecutor = None # type: ignore +try: # Optional: Spark deps may not be installed in core runs + from fastflowtransform.executors.databricks_spark import DatabricksSparkExecutor +except ModuleNotFoundError: # pragma: no cover - import guard + DatabricksSparkExecutor = None # type: ignore + +# --- Snowflake ---------------------------------------------------- +try: + from fastflowtransform.executors.snowflake_snowpark import ( + SnowflakeSnowparkExecutor, + _SFCursorShim, + ) +except ModuleNotFoundError: # pragma: no cover + SnowflakeSnowparkExecutor = None # type: ignore[assignment] + _SFCursorShim = None # type: ignore[assignment] + + # ---- Load Env Variables ---- @pytest.fixture(scope="session", autouse=True) def load_test_env(): @@ -104,23 +127,6 @@ def duckdb_env(duckdb_db_path): return {"FF_ENGINE": "duckdb", "FF_DUCKDB_PATH": str(duckdb_db_path)} -@pytest.fixture(scope="function") -def duckdb_seeded(duckdb_project, duckdb_env): - db_path = duckdb_env.get("FF_DUCKDB_PATH") - db_file = Path(db_path) if db_path else None - if db_file: - if db_file.exists(): - db_file.unlink() - db_file.parent.mkdir(parents=True, exist_ok=True) - run(["fft", "seed", str(duckdb_project), "--env", "dev"], duckdb_env) - try: - yield - finally: - if db_file: - with suppress(Exception): - db_file.unlink() - - # ---- Postgres ---- @pytest.fixture(scope="session") def pg_project(): @@ -134,21 +140,6 @@ def pg_env(): return {"FF_ENGINE": "postgres", "FF_PG_DSN": dsn, "FF_PG_SCHEMA": schema} -@pytest.fixture(scope="module") -def pg_seeded(pg_project, pg_env): - dsn = pg_env.get("FF_PG_DSN") - schema = pg_env.get("FF_PG_SCHEMA") or "public" - if psycopg is None or sql is None: - pytest.skip("psycopg not installed; install fastflowtransform[postgres] to run PG fixtures") - if dsn and schema and ("psycopg://" in dsn or "+psycopg" in dsn): - with suppress(Exception), psycopg.connect(dsn) as conn: - conn.execute(sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(sql.Identifier(schema))) - conn.execute(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(schema))) - conn.commit() - run(["fft", "seed", str(pg_project), "--env", "stg"], pg_env) - yield - - # ---- Spark ---- @pytest.fixture def exec_minimal(monkeypatch): @@ -368,3 +359,142 @@ def bigquery_engine_env(): env["GOOGLE_APPLICATION_CREDENTIALS"] = creds return env + + +# ---- Snowflake Snowpark ---- +@pytest.fixture(scope="session") +def snowflake_engine_env(): + """ + Basic env for Snowflake Snowpark examples / integration tests. + + Skips if required env vars are missing, so the test suite works without + a Snowflake account configured. + """ + account = os.environ.get("FF_SF_ACCOUNT") + user = os.environ.get("FF_SF_USER") + password = os.environ.get("FF_SF_PASSWORD") + warehouse = os.environ.get("FF_SF_WAREHOUSE") + database = os.environ.get("FF_SF_DATABASE") + schema = os.environ.get("FF_SF_SCHEMA") + role = os.environ.get("FF_SF_ROLE") + allow_create_schema = os.environ.get("FF_SF_ALLOW_CREATE_SCHEMA", "1") + + # If any core bits are missing, skip Snowflake tests entirely + required = [account, user, password, warehouse, database, schema] + if not all(required): + pytest.skip( + "Snowflake env not configured for tests " + "(need FF_SF_ACCOUNT, FF_SF_USER, FF_SF_PASSWORD, " + "FF_SF_WAREHOUSE, FF_SF_DATABASE, FF_SF_SCHEMA)." + ) + + env = { + "FF_ENGINE": "snowflake_snowpark", + "FF_SF_ACCOUNT": account, + "FF_SF_USER": user, + "FF_SF_PASSWORD": password, + "FF_SF_WAREHOUSE": warehouse, + "FF_SF_DATABASE": database, + "FF_SF_SCHEMA": schema, + "FF_SF_ALLOW_CREATE_SCHEMA": allow_create_schema, + } + if role: + env["FF_SF_ROLE"] = role + return env + + +@pytest.fixture(scope="session") +def snowflake_cfg(snowflake_engine_env): + """ + Canonical config dict for SnowflakeSnowparkExecutor, derived from env. + """ + return { + "account": snowflake_engine_env["FF_SF_ACCOUNT"], + "user": snowflake_engine_env["FF_SF_USER"], + "password": snowflake_engine_env["FF_SF_PASSWORD"], + "warehouse": snowflake_engine_env["FF_SF_WAREHOUSE"], + "database": snowflake_engine_env["FF_SF_DATABASE"], + "schema": snowflake_engine_env["FF_SF_SCHEMA"], + "role": snowflake_engine_env.get("FF_SF_ROLE"), + "allow_create_schema": bool( + int(snowflake_engine_env.get("FF_SF_ALLOW_CREATE_SCHEMA", "1")) + ), + } + + +@pytest.fixture +def snowflake_executor_fake() -> Any: + """ + Fake SnowflakeSnowparkExecutor using an in-memory FakeSnowflakeSession. + This does NOT talk to a real Snowflake account and does not need env vars. + It is intended for SQL-shape tests, similar to the BigQuery fake. + """ + if SnowflakeSnowparkExecutor is None: + pytest.skip("SnowflakeSnowparkExecutor not importable") + + # Build instance without running its __init__ (which would try to connect). + ex: Any = SnowflakeSnowparkExecutor.__new__(SnowflakeSnowparkExecutor) + + # Minimal attributes that the executor expects. + ex.database = "FF_TEST_DB" + ex.schema = "FF_TEST_SCHEMA" + ex.allow_create_schema = False + + # Fake Snowflake session and cursor shim. + session = FakeSnowflakeSession() + ex.session = session + if _SFCursorShim is not None: + ex.con = _SFCursorShim(session) + else: + # Cheap fallback if for some reason the shim isn't available + ex.con = types.SimpleNamespace(execute=lambda sql, params=None: None) + + return ex + + +@pytest.fixture(scope="session") +def jinja_env_bigquery() -> Environment: + """ + Very small Jinja environment for BigQuery unit tests. + (You can also reuse your global jinja_env from tests/common/fixtures.py + and just import that instead.) + """ + env = Environment( + loader=DictLoader({}), + autoescape=select_autoescape([]), + trim_blocks=True, + lstrip_blocks=True, + ) + # same globals your main jinja_env normally has + env.globals.setdefault("is_incremental", lambda: False) + env.globals.setdefault("this", None) + return env + + +@pytest.fixture +def bq_executor_fake(monkeypatch) -> BigQueryExecutor: + """ + BigQueryExecutor wired against the FakeClient from tests/common/mock/bigquery.py. + No real BigQuery project / dataset / credentials required. + """ + # Make sure all FFT modules that cache a `bigquery` symbol + # see the fake module. + fake_bq = install_fake_bigquery( + monkeypatch, + target_modules=[fft_typing, bq_base, bq_pandas], + ) + + # Instantiate FakeClient via the fake module so the types line up + client = fake_bq.Client(project="ff_test_project", location="EU") # type: ignore[attr-defined] + + # Optionally pretend the dataset exists (for _ensure_dataset tests) + client.add_dataset("ff_test_project.ff_snapshots") # type: ignore[attr-defined] + + ex = BigQueryExecutor( + project="ff_test_project", + dataset="ff_snapshots", + location="EU", + client=client, + allow_create_dataset=True, + ) + return ex diff --git a/tests/common/mock/bigquery.py b/tests/common/mock/bigquery.py index 49d8ace..d4abadf 100644 --- a/tests/common/mock/bigquery.py +++ b/tests/common/mock/bigquery.py @@ -10,7 +10,7 @@ # Optional dependency: provide lightweight fallbacks when google libs are absent. try: - from google.api_core.exceptions import BadRequest, NotFound + from google.api_core.exceptions import BadRequest, NotFound # type: ignore[attr-defined] except Exception: # pragma: no cover - when google is not installed class BadRequest(Exception): @@ -40,6 +40,13 @@ def __init__(self, rows: list[tuple] | None = None, schema: list[FakeField] | No self._rows = rows or [] self._schema = schema or [] + def __iter__(self): + # Allow patterns like: list(job.result()) + return iter(self._rows) + + def __len__(self) -> int: + return len(self._rows) + def to_dataframe(self, create_bqstorage_client: bool = True): if not self._rows: return pd.DataFrame([]) @@ -112,6 +119,7 @@ def __init__(self, project: str, location: str | None = None): self.queries: list[tuple[str, str | None, Any | None]] = [] self._datasets: dict[str, FakeDataset] = {} self._tables: dict[str, list[Any]] = {} + self._relations: set[str] = set() # ---- Test helper ---- def add_dataset(self, ds_id: str) -> None: @@ -123,20 +131,70 @@ def add_table(self, dataset_id: str, table_id: str) -> None: # ---- Emulator methods ---- def query(self, sql: str, location: str | None = None, job_config: Any | None = None): self.queries.append((sql, location, job_config)) + upper_sql = sql.upper() + + # --- INFORMATION_SCHEMA existence checks --- + if "INFORMATION_SCHEMA.TABLES" in upper_sql or "INFORMATION_SCHEMA.VIEWS" in upper_sql: + rel_name: str | None = None + # Extract the @rel parameter from QueryJobConfig, if present + if job_config is not None and hasattr(job_config, "kwargs"): + params = job_config.kwargs.get("query_parameters") or [] + for p in params: + if getattr(p, "name", None) == "rel": + rel_name = getattr(p, "value", None) + break + + if rel_name and rel_name in self._relations: + # Relation exists + return FakeJob(rows=[(1,)]) + # Relation does NOT exist + return FakeJob(rows=[]) + + # --- CREATE TABLE / CREATE OR REPLACE TABLE --- + trimmed_upper = upper_sql.lstrip() + trimmed_orig = sql.lstrip() + + lower_trimmed = trimmed_orig.lower() + if trimmed_upper.startswith("CREATE TABLE") or trimmed_upper.startswith( + "CREATE OR REPLACE TABLE" + ): + # After "table" comes the identifier: `project.dataset.table` or similar + after_table = trimmed_orig[lower_trimmed.index("table") + len("table") :].strip() + if after_table.lower().startswith("if not exists"): + after_table = after_table[len("if not exists") :].strip() + # Cut at common delimiters: AS / ( / whitespace-newline + for sep in [" AS", " (", "\n", "\t"]: + idx = after_table.find(sep) + if idx != -1: + after_table = after_table[:idx] + break + ident = after_table.strip().strip("`") + table_name = ident.split(".")[-1] + self._relations.add(table_name) + return FakeJob() - # INFORMATION_SCHEMA β†’ 1 Row back - if "INFORMATION_SCHEMA.TABLES" in sql or "INFORMATION_SCHEMA.VIEWS" in sql: - return FakeJob(rows=[(1,)]) + # --- CREATE OR REPLACE VIEW --- + if trimmed_upper.startswith("CREATE OR REPLACE VIEW"): + after_view = trimmed_orig[lower_trimmed.index("view") + len("view") :].strip() + for sep in [" AS", " (", "\n", "\t"]: + idx = after_view.find(sep) + if idx != -1: + after_view = after_view[:idx] + break + ident = after_view.strip().strip("`") + view_name = ident.split(".")[-1] + self._relations.add(view_name) + return FakeJob() - # Probe-Query (SELECT ... WHERE 1=0) β†’ Schema back - if "WHERE 1=0" in sql: + # --- Probe-Query (SELECT ... WHERE 1=0) β†’ schema back for alter_table_sync_schema --- + if "WHERE 1=0" in upper_sql: return FakeJob(schema=[FakeField("id"), FakeField("new_col", "INT64")]) - # ALTER TABLE ... ADD COLUMN ... - if sql.lstrip().upper().startswith("ALTER TABLE"): + # --- ALTER TABLE ... ADD COLUMN ... --- + if trimmed_upper.startswith("ALTER TABLE"): return FakeJob() - # everything else β†’ empty return + # Default: generic, empty job return FakeJob() def list_tables(self, dataset_id: str): diff --git a/tests/common/mock/snowflake_snowpark.py b/tests/common/mock/snowflake_snowpark.py new file mode 100644 index 0000000..091b206 --- /dev/null +++ b/tests/common/mock/snowflake_snowpark.py @@ -0,0 +1,53 @@ +# tests/common/mock/snowflake_snowpark.py +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class FakeSFRow: + """Simple Row-like object with asDict() support.""" + + data: dict[str, Any] + + def asDict(self) -> dict[str, Any]: + return dict(self.data) + + +class FakeSFResult: + """ + Minimal Snowpark Result mimic: only .collect() is used by the executor. + """ + + def __init__(self, rows: list[FakeSFRow] | None = None): + self._rows = rows or [] + + def collect(self) -> list[FakeSFRow]: + return self._rows + + +@dataclass +class FakeSnowflakeSession: + """ + Fake Snowflake Snowpark Session that: + - records every SQL statement in `sql_calls` + - returns empty results for all statements + This is enough to test that our executor *emits* the right SQL. + """ + + sql_calls: list[str] = field(default_factory=list) + + # --- API used by SnowflakeSnowparkExecutor --- + + def sql(self, query: str) -> FakeSFResult: + self.sql_calls.append(query) + # For our tests we don't need real rows, just a non-crashing collect(). + return FakeSFResult([]) + + # In these SQL-only snapshot tests we never actually call `table(...)`, + # but we provide a stub so accesses fail loudly if that changes. + def table(self, name: str) -> Any: + raise NotImplementedError( + "FakeSnowflakeSession.table is not implemented for SQL-only tests." + ) diff --git a/tests/common/snapshot_helpers.py b/tests/common/snapshot_helpers.py new file mode 100644 index 0000000..696eaf2 --- /dev/null +++ b/tests/common/snapshot_helpers.py @@ -0,0 +1,258 @@ +# tests/common/snapshot_helpers.py +from __future__ import annotations + +from collections.abc import Callable +from types import SimpleNamespace +from typing import Any + +import pandas as pd + +from fastflowtransform.core import relation_for +from fastflowtransform.executors.base import BaseExecutor + +SnapshotReadFn = Callable[[Any, str], pd.DataFrame] + +VF_COL = BaseExecutor.SNAPSHOT_VALID_FROM_COL +VT_COL = BaseExecutor.SNAPSHOT_VALID_TO_COL +IS_CUR_COL = BaseExecutor.SNAPSHOT_IS_CURRENT_COL +HASH_COL = BaseExecutor.SNAPSHOT_HASH_COL +UPD_META_COL = BaseExecutor.SNAPSHOT_UPDATED_AT_COL + + +# ── Node factories ────────────────────────────────────────────────────────── +def make_timestamp_snapshot_node( + name: str = "users_snapshot.ff", + *, + unique_key: str | list[str] = "id", + updated_at: str = "updated_at", +) -> SimpleNamespace: + """ + Minimal Node-like object for timestamp snapshots. + Works across all SQL executors (DuckDB, Postgres, Spark SQL, BigQuery, Snowflake). + """ + meta: dict[str, Any] = { + "materialized": "snapshot", + "strategy": "timestamp", + "unique_key": unique_key, + "updated_at": updated_at, + } + return SimpleNamespace( + name=name, + kind="sql", + path=f"models/{name}", + deps=["users_clean.ff"], + meta=meta, + ) + + +def make_check_snapshot_node( + name: str = "users_snapshot_check.ff", + *, + unique_key: str | list[str] = "id", + check_cols: list[str] | str = "value", + updated_at: str | None = None, +) -> SimpleNamespace: + """ + Minimal Node-like object for check-based snapshots. + """ + meta: dict[str, Any] = { + "materialized": "snapshot", + "strategy": "check", + "unique_key": unique_key, + "check_cols": check_cols, + } + if updated_at: + meta["updated_at"] = updated_at + + return SimpleNamespace( + name=name, + kind="sql", + path=f"models/{name}", + deps=["users_clean.ff"], + meta=meta, + ) + + +# ── Small utilities ──────────────────────────────────────────────────────── +def patch_render_sql(executor: Any, sql_text: str) -> None: + """ + Monkeypatch executor.render_sql(...) to always return `sql_text`. + Avoids depending on loader/REGISTRY in unit tests. + """ + + def _render(node: Any, env: Any, ref_resolver: Any, source_resolver: Any) -> str: + return sql_text + + executor.render_sql = _render # type: ignore[assignment] + + +def read_snapshot_relation(executor: Any, node_name: str, reader: SnapshotReadFn) -> pd.DataFrame: + rel = relation_for(node_name) + return reader(executor, rel) + + +# ── Generic scenarios / assertions ───────────────────────────────────────── +def scenario_timestamp_first_and_second_run( + executor: Any, + node: Any, + jinja_env: Any, + read_fn: SnapshotReadFn, + *, + sql_first: str, + sql_second: str, +) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Run a common timestamp snapshot scenario: + 1) first run with (id=1, value=a, updated_at=2024-01-01...) + 2) second run with (id=1, value=b, updated_at=2024-02-01...) + + Returns (df_after_first_run, df_after_second_run). + Performs structural assertions on the snapshot columns. + """ + # first run + patch_render_sql(executor, sql_first) + executor.run_snapshot_sql(node, jinja_env) + df1 = read_snapshot_relation(executor, node.name, read_fn) + + # basic structural checks + for col in ("id", "value", "updated_at", VF_COL, VT_COL, IS_CUR_COL, UPD_META_COL, HASH_COL): + assert col in df1.columns, f"missing column {col!r} on first snapshot run" + + assert len(df1) == 1, "first snapshot run should generate exactly one row (one business key)" + row = df1.iloc[0] + assert row["id"] == 1 + assert row["value"] == "a" + assert bool(row[IS_CUR_COL]) + assert pd.isna(row[VT_COL]) + # for timestamp strategy we expect valid_from == updated_at == updated_at_meta + assert str(row[VF_COL]) == str(row["updated_at"]) + assert str(row[UPD_META_COL]) == str(row["updated_at"]) + + # second run + patch_render_sql(executor, sql_second) + executor.run_snapshot_sql(node, jinja_env) + df2 = read_snapshot_relation(executor, node.name, read_fn) + + # two versions now + assert len(df2) == 2 + + cur = df2[df2[IS_CUR_COL] == True] # noqa: E712 + old = df2[df2[IS_CUR_COL] == False] # noqa: E712 + + assert len(cur) == 1 + assert len(old) == 1 + + cur_row = cur.iloc[0] + old_row = old.iloc[0] + + # current row: latest payload, open version + assert cur_row["id"] == 1 + assert cur_row["value"] == "b" + assert pd.isna(cur_row[VT_COL]) + assert str(cur_row[VF_COL]) == str(cur_row["updated_at"]) + assert str(cur_row[UPD_META_COL]) == str(cur_row["updated_at"]) + + # closed row: old payload, valid_to set + assert old_row["id"] == 1 + assert old_row["value"] == "a" + assert not bool(old_row[IS_CUR_COL]) + assert not pd.isna(old_row[VT_COL]) + assert str(old_row[VF_COL]) == str(old_row["updated_at"]) + + return df1, df2 + + +def scenario_snapshot_prune_keep_last( + executor: Any, + node: Any, + jinja_env: Any, + read_fn: SnapshotReadFn, + *, + sql_first: str, + sql_second: str, + unique_key: list[str] | None = None, +) -> None: + """ + Generic prune scenario: + - run timestamp snapshot twice (2 versions) + - dry-run prune keep_last=1 β†’ no change + - real prune keep_last=1 β†’ only latest version kept + """ + unique_key = unique_key or ["id"] + + _, _ = scenario_timestamp_first_and_second_run( + executor, node, jinja_env, read_fn, sql_first=sql_first, sql_second=sql_second + ) + df_before = read_snapshot_relation(executor, node.name, read_fn) + assert len(df_before) == 2 + + rel = relation_for(node.name) + + # dry-run + executor.snapshot_prune(rel, unique_key=unique_key, keep_last=1, dry_run=True) + df_dry = read_snapshot_relation(executor, node.name, read_fn) + assert len(df_dry) == 2 + + # real prune + executor.snapshot_prune(rel, unique_key=unique_key, keep_last=1, dry_run=False) + df_after = read_snapshot_relation(executor, node.name, read_fn) + assert len(df_after) == 1 + + row = df_after.iloc[0] + assert row["id"] == 1 + assert row["value"] == "b" + assert bool(row[IS_CUR_COL]) + assert pd.isna(row[VT_COL]) + + +def scenario_check_strategy_detects_changes( + executor: Any, + node: Any, + jinja_env: Any, + read_fn: SnapshotReadFn, + *, + sql_first: str, + sql_second: str, +) -> None: + """ + Check strategy scenario: + - first run: (id=1, value='alpha', updated_at=X) + - second run: same id/updated_at, but value='beta' + β†’ still must open new version based on check_cols hash + """ + # first run + patch_render_sql(executor, sql_first) + executor.run_snapshot_sql(node, jinja_env) + df1 = read_snapshot_relation(executor, node.name, read_fn) + + assert len(df1) == 1 + r1 = df1.iloc[0] + assert r1["value"] == "alpha" + assert bool(r1[IS_CUR_COL]) + # hash column must be non-null for check strategy + assert HASH_COL in df1.columns + assert pd.notna(r1[HASH_COL]) + + # second run with only check_cols changed + patch_render_sql(executor, sql_second) + executor.run_snapshot_sql(node, jinja_env) + df2 = read_snapshot_relation(executor, node.name, read_fn) + + assert len(df2) == 2 + + cur = df2[df2[IS_CUR_COL] == True] # noqa: E712 + old = df2[df2[IS_CUR_COL] == False] # noqa: E712 + + assert len(cur) == 1 + assert len(old) == 1 + + cur_row = cur.iloc[0] + old_row = old.iloc[0] + + assert cur_row["value"] == "beta" + assert pd.isna(cur_row[VT_COL]) + + assert old_row["value"] == "alpha" + assert not pd.isna(old_row[VT_COL]) + # valid_from must differ if a second version was created + assert str(cur_row[VF_COL]) != str(old_row[VF_COL]) diff --git a/tests/integration/examples/config.py b/tests/integration/examples/config.py index 2ab89fb..399d4d9 100644 --- a/tests/integration/examples/config.py +++ b/tests/integration/examples/config.py @@ -64,7 +64,7 @@ class ExampleConfig: env_by_engine={ "duckdb": "dev_duckdb", "postgres": "dev_postgres", - "databricks_spark": "dev_databricks", + "databricks_spark": "dev_databricks_parquet", }, spark_table_formats=["parquet", "delta", "iceberg"], ), @@ -95,7 +95,7 @@ class ExampleConfig: env_by_engine={ "duckdb": "dev_duckdb", "postgres": "dev_postgres", - "databricks_spark": "dev_databricks", + "databricks_spark": "dev_databricks_parquet", }, spark_table_formats=["parquet", "delta", "iceberg"], ), diff --git a/tests/integration/executors/bigquery/test_snapshots_bigquery_integration.py b/tests/integration/executors/bigquery/test_snapshots_bigquery_integration.py new file mode 100644 index 0000000..5048795 --- /dev/null +++ b/tests/integration/executors/bigquery/test_snapshots_bigquery_integration.py @@ -0,0 +1,144 @@ +# tests/unit/executors/bigquery/test_snapshots_bigquery_fake.py +from __future__ import annotations + +import pytest +from tests.common.snapshot_helpers import ( + make_check_snapshot_node, + make_timestamp_snapshot_node, + patch_render_sql, +) + +from fastflowtransform.core import relation_for +from fastflowtransform.executors.base import BaseExecutor + +VF_COL = BaseExecutor.SNAPSHOT_VALID_FROM_COL +VT_COL = BaseExecutor.SNAPSHOT_VALID_TO_COL +IS_CUR_COL = BaseExecutor.SNAPSHOT_IS_CURRENT_COL +HASH_COL = BaseExecutor.SNAPSHOT_HASH_COL +UPD_META_COL = BaseExecutor.SNAPSHOT_UPDATED_AT_COL + +# Simple SQL bodies - they're never actually executed by a real engine, +# we only inspect the resulting BigQuery SQL sent to the fake client. +SQL_TS_FIRST = """ +select + 1 as id, + 'a' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_TS_SECOND = """ +select + 1 as id, + 'b' as value, + timestamp '2024-02-01 00:00:00' as updated_at +""" + +SQL_CHECK_FIRST = """ +select + 1 as id, + 'alpha' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_CHECK_SECOND = """ +select + 1 as id, + 'beta' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + + +def _all_sql(executor) -> str: + """Helper: concatenate all SQL strings recorded on the fake client.""" + client = executor.client # BigQueryBaseExecutor attribute + sqls = [entry[0] for entry in getattr(client, "queries", [])] + return "\n".join(sqls) + + +@pytest.mark.bigquery +@pytest.mark.integration +def test_bigquery_snapshot_timestamp_emits_create_and_update(bq_executor_fake, jinja_env_bigquery): + """ + Smoke test for timestamp snapshots on BigQuery: + - first run: CREATE TABLE ... with snapshot columns + - second run: emits follow-up DML (INSERT/DELETE/MERGE) also referencing + snapshot metadata columns. + """ + ex = bq_executor_fake + node = make_timestamp_snapshot_node() + + # First run β†’ CREATE TABLE ... AS SELECT ... + patch_render_sql(ex, SQL_TS_FIRST) + ex.run_snapshot_sql(node, jinja_env_bigquery) + + sql_first = _all_sql(ex).upper() + assert "CREATE TABLE" in sql_first + # snapshot metadata columns should appear in the create + assert VF_COL.upper() in sql_first + assert VT_COL.upper() in sql_first + assert IS_CUR_COL.upper() in sql_first + assert UPD_META_COL.upper() in sql_first + + # Reset recorded queries for a cleaner second-assertion + ex.client.queries.clear() # type: ignore[attr-defined] + + # Second run β†’ incremental behaviour (no need to fully emulate the engine, + # we just check that BQ DML is generated and references snapshot cols) + patch_render_sql(ex, SQL_TS_SECOND) + ex.run_snapshot_sql(node, jinja_env_bigquery) + + sql_second = _all_sql(ex).upper() + # we expect some form of INSERT / DELETE / MERGE against the same table + assert relation_for(node.name).upper() in sql_second + assert VF_COL.upper() in sql_second + assert VT_COL.upper() in sql_second + assert IS_CUR_COL.upper() in sql_second + assert UPD_META_COL.upper() in sql_second + + +@pytest.mark.bigquery +@pytest.mark.integration +def test_bigquery_snapshot_check_strategy_has_hash_column(bq_executor_fake, jinja_env_bigquery): + """ + Check-strategy snapshots should generate a hash column in BigQuery SQL. + """ + ex = bq_executor_fake + node = make_check_snapshot_node() + + patch_render_sql(ex, SQL_CHECK_FIRST) + ex.run_snapshot_sql(node, jinja_env_bigquery) + + sql = _all_sql(ex).upper() + assert "CREATE TABLE" in sql + assert HASH_COL.upper() in sql # hash column present in schema / projection + + +@pytest.mark.bigquery +@pytest.mark.integration +def test_bigquery_snapshot_prune_emits_window_rank(bq_executor_fake, jinja_env_bigquery): + """ + snapshot_prune should emit a query with ROW_NUMBER() / PARTITION BY + over the business key(s). We don't emulate deletions; we only assert + that the right style of SQL is generated and that it doesn't crash. + """ + ex = bq_executor_fake + node = make_timestamp_snapshot_node() + + # First + second run just to ensure the table "exists" logically + patch_render_sql(ex, SQL_TS_FIRST) + ex.run_snapshot_sql(node, jinja_env_bigquery) + patch_render_sql(ex, SQL_TS_SECOND) + ex.run_snapshot_sql(node, jinja_env_bigquery) + + # Clear previous queries so we only see prune SQL + ex.client.queries.clear() # type: ignore[attr-defined] + + rel = relation_for(node.name) + ex.snapshot_prune(rel, unique_key=["id"], keep_last=1, dry_run=True) + + sql = _all_sql(ex).upper() + # Basic shape of the ranking / prune CTE: + assert "ROW_NUMBER" in sql + assert "OVER" in sql + assert "PARTITION BY" in sql + assert rel.upper() in sql diff --git a/tests/integration/executors/databricks_spark/test_snapshots_databricks_spark_integration.py b/tests/integration/executors/databricks_spark/test_snapshots_databricks_spark_integration.py new file mode 100644 index 0000000..4190a14 --- /dev/null +++ b/tests/integration/executors/databricks_spark/test_snapshots_databricks_spark_integration.py @@ -0,0 +1,111 @@ +# tests/integration/executors/test_snapshots_databricks_spark_integration.py +from __future__ import annotations + +from contextlib import suppress + +import pytest +from tests.common.snapshot_helpers import ( + make_check_snapshot_node, + make_timestamp_snapshot_node, + scenario_check_strategy_detects_changes, + scenario_snapshot_prune_keep_last, + scenario_timestamp_first_and_second_run, +) + +from fastflowtransform.core import relation_for +from fastflowtransform.executors.databricks_spark import DatabricksSparkExecutor + +# Spark-friendly SQL (very similar; Spark understands timestamp '...') +SQL_TS_FIRST = """ +select + 1 as id, + 'a' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_TS_SECOND = """ +select + 1 as id, + 'b' as value, + timestamp '2024-02-01 00:00:00' as updated_at +""" + +SQL_CHECK_FIRST = """ +select + 1 as id, + 'alpha' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_CHECK_SECOND = """ +select + 1 as id, + 'beta' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + + +def _reset_snapshot_table(executor, node_name: str) -> None: + """ + Spark keeps tables across tests (shared SparkSession + warehouse). + Ensure each snapshot scenario starts from a clean table so that + the first snapshot run is truly a 'first run'. + """ + rel = relation_for(node_name) + physical = executor._physical_identifier(rel) + with suppress(Exception): + executor.spark.sql(f"DROP TABLE IF EXISTS {physical}") + + +def _read_spark(ex: DatabricksSparkExecutor, relation: str): + physical = ex._physical_identifier(relation) + return ex.spark.table(physical).toPandas().sort_values(["id", ex.SNAPSHOT_VALID_FROM_COL]) + + +@pytest.mark.databricks_spark +@pytest.mark.integration +def test_spark_snapshot_timestamp_first_and_second_run(jinja_env, spark_exec): + node = make_timestamp_snapshot_node() + _reset_snapshot_table(spark_exec, node.name) + + scenario_timestamp_first_and_second_run( + executor=spark_exec, + node=node, + jinja_env=jinja_env, + read_fn=_read_spark, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + ) + + +@pytest.mark.databricks_spark +@pytest.mark.integration +def test_spark_snapshot_prune_keep_last(jinja_env, spark_exec): + node = make_timestamp_snapshot_node() + _reset_snapshot_table(spark_exec, node.name) + + scenario_snapshot_prune_keep_last( + executor=spark_exec, + node=node, + jinja_env=jinja_env, + read_fn=_read_spark, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + unique_key=["id"], + ) + + +@pytest.mark.databricks_spark +@pytest.mark.integration +def test_spark_snapshot_check_strategy(jinja_env, spark_exec): + node = make_check_snapshot_node() + _reset_snapshot_table(spark_exec, node.name) + + scenario_check_strategy_detects_changes( + executor=spark_exec, + node=node, + jinja_env=jinja_env, + read_fn=_read_spark, + sql_first=SQL_CHECK_FIRST, + sql_second=SQL_CHECK_SECOND, + ) diff --git a/tests/integration/executors/duckdb/test_snapshots_duckdb_integration.py b/tests/integration/executors/duckdb/test_snapshots_duckdb_integration.py new file mode 100644 index 0000000..1eed6d2 --- /dev/null +++ b/tests/integration/executors/duckdb/test_snapshots_duckdb_integration.py @@ -0,0 +1,97 @@ +# tests/integration/executors/test_snapshots_duckdb_integration.py +from __future__ import annotations + +import pandas as pd +import pytest +from tests.common.snapshot_helpers import ( + make_check_snapshot_node, + make_timestamp_snapshot_node, + scenario_check_strategy_detects_changes, + scenario_snapshot_prune_keep_last, + scenario_timestamp_first_and_second_run, +) + +from fastflowtransform.executors.duckdb import DuckExecutor + +# Common-ish SQL that DuckDB is happy with +SQL_TS_FIRST = """ +select + 1 as id, + 'a' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_TS_SECOND = """ +select + 1 as id, + 'b' as value, + timestamp '2024-02-01 00:00:00' as updated_at +""" + +SQL_CHECK_FIRST = """ +select + 1 as id, + 'alpha' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_CHECK_SECOND = """ +select + 1 as id, + 'beta' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + + +def _read_duckdb(ex: DuckExecutor, relation: str) -> pd.DataFrame: + # Order by id, valid_from for deterministic tests + return ex.con.execute(f'SELECT * FROM "{relation}" ORDER BY 1, 2').df() + + +@pytest.mark.duckdb +@pytest.mark.integration +def test_duckdb_snapshot_timestamp_first_and_second_run(jinja_env): + ex = DuckExecutor(db_path=":memory:") + node = make_timestamp_snapshot_node() + + scenario_timestamp_first_and_second_run( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_duckdb, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + ) + + +@pytest.mark.duckdb +@pytest.mark.integration +def test_duckdb_snapshot_prune_keep_last(jinja_env): + ex = DuckExecutor(db_path=":memory:") + node = make_timestamp_snapshot_node() + + scenario_snapshot_prune_keep_last( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_duckdb, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + unique_key=["id"], + ) + + +@pytest.mark.duckdb +@pytest.mark.integration +def test_duckdb_snapshot_check_strategy(jinja_env): + ex = DuckExecutor(db_path=":memory:") + node = make_check_snapshot_node() + + scenario_check_strategy_detects_changes( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_duckdb, + sql_first=SQL_CHECK_FIRST, + sql_second=SQL_CHECK_SECOND, + ) diff --git a/tests/integration/executors/postgres/test_snapshots_postgres_integration.py b/tests/integration/executors/postgres/test_snapshots_postgres_integration.py new file mode 100644 index 0000000..c697b57 --- /dev/null +++ b/tests/integration/executors/postgres/test_snapshots_postgres_integration.py @@ -0,0 +1,125 @@ +# tests/unit/test_snapshots_postgres.py +from __future__ import annotations + +import os + +import pandas as pd +import pytest +from sqlalchemy import text +from tests.common.snapshot_helpers import ( + make_check_snapshot_node, + make_timestamp_snapshot_node, + scenario_check_strategy_detects_changes, + scenario_snapshot_prune_keep_last, + scenario_timestamp_first_and_second_run, +) + +from fastflowtransform.core import relation_for +from fastflowtransform.executors.postgres import PostgresExecutor + +# Postgres-friendly SQL (same as DuckDB in practice) +SQL_TS_FIRST = """ +select + 1 as id, + 'a' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_TS_SECOND = """ +select + 1 as id, + 'b' as value, + timestamp '2024-02-01 00:00:00' as updated_at +""" + +SQL_CHECK_FIRST = """ +select + 1 as id, + 'alpha' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + +SQL_CHECK_SECOND = """ +select + 1 as id, + 'beta' as value, + timestamp '2024-01-01 00:00:00' as updated_at +""" + + +def _reset_snapshot_table(executor: PostgresExecutor, node_name: str) -> None: + """ + Drop the snapshot table for this node so each test starts from a clean state. + """ + rel = relation_for(node_name) # e.g. "users_snapshot" + qrel = executor._qualified(rel) # schema-qualified name + + with executor.engine.begin() as conn: + conn.execute(text(f"DROP TABLE IF EXISTS {qrel} CASCADE")) + + +def _make_pg_executor() -> PostgresExecutor: + dsn = os.environ.get("FF_PG_DSN") + schema = os.environ.get("FF_PG_SCHEMA", "public") + if not dsn: + pytest.skip("FF_PG_DSN not set; skipping Postgres snapshot tests") + return PostgresExecutor(dsn=dsn, schema=schema) + + +def _read_pg(ex: PostgresExecutor, relation: str) -> pd.DataFrame: + qualified = ex._qualified(relation) + with ex.engine.begin() as conn: + ex._set_search_path(conn) + return pd.read_sql_query(text(f"select * from {qualified} order by 1, 2"), conn) + + +@pytest.mark.postgres +@pytest.mark.integration +def test_postgres_snapshot_timestamp_first_and_second_run(jinja_env): + ex = _make_pg_executor() + node = make_timestamp_snapshot_node() + _reset_snapshot_table(executor=ex, node_name=node.name) + + scenario_timestamp_first_and_second_run( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_pg, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + ) + + +@pytest.mark.postgres +@pytest.mark.integration +def test_postgres_snapshot_prune_keep_last(jinja_env): + ex = _make_pg_executor() + node = make_timestamp_snapshot_node() + _reset_snapshot_table(executor=ex, node_name=node.name) + + scenario_snapshot_prune_keep_last( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_pg, + sql_first=SQL_TS_FIRST, + sql_second=SQL_TS_SECOND, + unique_key=["id"], + ) + + +@pytest.mark.postgres +@pytest.mark.integration +def test_postgres_snapshot_check_strategy(jinja_env): + ex = _make_pg_executor() + node = make_check_snapshot_node() + _reset_snapshot_table(executor=ex, node_name=node.name) + + scenario_check_strategy_detects_changes( + executor=ex, + node=node, + jinja_env=jinja_env, + read_fn=_read_pg, + sql_first=SQL_CHECK_FIRST, + sql_second=SQL_CHECK_SECOND, + ) diff --git a/tests/integration/executors/snowflake_snowpark/test_snapshots_snowflake_snowpark_integration.py b/tests/integration/executors/snowflake_snowpark/test_snapshots_snowflake_snowpark_integration.py new file mode 100644 index 0000000..4d8c57f --- /dev/null +++ b/tests/integration/executors/snowflake_snowpark/test_snapshots_snowflake_snowpark_integration.py @@ -0,0 +1,116 @@ +# tests/integration/executors/snowflake_snowpark/test_snapshots_snowflake_snowpark_integration.py +from __future__ import annotations + +from typing import Any + +import pytest +from jinja2 import Environment +from tests.common.snapshot_helpers import ( + make_check_snapshot_node, + make_timestamp_snapshot_node, + patch_render_sql, +) + +from fastflowtransform.executors.base import BaseExecutor + +SQL_TS_FIRST = """ +select 1 as id, + 'a'::string as value, + to_timestamp_ntz('2024-01-01 00:00:00') as updated_at +""" + +SQL_TS_SECOND = """ +select 1 as id, + 'b'::string as value, + to_timestamp_ntz('2024-02-01 00:00:00') as updated_at +""" + +SQL_CHECK_FIRST = """ +select 1 as id, + 'alpha'::string as value, + to_timestamp_ntz('2024-01-01 00:00:00') as updated_at +""" + +SQL_CHECK_SECOND = """ +select 1 as id, + 'beta'::string as value, + to_timestamp_ntz('2024-01-01 00:00:00') as updated_at +""" + + +def _all_sql(executor: Any) -> str: + """ + Concatenate all SQL statements issued by the fake Snowflake session. + """ + sess = getattr(executor, "session", None) + calls = getattr(sess, "sql_calls", []) + return "\n".join(calls) + + +@pytest.mark.snowflake_snowpark +@pytest.mark.integration +def test_snowflake_timestamp_snapshot_emits_create_table( + snowflake_executor_fake, jinja_env: Environment +): + ex = snowflake_executor_fake + node = make_timestamp_snapshot_node() + + patch_render_sql(ex, SQL_TS_FIRST) + ex.run_snapshot_sql(node, jinja_env) + + sql = _all_sql(ex).upper() + assert "CREATE OR REPLACE TABLE" in sql + assert BaseExecutor.SNAPSHOT_VALID_FROM_COL.upper() in sql + assert BaseExecutor.SNAPSHOT_VALID_TO_COL.upper() in sql + assert BaseExecutor.SNAPSHOT_IS_CURRENT_COL.upper() in sql + assert BaseExecutor.SNAPSHOT_UPDATED_AT_COL.upper() in sql + # timestamp strategy should not rely on a hash column + # but we allow the implementation to include it if desired + + +@pytest.mark.snowflake_snowpark +@pytest.mark.integration +def test_snowflake_check_snapshot_emits_hash_column( + snowflake_executor_fake, jinja_env: Environment +): + ex = snowflake_executor_fake + node = make_check_snapshot_node() + + patch_render_sql(ex, SQL_CHECK_FIRST) + ex.run_snapshot_sql(node, jinja_env) + + sql = _all_sql(ex).upper() + # first run should create the table with a hash column in the projection + assert "CREATE OR REPLACE TABLE" in sql + assert BaseExecutor.SNAPSHOT_HASH_COL.upper() in sql + + +@pytest.mark.snowflake_snowpark +@pytest.mark.integration +def test_snowflake_snapshot_prune_emits_row_number_window( + snowflake_executor_fake, jinja_env: Environment +): + """ + snapshot_prune should emit a query with ROW_NUMBER() OVER / PARTITION BY / + ORDER BY over the business key(s). We don't emulate deletions; we only + assert that the right *style* of SQL is generated and that it doesn't crash. + """ + ex = snowflake_executor_fake + node = make_timestamp_snapshot_node() + + # First/second run to "logically" create the snapshot table; in the fake + # session this just issues SQL, which is fine for this test. + patch_render_sql(ex, SQL_TS_FIRST) + ex.run_snapshot_sql(node, jinja_env) + + patch_render_sql(ex, SQL_TS_SECOND) + ex.run_snapshot_sql(node, jinja_env) + + # Now call prune; we only care that it emits the right window pattern. + ex.snapshot_prune("users_snapshot", unique_key=["id"], keep_last=1, dry_run=False) + + sql = _all_sql(ex).upper() + assert "ROW_NUMBER()" in sql + assert "OVER" in sql + assert "PARTITION BY" in sql + assert "ORDER BY" in sql diff --git a/tests/unit/executors/test_snowflake_snowpark_exec.py b/tests/unit/executors/test_snowflake_snowpark_exec.py index 34584d2..d68994b 100644 --- a/tests/unit/executors/test_snowflake_snowpark_exec.py +++ b/tests/unit/executors/test_snowflake_snowpark_exec.py @@ -176,7 +176,7 @@ def sf_exec(monkeypatch): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_init_sets_db_schema_and_con(sf_exec): assert sf_exec.database == "DB1" assert sf_exec.schema == "SC1" @@ -185,14 +185,14 @@ def test_init_sets_db_schema_and_con(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_q_and_qualified(sf_exec): assert sf_exec._q("x") == '"x"' assert sf_exec._qualified("TBL") == "DB1.SC1.TBL" @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_read_relation_calls_session_table(sf_exec): node = Node(name="n", kind="sql", path=Path(".")) df = sf_exec._read_relation("MY_TBL", node, deps=[]) @@ -202,7 +202,7 @@ def test_read_relation_calls_session_table(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_materialize_relation_happy(sf_exec, monkeypatch): called: dict[str, str] = {} @@ -244,7 +244,7 @@ def _fake_to_df(*cols: str): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_materialize_relation_raises_on_non_frame(sf_exec): node = Node(name="m", kind="python", path=Path(".")) with pytest.raises(TypeError): @@ -252,7 +252,7 @@ def test_materialize_relation_raises_on_non_frame(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_create_view_over_table_issues_sql(sf_exec): node = Node(name="x", kind="sql", path=Path(".")) sf_exec._create_view_over_table("V_USERS", "USERS", node) @@ -263,7 +263,7 @@ def test_create_view_over_table_issues_sql(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_validate_required_single_df_ok(sf_exec): SNDF = sf_mod.SNDF df = SNDF(sf_exec.session) # type: ignore[call-arg] @@ -277,7 +277,7 @@ def test_validate_required_single_df_ok(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_validate_required_single_df_missing(sf_exec): SNDF = sf_mod.SNDF df = SNDF(sf_exec.session) # type: ignore[call-arg] @@ -297,7 +297,7 @@ def test_validate_required_single_df_missing(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_validate_required_multi_input_ok(sf_exec): df1 = FakeSnowparkDataFrame(sf_exec.session, cols=["ID", "NAME"]) df2 = FakeSnowparkDataFrame(sf_exec.session, cols=["USER_ID", "ORDER_ID"]) @@ -309,7 +309,7 @@ def test_validate_required_multi_input_ok(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_validate_required_multi_input_missing_key(sf_exec): df1 = FakeSnowparkDataFrame(sf_exec.session, cols=["ID", "NAME"]) with pytest.raises(ValueError) as exc: @@ -322,7 +322,7 @@ def test_validate_required_multi_input_missing_key(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_validate_required_is_case_insensitive(sf_exec): SNDF = sf_mod.SNDF df = SNDF(sf_exec.session) # type: ignore[call-arg] @@ -338,14 +338,14 @@ def test_validate_required_is_case_insensitive(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_columns_of(sf_exec): df = FakeSnowparkDataFrame(sf_exec.session, cols=["A", "B"]) assert sf_exec._columns_of(df) == ["A", "B"] @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_is_frame(sf_exec): df = sf.DataFrame(sf_exec.session) df.schema = SimpleNamespace(names=["ID"]) # type: ignore[attr-defined] @@ -355,20 +355,20 @@ def test_is_frame(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_frame_name(sf_exec): assert sf_exec._frame_name() == "Snowpark" @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_format_relation_for_ref(sf_exec): r = sf_exec._format_relation_for_ref("my_model") assert r == "DB1.SC1.my_model" @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_format_source_reference_happy(sf_exec): cfg = {"identifier": "SRC_TBL", "database": "DBX", "schema": "RAW"} ref = sf_exec._format_source_reference(cfg, "src", "tbl") @@ -376,7 +376,7 @@ def test_format_source_reference_happy(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_format_source_reference_raises_on_location(sf_exec): cfg = {"identifier": "X", "location": "s3://foo"} with pytest.raises(NotImplementedError): @@ -384,14 +384,14 @@ def test_format_source_reference_raises_on_location(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_format_source_reference_raises_on_missing_identifier(sf_exec): with pytest.raises(KeyError): sf_exec._format_source_reference({}, "src", "tbl") @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_create_or_replace_view_calls_session_sql(sf_exec): node = Node(name="x", kind="sql", path=Path(".")) sf_exec._create_or_replace_view('"DB1"."SC1"."V1"', "SELECT 1", node) @@ -399,7 +399,7 @@ def test_create_or_replace_view_calls_session_sql(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_create_or_replace_table_calls_session_sql(sf_exec): node = Node(name="x", kind="sql", path=Path(".")) sf_exec._create_or_replace_table('"DB1"."SC1"."T1"', "SELECT 1", node) @@ -407,7 +407,7 @@ def test_create_or_replace_table_calls_session_sql(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_create_or_replace_view_from_table_calls_session_sql(sf_exec): node = Node(name="x", kind="sql", path=Path(".")) sf_exec._create_or_replace_view_from_table("V1", "T1", node) @@ -418,7 +418,7 @@ def test_create_or_replace_view_from_table_calls_session_sql(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_on_node_built_calls_meta(monkeypatch, sf_exec): called = {"ensure": 0, "upsert": 0} @@ -438,7 +438,7 @@ def fake_upsert(ex, name, rel, fp, eng): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_on_node_built_raises_on_meta_error(monkeypatch, sf_exec): def bad_upsert(ex, name, rel, fp, eng): raise RuntimeError("meta boom") @@ -452,7 +452,7 @@ def bad_upsert(ex, name, rel, fp, eng): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_exists_relation_true(sf_exec, monkeypatch): # our fake session already returns one row for information_schema.tables ok = sf_exec.exists_relation("SOME_TBL") @@ -460,7 +460,7 @@ def test_exists_relation_true(sf_exec, monkeypatch): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_exists_relation_false_on_error(sf_exec, monkeypatch): def boom(sql: str): raise RuntimeError("bad") @@ -471,7 +471,7 @@ def boom(sql: str): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_create_table_as_strips_semicolon(sf_exec): sf_exec.session.sql_calls.clear() sf_exec.create_table_as("DST", "SELECT 1;") @@ -482,7 +482,7 @@ def test_create_table_as_strips_semicolon(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_incremental_insert_strips_semicolon(sf_exec): sf_exec.session.sql_calls.clear() sf_exec.incremental_insert("DST", "SELECT 1;") @@ -492,7 +492,7 @@ def test_incremental_insert_strips_semicolon(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_incremental_merge_builds_two_statements(sf_exec): sf_exec.session.sql_calls.clear() @@ -512,7 +512,7 @@ def test_incremental_merge_builds_two_statements(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_alter_table_sync_schema_adds_missing(sf_exec): sf_exec.session.sql_calls.clear() sf_exec.alter_table_sync_schema("EXISTING", "SELECT 1 AS ID, 2 AS NEW_COL") @@ -521,7 +521,7 @@ def test_alter_table_sync_schema_adds_missing(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_alter_table_sync_schema_noop_on_unknown_mode(sf_exec): sf_exec.session.sql_calls.clear() sf_exec.alter_table_sync_schema("EXISTING", "SELECT 1", mode="replace_all") # unknown @@ -530,7 +530,7 @@ def test_alter_table_sync_schema_noop_on_unknown_mode(sf_exec): @pytest.mark.unit -@pytest.mark.snowflake +@pytest.mark.snowflake_snowpark def test_sfcursorshim_execute_returns_rows(sf_exec): class FakeRow: """Mimic Snowpark Row: has attributes *and* asDict().""" diff --git a/uv.lock b/uv.lock index 63bdea9..79b1fef 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.6.2" +version = "0.6.4" source = { editable = "." } dependencies = [ { name = "duckdb" }, From 076b5748f7017660666b1a0bd5e05e08a0d57001 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Fri, 21 Nov 2025 12:56:07 +0100 Subject: [PATCH 2/3] Unittest fixes --- tests/common/fixtures.py | 2 +- .../executors/test_snowflake_snowpark_exec.py | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/common/fixtures.py b/tests/common/fixtures.py index 1f29b4e..ccfaef7 100644 --- a/tests/common/fixtures.py +++ b/tests/common/fixtures.py @@ -444,7 +444,7 @@ def snowflake_executor_fake() -> Any: session = FakeSnowflakeSession() ex.session = session if _SFCursorShim is not None: - ex.con = _SFCursorShim(session) + ex.con = _SFCursorShim(session) # type: ignore[arg-type] else: # Cheap fallback if for some reason the shim isn't available ex.con = types.SimpleNamespace(execute=lambda sql, params=None: None) diff --git a/tests/unit/executors/test_snowflake_snowpark_exec.py b/tests/unit/executors/test_snowflake_snowpark_exec.py index d68994b..e018c78 100644 --- a/tests/unit/executors/test_snowflake_snowpark_exec.py +++ b/tests/unit/executors/test_snowflake_snowpark_exec.py @@ -68,6 +68,10 @@ def toDF(self, *cols: str) -> FakeSnowparkDataFrame: return FakeSnowparkDataFrame(self._session, self._sql, list(cols)) +# Make sure the executor treats FakeSnowparkDataFrame as SNDF when optional deps are missing. +sf_mod.SNDF = FakeSnowparkDataFrame # type: ignore[attr-defined] + + class FakeSession: """ Minimal session mock: @@ -265,8 +269,8 @@ def test_create_view_over_table_issues_sql(sf_exec): @pytest.mark.unit @pytest.mark.snowflake_snowpark def test_validate_required_single_df_ok(sf_exec): - SNDF = sf_mod.SNDF - df = SNDF(sf_exec.session) # type: ignore[call-arg] + df = FakeSnowparkDataFrame(sf_exec.session, cols=["ID", "NAME", "AGE"]) + df.schema = SimpleNamespace(names=["ID", "NAME", "AGE"]) # type: ignore[attr-defined] sf_exec._validate_required( @@ -279,8 +283,8 @@ def test_validate_required_single_df_ok(sf_exec): @pytest.mark.unit @pytest.mark.snowflake_snowpark def test_validate_required_single_df_missing(sf_exec): - SNDF = sf_mod.SNDF - df = SNDF(sf_exec.session) # type: ignore[call-arg] + df = FakeSnowparkDataFrame(sf_exec.session, cols=["ID", "NAME", "AGE"]) + df.schema = SimpleNamespace(names=["ID"]) # type: ignore[attr-defined] with pytest.raises(ValueError) as exc: @@ -324,8 +328,8 @@ def test_validate_required_multi_input_missing_key(sf_exec): @pytest.mark.unit @pytest.mark.snowflake_snowpark def test_validate_required_is_case_insensitive(sf_exec): - SNDF = sf_mod.SNDF - df = SNDF(sf_exec.session) # type: ignore[call-arg] + df = FakeSnowparkDataFrame(sf_exec.session, cols=["ID", "NAME", "AGE"]) + # Snowflake-style upper-case physical columns df.schema = SimpleNamespace(names=["USER_ID", "EMAIL"]) # type: ignore[attr-defined] From 15e520757c8c3b9e8f9d02544f1360f9bd3e7b06 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Fri, 21 Nov 2025 12:58:54 +0100 Subject: [PATCH 3/3] Fixed ci.yml --- .github/workflows/ci.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1941c2a..2508222 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,9 +75,6 @@ jobs: - name: bigquery extra: bigquery_bf marker: "unit and bigquery" - - name: snowflake - extra: snowflake - marker: "unit and snowflake" steps: - name: Checkout