Merged
26 changes: 26 additions & 0 deletions docs/Data_Quality_Tests.md
@@ -13,6 +13,7 @@ The following values are currently supported for `type`:
- `non_negative_sum`
- `row_count_between`
- `freshness`
- `relationships`
- `reconcile_equal`
- `reconcile_ratio_within`
- `reconcile_diff_within`
@@ -179,6 +180,31 @@ These checks operate on a single table (optionally filtered with `where:`). Unle

This is straightforward for DuckDB/Postgres; other engines may need adaptations.

---

### `relationships`

* **Purpose:** Validate referential integrity between a child table and a parent lookup table (foreign-key style).
* **Parameters:**

* `column` *(str, optional)* — child key column; defaults to the schema-YAML column where the test is defined.
* `field` *(str, optional)* — explicit override for the child key column (same as `column` when omitted).
* `to` *(str, required)* — parent relation. Accepts plain table names or `ref('model_name')` (the same syntax as SQL models).
* `to_field` *(str, default `id`)* — parent column to match against.
* `where` *(str, optional)* — filter applied to the child table before validating.
* `to_where` *(str, optional)* — filter applied to the parent table.
* **Failure:** Reports how many orphaned keys exist and prints the anti-join SQL:

```sql
with child as (select user_id as k from fact_orders),
parent as (select id as k from dim_users)
select count(*) from child c
left join parent p on c.k = p.k
where p.k is null
```
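The anti-join above can be tried outside the framework. The following is a minimal sketch, assuming an in-memory SQLite database and two made-up tables (`fact_orders`, `dim_users`); `count_orphans` is a hypothetical helper written for this illustration and is not part of fastflowtransform.

```python
import sqlite3


def count_orphans(con, table, field, to_table, to_field="id", where=None, to_where=None):
    """Count child keys without a matching parent key (hypothetical helper)."""
    child_where = f" where {where}" if where else ""
    parent_where = f" where {to_where}" if to_where else ""
    sql = f"""
    with child as (select {field} as k from {table}{child_where}),
    parent as (select {to_field} as k from {to_table}{parent_where})
    select count(*) from child c
    left join parent p on c.k = p.k
    where p.k is null
    """
    return con.execute(sql).fetchone()[0]


# Made-up sample data: order 12 points at a user that does not exist.
con = sqlite3.connect(":memory:")
con.executescript(
    """
    create table dim_users (id integer, active integer);
    create table fact_orders (order_id integer, user_id integer);
    insert into dim_users values (1, 1), (2, 0);
    insert into fact_orders values (10, 1), (11, 2), (12, 3);
    """
)

orphans = count_orphans(con, "fact_orders", "user_id", "dim_users")
# Restricting the parent side (the role of `to_where`) makes user 2 unavailable too.
orphans_active_only = count_orphans(
    con, "fact_orders", "user_id", "dim_users", to_where="active = 1"
)
print(orphans, orphans_active_only)
```

With this query shape, a child row whose key is NULL never matches a parent and is therefore counted as an orphan; a `where` filter such as `user_id is not null` excludes such rows when that is not the intent.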

Using `ref('dim_users')` inside `to:` automatically resolves to the physical relation (and handles schema/database prefixes for you).
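As a rough illustration of how a `to:` value could be told apart from a plain table name, the sketch below reuses the regular expression that this change adds as `_REF_CALL_RE`. Everything else is an assumption made for the example: `parse_ref`, `resolve_target`, and the `relations` mapping are hypothetical, and the real resolution goes through the executor rather than a dictionary.

```python
import re

# Same pattern as `_REF_CALL_RE` in this change: ref('name') or ref("name").
REF_CALL_RE = re.compile(r"^ref\(\s*(['\"])([^'\"]+)\1\s*\)$")


def parse_ref(value):
    """Return the model name inside ref('...'), or None for a plain table name."""
    m = REF_CALL_RE.match(value.strip())
    return m.group(2) if m else None


def resolve_target(value, relations):
    """Hypothetical resolver: `relations` maps model names to physical relations."""
    name = parse_ref(value)
    if name is None:
        return value.strip()  # simplification: plain names are used as-is
    return relations[name]


relations = {"dim_users": '"analytics"."dim_users"'}  # made-up mapping
print(resolve_target("ref('dim_users')", relations))
print(resolve_target("dim_users", relations))
```

The backreference `\1` requires the closing quote to match the opening one, so a value with mismatched quotes is treated as a plain table name rather than as a `ref` call.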

## Cross-Table Reconciliations

Reconciliation checks compare aggregates or keys across two relations. Their configuration accepts dictionaries describing the left/right side expressions or keys. The top-level `table`/`column` fields are used only for display and grouping; the actual queries are defined via the nested dictionaries.
12 changes: 11 additions & 1 deletion docs/YAML_Tests.md
@@ -21,6 +21,12 @@ models:
- accepted_values:
values: ["a@example.com","b@example.com","c@gmail.com"]
severity: warn
- name: user_id
tests:
- relationships:
to: ref('dim_users')
field: user_id
to_field: id
````

### Severities
@@ -51,4 +57,8 @@ Totals
✓ passed: 2
✗ failed: 1
! warnings: 1
```
```

### Relationships (Foreign Keys)

Use the `relationships` test to assert that every value in a child column exists in a parent table. When declared under a column, the column name becomes the default `field`. The `to` parameter may point to another model via `ref('model_name')`, giving you fully-qualified database identifiers that match the current target profile. Optional filters (`where`, `to_where`) let you scope the child or parent sides independently.
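The defaulting rules described above can be sketched as a small function. This is an illustrative approximation only: `normalize_relationship_params` is a hypothetical name, and the error messages are borrowed from the runner in this change rather than from a documented API.

```python
def normalize_relationship_params(column, params):
    """Apply the documented defaults for a relationships test (hypothetical helper)."""
    field = params.get("field") or column  # explicit `field` wins over the column
    to_table = params.get("to")
    if not field:
        raise ValueError("missing required parameter: field (or column)")
    if not to_table:
        raise ValueError("missing required parameter: to")
    return {
        "field": field,
        "to": to_table,
        "to_field": params.get("to_field") or "id",  # parent column defaults to `id`
        "where": params.get("where"),        # optional child-side filter
        "to_where": params.get("to_where"),  # optional parent-side filter
    }


# Declared under the `user_id` column with only `to` set.
resolved = normalize_relationship_params("user_id", {"to": "ref('dim_users')"})
print(resolved)
```

Under these assumptions, a test declared under `user_id` with only `to` set checks `user_id` against the parent's `id` column with no filters on either side.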
10 changes: 10 additions & 0 deletions examples/dq_demo/README.md
@@ -25,3 +25,13 @@ From this directory:
Artifacts:
- Target metadata: `.fastflowtransform/target/{manifest.json,run_results.json,catalog.json}`
- DAG HTML: `site/dag/index.html`

## Featured checks

- Single-table tests (`not_null`, `unique`, `row_count_between`, `freshness`, …) live in `project.yml` and give quick feedback on staging tables.
- Cross-table reconciliations (`reconcile_*`) compare `orders`, `customers`, and the mart to keep aggregates in sync.
- `relationships` (tagged `fk`) enforces referential integrity between `orders.customer_id` and `customers.customer_id`. Run it in isolation with:

```sh
fft test . --env dev --select tag:fk
```
7 changes: 7 additions & 0 deletions examples/dq_demo/project.yml
@@ -51,6 +51,13 @@ tests:
max_rows: 100
tags: [example:dq_demo, batch]

- type: relationships
table: orders
column: customer_id
to: "ref('customers.ff')"
to_field: customer_id
tags: [example:dq_demo, fk]

# Large max_delay_minutes so the example typically passes;
# adjust down in real projects to enforce freshness SLAs.
- type: freshness
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "fastflowtransform"
version = "0.6.5"
version = "0.6.6"
description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling"
readme = "README.md"
license = { text = "Apache-2.0" }
93 changes: 87 additions & 6 deletions src/fastflowtransform/cli/test_cmd.py
@@ -2,6 +2,7 @@
from __future__ import annotations

import os
import re
import time
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
@@ -49,6 +50,21 @@ class DQResult:
example_sql: str | None = None


_REF_CALL_RE = re.compile(r"^ref\(\s*(['\"])([^'\"]+)\1\s*\)$")


def _registry_env() -> Any | None:
    env = getattr(REGISTRY, "env", None)
    if env is not None:
        return env
    if hasattr(REGISTRY, "get_env"):
        try:
            return REGISTRY.get_env()
        except Exception:
            return None
    return None


def _is_snapshot_model(node: Any) -> bool:
"""
Return True if this node is a snapshot model (materialized='snapshot').
@@ -172,6 +188,55 @@ def _fmt_reconcile_side(side: Any, executor: Any) -> Any:
return side_fmt


def _resolve_relationship_target_table(value: Any, executor: Any) -> tuple[str | None, str | None]:
    """Return (display_value, table_for_exec) for relationships.to."""
    if value is None:
        return None, None
    if isinstance(value, str):
        stripped = value.strip()
        if not stripped:
            return "", ""
        m = _REF_CALL_RE.match(stripped)
        if m:
            env = _registry_env()
            if env is None:
                raise typer.BadParameter(
                    "ref('...') requires a loaded project/environment (registry env missing)."
                )
            if not hasattr(executor, "_resolve_ref"):
                raise typer.BadParameter("Current executor cannot resolve ref('...') in tests.")
            ref_target = m.group(2)
            try:
                resolved = executor._resolve_ref(ref_target, env)
            except Exception as exc:
                raise typer.BadParameter(f"Failed to resolve ref('{ref_target}'): {exc}") from exc
            return stripped, resolved
        return stripped, _fmt_table(stripped, executor)
    literal = str(value)
    return literal, _fmt_table(literal, executor)


def _prepare_relationship_params(
    params: dict[str, Any], executor: Any
) -> tuple[dict[str, Any], str | None]:
    """Normalize params for relationships tests; returns new params + formatted parent label."""
    normalized = dict(params or {})
    target_display: str | None = None
    if "to" in normalized:
        display, resolved = _resolve_relationship_target_table(normalized.get("to"), executor)
        if resolved:
            normalized["_to_relation"] = resolved
        if display or resolved:
            target_display = display or resolved
            normalized["_to_display"] = target_display
    return normalized, target_display


def _relationships_display(child: str, parent: str | None) -> str:
    parent_label = parent or "<target>"
    return f"{child} ⇒ {parent_label}"


def _prepare_test_from_spec(
t: TestSpec, executor: Any
) -> tuple[str, Any, Severity, dict[str, Any], Any, Any]:
@@ -181,17 +246,22 @@ def _prepare_test_from_spec(
kind = t.type
col = t.column
severity: Severity = t.severity
params: dict[str, Any] = t.params or {}
params: dict[str, Any] = dict(t.params or {})

display_table = t.table
table_for_exec = _fmt_table(t.table, executor)
if not isinstance(table_for_exec, str) or not table_for_exec:
raise typer.BadParameter("Missing or invalid 'table' in test config")
display_table = table_for_exec

if kind.startswith("reconcile_"):
params = dict(params) # copy so we don't mutate original
for key in ("left", "right", "source", "target"):
side = params.get(key)
if isinstance(side, dict):
params[key] = _fmt_reconcile_side(side, executor)
elif kind == "relationships":
params, parent_display = _prepare_relationship_params(params, executor)
display_table = _relationships_display(table_for_exec, parent_display)

return kind, col, severity, params, display_table, table_for_exec

@@ -236,6 +306,12 @@ def _prepare_test_from_mapping(
side = params.get(key)
if isinstance(side, dict):
params[key] = _fmt_reconcile_side(side, executor)
elif kind == "relationships":
table_for_exec = _fmt_table(t.get("table"), executor)
if not isinstance(table_for_exec, str) or not table_for_exec:
raise typer.BadParameter("Missing or invalid 'table' in test config")
params, parent_display = _prepare_relationship_params(params, executor)
display_table = _relationships_display(table_for_exec, parent_display)
else:
table_for_exec = _fmt_table(t.get("table"), executor)
if not isinstance(table_for_exec, str) or not table_for_exec:
@@ -357,17 +433,22 @@ def _format_params_for_summary(kind: str, params: dict[str, Any]) -> str:
"""Format a short, readable parameter snippet for the summary line."""
if not params:
return ""
hidden_keys = {"type", "table", "severity", "tags", "name"}

def _skip(key: str) -> bool:
return key in hidden_keys or key.startswith("_")

# Common keys first for stable display
keys = []
if "column" in params:
if "column" in params and not _skip("column"):
keys.append("column")
if "values" in params:
if "values" in params and not _skip("values"):
keys.append("values")
if "where" in params:
if "where" in params and not _skip("where"):
keys.append("where")
# Add remaining keys deterministically
for k in sorted(params.keys()):
if k not in keys and k not in ("type", "table", "severity", "tags"):
if k not in keys and not _skip(k):
keys.append(k)
parts: list[str] = []
for k in keys:
48 changes: 40 additions & 8 deletions src/fastflowtransform/testing/base.py
@@ -306,23 +306,20 @@ def _compute_delay_minutes(

# Spark / Databricks: unix_timestamp over timestamps
sql_spark = (
"select "
f"(unix_timestamp(current_timestamp()) - unix_timestamp(max({ts_col}))) / 60.0 "
f"select (unix_timestamp(current_timestamp()) - unix_timestamp(max({ts_col}))) / 60.0 "
f"as delay_min from {table}"
)

# BigQuery: TIMESTAMP_DIFF returns integer minutes; keep float compatibility
sql_bigquery = (
"select cast("
f"TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), max({ts_col}), MINUTE) as float64"
") as delay_min "
f"from {table}"
f"select cast(TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), max({ts_col}), MINUTE) as float64) "
f"as delay_min from {table}"
)

# Snowflake: DATEDIFF on minutes; cast to float to align with other engines
sql_snowflake = (
"select DATEDIFF('minute', max({ts_col}), CURRENT_TIMESTAMP())::float "
f"as delay_min from {table}"
f"select DATEDIFF('minute', max({ts_col}), "
f"CURRENT_TIMESTAMP())::float as delay_min from {table}"
)

delay: float | None = None
@@ -528,3 +525,38 @@ def reconcile_coverage(
dprint("reconcile_coverage:", sql, "=>", missing)
if missing and missing != 0:
raise TestFailure(f"Coverage failed: {missing} source keys missing in target")


def relationships(
    con: Any,
    table: str,
    field: str,
    to_table: str,
    to_field: str,
    *,
    where: str | None = None,
    to_where: str | None = None,
) -> None:
    """
    Assert that every value from child `table.field` exists in parent `to_table.to_field`.
    Implemented as an anti-join count; failures report the number of missing keys.
    """
    child_where = f" where {where}" if where else ""
    parent_where = f" where {to_where}" if to_where else ""
    sql = f"""
with child as (select {field} as k from {table}{child_where}),
parent as (select {to_field} as k from {to_table}{parent_where})
select count(*) from child c
left join parent p on c.k = p.k
where p.k is null
"""
    try:
        missing = _scalar(con, sql)
    except Exception as e:
        raise _wrap_db_error("relationships", table, field, sql, e) from e
    dprint("relationships:", sql, "=>", missing)
    if missing and missing != 0:
        raise TestFailure(
            f"[relationships] {table}.{field} has {missing} orphan key(s) "
            f"missing in {to_table}.{to_field}"
        )
59 changes: 59 additions & 0 deletions src/fastflowtransform/testing/registry.py
@@ -261,6 +261,30 @@ def _example_coverage_sql(
""".strip()


def _example_relationship_sql(
    child_table: str,
    child_field: str,
    parent_table: str | None,
    parent_field: str,
    child_where: str | None,
    parent_where: str | None,
) -> str:
    """Render an example SQL snippet for relationships (foreign key) checks."""
    ct = child_table or "<child_table>"
    cf = child_field or "<child_field>"
    pt = parent_table or "<parent_table>"
    pf = parent_field or "<parent_field>"
    cw = f" where {child_where}" if child_where else ""
    pw = f" where {parent_where}" if parent_where else ""
    return f"""
with child as (select {cf} as k from {ct}{cw}),
parent as (select {pf} as k from {pt}{pw})
select count(*) from child c
left join parent p on c.k = p.k
where p.k is null
""".strip()


# ---------------------------------------------------------------------------
# Reconcile tests
# ---------------------------------------------------------------------------
@@ -384,6 +408,40 @@ def run_reconcile_coverage(
return False, str(e), example


def run_relationships(
    con: Any, table: str, column: str | None, params: dict[str, Any]
) -> tuple[bool, str | None, str | None]:
    """Runner for testing.relationships (FK-style anti join)."""
    field = params.get("field") or column
    to_table = params.get("_to_relation") or params.get("to")
    to_field = params.get("to_field") or "id"
    where = params.get("where")
    to_where = params.get("to_where")

    example = _example_relationship_sql(
        table, field or "<field>", to_table, to_field, where, to_where
    )

    if not field:
        return False, "missing required parameter: field (or column)", example
    if not to_table:
        return False, "missing required parameter: to", example

    try:
        testing.relationships(
            con,
            table=table,
            field=field,
            to_table=to_table,
            to_field=to_field,
            where=where,
            to_where=to_where,
        )
        return True, None, example
    except testing.TestFailure as e:
        return False, str(e), example


# ---------------------------------------------------------------------------
# Optional param-schema registry for custom tests
# ---------------------------------------------------------------------------
@@ -407,6 +465,7 @@ def run_reconcile_coverage(
"non_negative_sum": run_non_negative_sum,
"row_count_between": run_row_count_between,
"freshness": run_freshness,
"relationships": run_relationships,
# Reconcile tests
"reconcile_equal": run_reconcile_equal,
"reconcile_ratio_within": run_reconcile_ratio_within,