Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions docs/Data_Quality_Tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ The following values are currently supported for `type`:
- `reconcile_diff_within`
- `reconcile_coverage`

In addition, you can register **custom tests** (Python or SQL) with any logical
name (e.g. `min_positive_share`, `no_future_orders`) and use that name in
`project.yml → tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python--sql).

## Usage Overview

```yaml
Expand Down Expand Up @@ -234,6 +238,160 @@ Reconciliation checks compare aggregates or keys across two relations. Their con
* `target_where` *(str, optional)* — filter applied to the target.
* **Failure:** Reports the number of missing keys.

## Custom DQ Tests (Python & SQL)

FastFlowTransform lets you plug in your own test logic and still reuse:

* `project.yml → tests:` configuration,
* the same `fft test` command,
* the standard summary output and exit codes.

Custom tests come in two flavours:

1. **Python-based tests** registered via `@dq_test(...)` in `tests/**/*.ff.py`
2. **SQL-based tests** defined in `tests/**/*.ff.sql` with a small `{{ config(...) }}` header.

Both kinds participate in the same Pydantic validation pipeline as built-in
tests: built-ins use the strict `ProjectTestConfig` union, while custom tests
are validated against a generated parameter model derived from their `config(...)`.

### Python-based custom tests

Create a file like `tests/dq/min_positive_share.ff.py`:

```python
from __future__ import annotations

from typing import Any

from fastflowtransform.decorators import dq_test
from fastflowtransform.testing import base as testing


@dq_test("min_positive_share")
def min_positive_share(
con: Any,
table: str,
column: str | None,
params: dict[str, Any],
) -> tuple[bool, str | None, str | None]:
"""
Custom DQ test: require that at least `min_share` of rows have column > 0.

Parameters (from project.yml → tests → params):
- min_share: float in [0,1], e.g. 0.75
- where: optional filter (string) to restrict the population
"""
if column is None:
example = f"select count(*) from {table} where <column> > 0"
return False, "min_positive_share requires a 'column' parameter", example

# For project.yml tests the user payload lives under params["params"]
cfg: dict[str, Any] = params.get("params") or params
min_share: float = cfg["min_share"]
where: str | None = cfg.get("where")

where_clause = f" where {where}" if where else ""

total_sql = f"select count(*) from {table}{where_clause}"
if where:
pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0"
else:
pos_sql = f"select count(*) from {table} where {column} > 0"

total = testing._scalar(con, total_sql)
positives = testing._scalar(con, pos_sql)

example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total"

if not total:
return False, f"min_positive_share: table {table} is empty", example_sql

share = float(positives or 0) / float(total)
if share < min_share:
msg = (
f"min_positive_share failed: positive share {share:.4f} "
f"< required {min_share:.4f} "
f"({positives} of {total} rows have {column} > 0)"
)
return False, msg, example_sql

return True, None, example_sql
```

The decorator `@dq_test("min_positive_share")` registers the function under that
logical name. You can then reference it from `project.yml`:

```yaml
tests:
- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [batch]
```

Notes:

* The function **must** return `(ok: bool, message: str | None, example_sql: str | None)`.
* The signature is the same as built-in runners: `(con, table, column, params)`.
* For project-level tests, the full YAML dict is passed as `params`; by
convention, custom tests read their own options from `params["params"]`.

### SQL-based custom tests

Create a file like `tests/dq/no_future_orders.ff.sql`:

```sql
{{ config(
type="no_future_orders",
params=["where"]
) }}

-- Custom DQ test: fail if any row has a timestamp in the future.
--
-- Conventions:
-- - {{ table }} : table name (e.g. "orders")
-- - {{ column }} : timestamp column (e.g. "order_ts")
-- - {{ where }} : optional filter, passed via params["where"]

select count(*) as failures
from {{ table }}
where {{ column }} > current_timestamp
{%- if where %} and ({{ where }}){%- endif %}
```

The `config(...)` header declares:

* `type`: logical name used in `project.yml → tests:`.
* `params`: list of allowed parameter keys for this test.

FFT turns this into a small Pydantic model and validates your YAML config
against it. Any unknown key under `params:` results in a clear error at
**config-load time**, before executing SQL.

Hook it up in `project.yml`:

```yaml
tests:
- type: no_future_orders
table: orders
column: order_ts
params:
where: "amount <> 0"
tags: [batch]
```

At runtime FFT:

1. discovers `tests/**/*.ff.sql`,
2. registers each file as a test of the given `type`,
3. validates `params:` using the `params=[...]` schema,
4. renders and executes the SQL as a “violation count” query
(`0` = pass, `> 0` = fail).

## Severity & Tags

* `severity: error` (default) makes failures stop the test run with exit code 1.
Expand Down
156 changes: 155 additions & 1 deletion docs/examples/DQ_Demo.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Data Quality Demo Project

The **Data Quality Demo** shows how to use **all built-in FFT data quality tests** on a small, understandable model:
The **Data Quality Demo** shows how to use **all built-in FFT data quality tests** plus **custom DQ tests (Python & SQL)** on a small, understandable model:

* Column checks:

Expand All @@ -18,6 +18,11 @@ The **Data Quality Demo** shows how to use **all built-in FFT data quality tests
* `reconcile_diff_within`
* `reconcile_coverage`

* Custom tests (demo):

* `min_positive_share` (Python-based)
* `no_future_orders` (SQL-based)

