Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ jobs:
- name: bigquery
extra: bigquery_bf
marker: "unit and bigquery"
- name: snowflake
extra: snowflake
marker: "unit and snowflake"

steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion Makefile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ utest-duckdb:

# Lint & format helpers
fmt:
$(UV) run ruff format src tests
$(UV) run ruff format

lint:
$(UV) run ruff check src tests --no-cache
Expand Down
2 changes: 1 addition & 1 deletion docs/Data_Quality_Tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The following values are currently supported for `type`:

In addition, you can register **custom tests** (Python or SQL) with any logical
name (e.g. `min_positive_share`, `no_future_orders`) and use that name in
`project.yml → tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python--sql).
`project.yml → tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python-sql).

## Usage Overview

Expand Down
266 changes: 80 additions & 186 deletions docs/examples/Incremental_Demo.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ incremental_demo/
.env
.env.dev_duckdb
.env.dev_postgres
.env.dev_databricks_parquet
.env.dev_databricks_delta
.env.dev_databricks_iceberg
.env.dev_bigquery_pandas
Expand Down Expand Up @@ -383,151 +384,69 @@ The executor uses the `meta.incremental` / `meta.unique_key` / `meta.updated_at`

## Delta & Iceberg variants (Databricks / Spark)

In addition to the “regular” incremental models, the demo also includes **Delta Lake** and **Iceberg** variants
that shows how to:
The Databricks/Spark executor can materialize the same incremental models as **managed parquet**,
**Delta Lake**, or **Iceberg** tables. Instead of wiring storage paths in `project.yml`, each table
format gets its own Databricks profile with a dedicated database and warehouse.

- route a model to **Delta tables** via `project.yml`
- reuse the same incremental pattern, but with a **Delta-backed** table on Databricks/Spark
- keep Parquet and Delta models side-by-side in the same project
### Managed storage per format

This is optional and only relevant for the `databricks_spark` engine.

---

### Storage configuration for the Delta / Iceberg models

In `project.yml`, the Delta variant gets its own storage entry, separate from the Parquet fact table:
`profiles.yml` ships with three out-of-the-box profiles:

```yaml
models:
storage:
# Existing Parquet fact table
fct_events_sql_inline:
path: ".local/spark/fct_events_sql_inline"
format: parquet

# 🔹 Delta-based fact table (Spark/Databricks only)
fct_events_sql_inline_delta:
path: ".local/spark_delta/fct_events_sql_inline"
format: delta

# ❄️ Iceberg-based fact table (Spark 4 / Databricks only)
fct_events_sql_inline_iceberg:
# Points into the Iceberg warehouse; must match your Iceberg catalog config
path: ".local/iceberg_warehouse/incremental_demo/fct_events_sql_inline"
format: iceberg
````

Notes:

* The key `fct_events_sql_inline_delta` must match the **model name**.
* `format: delta` tells the Databricks/Spark executor to create `USING DELTA LOCATION ...`.
* The path is different from the Parquet path so artifacts don’t clash.

---

### Delta fact model

The Delta fact model is a close sibling of `fct_events_sql_inline.ff.sql`, but:

* is tagged only for the Databricks/Spark engine
* is configured for incremental **merge** with a `unique_key` + `updated_at` column

Example (conceptual) model:

```sql
-- models/common/fct_events_sql_inline_delta.ff.sql

{{ config(
materialized='table',
tags=[
'example:incremental_demo',
'kind:incremental',
'engine:databricks_spark',
],
meta={
'incremental': True,
'unique_key': ['event_id'],
'updated_at': 'updated_at',
'delta': {
'sql': "
with base as (
select event_id, updated_at, value
from {{ ref('events_base.ff') }}
)
select
event_id,
updated_at,
value
from base
where updated_at > (
select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
from {{ this }}
)
"
},
},
) }}

-- canonical full-select (used for docs / full-refresh)
select
event_id,
updated_at,
value
from {{ ref('events_base.ff') }};
dev_databricks_parquet:
engine: databricks_spark
databricks_spark:
database: incremental_demo_parquet
warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_parquet"

dev_databricks_delta:
engine: databricks_spark
databricks_spark:
database: incremental_demo_delta
warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_delta"

dev_databricks_iceberg:
engine: databricks_spark
databricks_spark:
database: incremental_demo_iceberg
warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg"
table_format: iceberg
extra_conf:
spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type: hadoop
spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse"
```

What happens:

* On the **first run**, the engine sees no existing table and does a full materialization
(a Delta table at `.local/spark_delta/fct_events_sql_inline`).
* On **subsequent runs**, the executor uses the `delta.sql` query as the **incremental delta** and:

* attempts a `MERGE INTO` for Delta tables, or
* falls back to a full-refresh strategy if MERGE is not supported.
Because every format has its own schema/warehouse, you can flip `FF_DBR_TABLE_FORMAT` or swap
profiles without worrying about leftover Parquet metadata interfering with a Delta run (and vice
versa).

---

### Running the Delta variant
### Running the Spark formats

Once your Databricks/Spark profile is configured (e.g. `dev_databricks` in `profiles.yml` and `.env.dev_databricks`),
you can run the Delta model like any other:
Pick the `.env.dev_databricks_*` file that matches your target format, export it, and run the demo:

```bash
# From the repo root
cd examples/incremental_demo

# Seed
FFT_ACTIVE_ENV=dev_databricks fft seed .

# Run only the Delta variant
FFT_ACTIVE_ENV=dev_databricks fft run . \
--select fct_events_sql_inline_delta.ff \
--select tag:engine:databricks_spark