It uses a simple **customers / orders / mart** setup so you can see exactly what each test does and how it fails when something goes wrong.

---
Expand Down Expand Up @@ -66,6 +71,11 @@ examples/dq_demo/
orders.ff.sql
marts/
mart_orders_agg.ff.sql

tests/
dq/
min_positive_share.ff.py
no_future_orders.ff.sql
```

### Seeds
Expand Down Expand Up @@ -218,6 +228,23 @@ tests:
column: last_order_ts
max_delay_minutes: 100000000
tags: [example:dq_demo, batch]

# 7) Custom Python test: ensure at least a given share of positive amounts
- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [example:dq_demo, batch]

# 8) Custom SQL test: no future orders allowed
- type: no_future_orders
table: orders
column: order_ts
params:
where: "amount <> 0"
tags: [example:dq_demo, batch]
```

### Cross-table reconciliations
Expand Down Expand Up @@ -276,6 +303,133 @@ This set of tests touches **all available test types** and ties directly back to

---

## Custom DQ tests (Python & SQL)

The demo also shows how to define **custom data quality tests** that integrate with:

* the `project.yml → tests:` block,
* the `fft test` CLI,
* and the same summary output as built-in tests.

### Python-based test: `min_positive_share`

File: `examples/dq_demo/tests/dq/min_positive_share.ff.py`

```python
from __future__ import annotations

from typing import Any

from fastflowtransform.decorators import dq_test
from fastflowtransform.testing import base as testing


@dq_test("min_positive_share")
def min_positive_share(
con: Any,
table: str,
column: str | None,
params: dict[str, Any],
) -> tuple[bool, str | None, str | None]:
"""
Custom DQ test: require that at least `min_share` of rows have column > 0.

Parameters (from project.yml → tests → params):
- min_share: float in [0,1], e.g. 0.75
- where: optional filter (string) to restrict the population
"""
if column is None:
example = f"select count(*) from {table} where <column> > 0"
return False, "min_positive_share requires a 'column' parameter", example

# Params come from project.yml under `params:`
cfg: dict[str, Any] = params.get("params") or params # project.yml wrapper
min_share: float = cfg["min_share"]
where: str | None = cfg.get("where")

where_clause = f" where {where}" if where else ""

total_sql = f"select count(*) from {table}{where_clause}"
if where:
pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0"
else:
pos_sql = f"select count(*) from {table} where {column} > 0"

total = testing._scalar(con, total_sql)
positives = testing._scalar(con, pos_sql)

example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total"

if not total:
return False, f"min_positive_share: table {table} is empty", example_sql

share = float(positives or 0) / float(total)
if share < min_share:
msg = (
f"min_positive_share failed: positive share {share:.4f} "
f"< required {min_share:.4f} "
f"({positives} of {total} rows have {column} > 0)"
)
return False, msg, example_sql

return True, None, example_sql
````

This test is wired up from `project.yml` like this:

```yaml
- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [example:dq_demo, batch]
```

### SQL-based test: `no_future_orders`

File: `examples/dq_demo/tests/dq/no_future_orders.ff.sql`

```sql
{{ config(
type="no_future_orders",
params=["where"]
) }}

-- Custom DQ test: fail if any row has a timestamp in the future.
--
-- Conventions:
-- - {{ table }} : table name (e.g. "orders")
-- - {{ column }} : timestamp column (e.g. "order_ts")
-- - {{ where }} : optional filter, passed via params["where"]

select count(*) as failures
from {{ table }}
where {{ column }} > current_timestamp
{%- if where %} and ({{ where }}){%- endif %}
```

And the corresponding `project.yml` test:

```yaml
- type: no_future_orders
table: orders
column: order_ts
params:
where: "amount <> 0"
tags: [example:dq_demo, batch]
```

At runtime:

* The SQL file is discovered under `tests/**/*.ff.sql`.
* `{{ config(...) }}` tells FFT the logical `type` and allowed `params`.
* `fft test` validates your `params:` from `project.yml` against this schema and
then executes the rendered SQL as a “violation count” query (`0` = pass, `>0` = fail).

---

## Running the demo

Assuming you are in the repo root and using DuckDB as a starting point:
Expand Down
2 changes: 2 additions & 0 deletions examples/cache_demo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ BQ_FRAME ?= bigframes
PROJECT ?= .
UV ?= uv
DB ?= .local/cache_demo.duckdb
HTTP_TIMEOUT ?= 60

ifeq ($(ENGINE),duckdb)
PROFILE_ENV = dev_duckdb
Expand Down Expand Up @@ -39,6 +40,7 @@ ifeq ($(ENGINE),bigquery)
BASE_ENV := $(BASE_ENV) FF_ENGINE_VARIANT=$(BQ_FRAME)
endif
RUN_ENV = $(BASE_ENV)
RUN_ENV := $(RUN_ENV) FF_HTTP_TIMEOUT=$(HTTP_TIMEOUT)

SELECT_ALL = --select tag:example:cache_demo --select tag:$(ENGINE_TAG)

Expand Down
15 changes: 15 additions & 0 deletions examples/dq_demo/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,18 @@ tests:
target:
table: customers
key: "customer_id"

# --- Custom tests --------------------------------------------------
- type: no_future_orders
table: orders
column: order_ts
where: "order_ts is not null"
tags: [example:dq_demo, batch]

- type: min_positive_share
table: orders
column: amount
params:
min_share: 0.75
where: "amount <> 0"
tags: [example:dq_demo, batch]
Loading