# Or include it in the general incremental demo selection
FFT_ACTIVE_ENV=dev_databricks fft run . \
--select tag:example:incremental_demo \
--select tag:engine:databricks_spark
```

Optionally, you can add a small `not_null` test to `project.yml` to verify the Delta model:

```yaml
tests:
- type: not_null
table: fct_events_sql_inline_delta
column: event_id
tags: [batch, delta]
```
# Parquet tables (default)
set -a; source .env.dev_databricks_parquet; set +a
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark

Then run:
# Delta Lake tables
set -a; source .env.dev_databricks_delta; set +a
FFT_ACTIVE_ENV=dev_databricks_delta FF_DBR_TABLE_FORMAT=delta fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark

```bash
FFT_ACTIVE_ENV=dev_databricks fft test . --select tag:delta
# Iceberg tables
set -a; source .env.dev_databricks_iceberg; set +a
FFT_ACTIVE_ENV=dev_databricks_iceberg FF_DBR_TABLE_FORMAT=iceberg fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark
```

to validate the Delta-backed incremental table specifically.
`FF_DBR_TABLE_FORMAT` is optional when the profile already sets `databricks_spark.table_format`, but
passing it explicitly makes the intent clear in logs and CI.

---

Expand Down Expand Up @@ -605,101 +524,76 @@ Make sure `.env.dev_snowflake` sets the required `FF_SF_*` variables and install
### Databricks Spark

```bash
FFT_ACTIVE_ENV=dev_databricks fft seed .
FFT_ACTIVE_ENV=dev_databricks fft run . \
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed .
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark
FFT_ACTIVE_ENV=dev_databricks fft test . \
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \
--select tag:example:incremental_demo
````

### Databricks Spark (parquet vs Delta)

You can run the incremental demo on Databricks/Spark against either **parquet** or **Delta** tables.

FFT reads the desired table format from the `FF_DBR_TABLE_FORMAT` environment variable, which overrides
`databricks_spark.table_format` from `profiles.yml`.

When `FF_DBR_TABLE_FORMAT=delta`, the Databricks/Spark executor automatically wires Delta Lake into the
SparkSession (downloads the Maven artifact via `delta-spark`, adds
`spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension`, and sets
`spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog` unless you already
overrode those settings). No extra `spark-submit --conf` flags are needed—just ensure the
`delta-spark >= 4.0` Python package is installed.

From the repo root:

```bash
cd examples/incremental_demo
````
Each format has its own profile/database (`dev_databricks_parquet` → `incremental_demo_parquet`,
`dev_databricks_delta` → `incremental_demo_delta`), so cleanup and reruns never reuse stale metadata.
`FF_DBR_TABLE_FORMAT` still overrides `databricks_spark.table_format` if you want to switch formats
without changing profiles.

Run with **parquet** tables (default):

```bash
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft seed .
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft run . \
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft seed .
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks fft test . \
FF_DBR_TABLE_FORMAT=parquet FFT_ACTIVE_ENV=dev_databricks_parquet fft test . \
--select tag:example:incremental_demo
```

Run with **Delta** tables:

```bash
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft seed .
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft run . \
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft seed .
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks fft test . \
FF_DBR_TABLE_FORMAT=delta FFT_ACTIVE_ENV=dev_databricks_delta fft test . \
--select tag:example:incremental_demo
```

This way you can switch between parquet and Delta just by changing the `FF_DBR_TABLE_FORMAT`
environment variable, without touching the models or project.yml.

Adjust environment names to match your `profiles.yml`.
`delta-spark >= 4.0` must be installed locally so the executor can wire Delta extensions into
SparkSession automatically (the profile/CLI already sets the required Spark configs).

### Databricks Spark (Iceberg / Spark 4+)

If you are on Spark 4 / Databricks with Iceberg support, you can also run the incremental demo
purely against Iceberg tables using a dedicated profile (for example `dev_databricks_iceberg`).

That profile typically:

* uses `engine: databricks_spark`
* sets `databricks_spark.table_format: iceberg`
* configures an Iceberg catalog via `extra_conf`, for example:

models:
storage:
# Example warehouse location, adjust as needed
fct_events_sql_inline_iceberg:
path: ".local/iceberg_warehouse/incremental_demo/fct_events_sql_inline"
format: iceberg

and in the profile (profiles.yml) something like:

dev_databricks_iceberg:
engine: databricks_spark
databricks_spark:
master: "local[*]"
app_name: "incremental_demo"
warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse"
extra_conf:
spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type: hadoop
spark.sql.catalog.iceberg.warehouse: "file:///{{ project_dir() }}/.local/iceberg_warehouse"
purely against Iceberg tables using `dev_databricks_iceberg`. That profile:

```yaml
dev_databricks_iceberg:
engine: databricks_spark
databricks_spark:
database: incremental_demo_iceberg
warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse_iceberg"
table_format: iceberg
extra_conf:
spark.jars.packages: org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0
spark.sql.catalog.iceberg: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type: hadoop
spark.sql.catalog.iceberg.warehouse: "file://{{ project_dir() }}/.local/iceberg_warehouse"
```

From the repo root:

cd examples/incremental_demo

Run seeds and models against Iceberg:

FFT_ACTIVE_ENV=dev_databricks_iceberg fft seed .
FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft seed .

FFT_ACTIVE_ENV=dev_databricks_iceberg fft run . \
FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft run . \
--select tag:example:incremental_demo --select tag:engine:databricks_spark

FFT_ACTIVE_ENV=dev_databricks_iceberg fft test . \
FF_DBR_TABLE_FORMAT=iceberg FFT_ACTIVE_ENV=dev_databricks_iceberg fft test . \
--select tag:example:incremental_demo

Under this profile, all `ref()` / `source()` calls in Spark SQL and Python models are resolved
Expand Down
Loading