diff --git a/docs/Data_Quality_Tests.md b/docs/Data_Quality_Tests.md index 393ee5c..4f74cf8 100644 --- a/docs/Data_Quality_Tests.md +++ b/docs/Data_Quality_Tests.md @@ -13,6 +13,7 @@ The following values are currently supported for `type`: - `non_negative_sum` - `row_count_between` - `freshness` +- `relationships` - `reconcile_equal` - `reconcile_ratio_within` - `reconcile_diff_within` @@ -179,6 +180,31 @@ These checks operate on a single table (optionally filtered with `where:`). Unle This is straightforward for DuckDB/Postgres; other engines may need adaptations. +--- + +### `relationships` + +* **Purpose:** Validate referential integrity between a child table and a parent lookup table (foreign-key style). +* **Parameters:** + + * `column` *(str, optional)* — child key column; defaults to the schema-YAML column where the test is defined. + * `field` *(str, optional)* — explicit override for the child key column (same as `column` when omitted). + * `to` *(str, required)* — parent relation. Accepts plain table names or `ref('model_name')` (the same syntax as SQL models). + * `to_field` *(str, default `id`)* — parent column to match against. + * `where` *(str, optional)* — filter applied to the child table before validating. + * `to_where` *(str, optional)* — filter applied to the parent table. +* **Failure:** Reports how many orphaned keys exist and prints the anti-join SQL: + + ```sql + with child as (select user_id as k from fact_orders), + parent as (select id as k from dim_users) + select count(*) from child c + left join parent p on c.k = p.k + where p.k is null + ``` + + Using `ref('dim_users')` inside `to:` automatically resolves to the physical relation (and handles schema/database prefixes for you). Child rows whose key is `NULL` never match a parent row, so the anti-join counts them as orphans; set `where` (for example `user_id is not null`) when nullable keys are acceptable. + ## Cross-Table Reconciliations Reconciliation checks compare aggregates or keys across two relations. Their configuration accepts dictionaries describing the left/right side expressions or keys. 
The top-level `table`/`column` fields are used only for display and grouping; the actual queries are defined via the nested dictionaries. diff --git a/docs/YAML_Tests.md b/docs/YAML_Tests.md index a97f3a4..7eba47f 100644 --- a/docs/YAML_Tests.md +++ b/docs/YAML_Tests.md @@ -21,6 +21,12 @@ models: - accepted_values: values: ["a@example.com","b@example.com","c@gmail.com"] severity: warn + - name: user_id + tests: + - relationships: + to: ref('dim_users') + field: user_id + to_field: id ```` ### Severities @@ -51,4 +57,8 @@ Totals ✓ passed: 2 ✗ failed: 1 ! warnings: 1 -``` \ No newline at end of file +``` + +### Relationships (Foreign Keys) + +Use the `relationships` test to assert that every value in a child column exists in a parent table. When declared under a column, the column name becomes the default `field`. The `to` parameter may point to another model via `ref('model_name')`, giving you fully-qualified database identifiers that match the current target profile. Optional filters (`where`, `to_where`) let you scope the child or parent sides independently. diff --git a/examples/dq_demo/README.md b/examples/dq_demo/README.md index b271774..3fe99ca 100644 --- a/examples/dq_demo/README.md +++ b/examples/dq_demo/README.md @@ -25,3 +25,13 @@ From this directory: Artifacts: - Target metadata: `.fastflowtransform/target/{manifest.json,run_results.json,catalog.xml}` - DAG HTML: `site/dag/index.html` + +## Featured checks + +- Single-table tests (`not_null`, `unique`, `row_count_between`, `freshness`, …) live in `project.yml` and give quick feedback on staging tables. +- Cross-table reconciliations (`reconcile_*`) compare `orders`, `customers`, and the mart to keep aggregates in sync. +- `relationships` (tagged `fk`) enforces referential integrity between `orders.customer_id` and `customers.customer_id`. Run it in isolation with: + + ```sh + fft test . 
--env dev --select tag:fk + ``` diff --git a/examples/dq_demo/project.yml b/examples/dq_demo/project.yml index 8806dd4..63cef31 100644 --- a/examples/dq_demo/project.yml +++ b/examples/dq_demo/project.yml @@ -51,6 +51,13 @@ tests: max_rows: 100 tags: [example:dq_demo, batch] + - type: relationships + table: orders + column: customer_id + to: "ref('customers.ff')" + to_field: customer_id + tags: [example:dq_demo, fk] + # Large max_delay_minutes so the example typically passes; # adjust down in real projects to enforce freshness SLAs. - type: freshness diff --git a/pyproject.toml b/pyproject.toml index d7a6f62..5b41b8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.5" +version = "0.6.6" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/cli/test_cmd.py b/src/fastflowtransform/cli/test_cmd.py index 382d0ea..3261d22 100644 --- a/src/fastflowtransform/cli/test_cmd.py +++ b/src/fastflowtransform/cli/test_cmd.py @@ -2,6 +2,7 @@ from __future__ import annotations import os +import re import time from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass @@ -49,6 +50,21 @@ class DQResult: example_sql: str | None = None +_REF_CALL_RE = re.compile(r"^ref\(\s*(['\"])([^'\"]+)\1\s*\)$") + + +def _registry_env() -> Any | None: + env = getattr(REGISTRY, "env", None) + if env is not None: + return env + if hasattr(REGISTRY, "get_env"): + try: + return REGISTRY.get_env() + except Exception: + return None + return None + + def _is_snapshot_model(node: Any) -> bool: """ Return True if this node is a snapshot model (materialized='snapshot'). 
@@ -172,6 +188,55 @@ def _fmt_reconcile_side(side: Any, executor: Any) -> Any: return side_fmt +def _resolve_relationship_target_table(value: Any, executor: Any) -> tuple[str | None, str | None]: + """Return (display_value, table_for_exec) for relationships.to.""" + if value is None: + return None, None + if isinstance(value, str): + stripped = value.strip() + if not stripped: + return "", "" + m = _REF_CALL_RE.match(stripped) + if m: + env = _registry_env() + if env is None: + raise typer.BadParameter( + "ref('...') requires a loaded project/environment (registry env missing)." + ) + if not hasattr(executor, "_resolve_ref"): + raise typer.BadParameter("Current executor cannot resolve ref('...') in tests.") + ref_target = m.group(2) + try: + resolved = executor._resolve_ref(ref_target, env) + except Exception as exc: + raise typer.BadParameter(f"Failed to resolve ref('{ref_target}'): {exc}") from exc + return stripped, resolved + return stripped, _fmt_table(stripped, executor) + literal = str(value) + return literal, _fmt_table(literal, executor) + + +def _prepare_relationship_params( + params: dict[str, Any], executor: Any +) -> tuple[dict[str, Any], str | None]: + """Normalize params for relationships tests; returns new params + formatted parent label.""" + normalized = dict(params or {}) + target_display: str | None = None + if "to" in normalized: + display, resolved = _resolve_relationship_target_table(normalized.get("to"), executor) + if resolved: + normalized["_to_relation"] = resolved + if display or resolved: + target_display = display or resolved + normalized["_to_display"] = target_display + return normalized, target_display + + +def _relationships_display(child: str, parent: str | None) -> str: + parent_label = parent or "" + return f"{child} ⇒ {parent_label}" + + def _prepare_test_from_spec( t: TestSpec, executor: Any ) -> tuple[str, Any, Severity, dict[str, Any], Any, Any]: @@ -181,10 +246,12 @@ def _prepare_test_from_spec( kind = t.type col = t.column 
severity: Severity = t.severity - params: dict[str, Any] = t.params or {} + params: dict[str, Any] = dict(t.params or {}) - display_table = t.table table_for_exec = _fmt_table(t.table, executor) + if not isinstance(table_for_exec, str) or not table_for_exec: + raise typer.BadParameter("Missing or invalid 'table' in test config") + display_table = table_for_exec if kind.startswith("reconcile_"): params = dict(params) # copy so we don't mutate original @@ -192,6 +259,9 @@ def _prepare_test_from_spec( side = params.get(key) if isinstance(side, dict): params[key] = _fmt_reconcile_side(side, executor) + elif kind == "relationships": + params, parent_display = _prepare_relationship_params(params, executor) + display_table = _relationships_display(table_for_exec, parent_display) return kind, col, severity, params, display_table, table_for_exec @@ -236,6 +306,12 @@ def _prepare_test_from_mapping( side = params.get(key) if isinstance(side, dict): params[key] = _fmt_reconcile_side(side, executor) + elif kind == "relationships": + table_for_exec = _fmt_table(t.get("table"), executor) + if not isinstance(table_for_exec, str) or not table_for_exec: + raise typer.BadParameter("Missing or invalid 'table' in test config") + params, parent_display = _prepare_relationship_params(params, executor) + display_table = _relationships_display(table_for_exec, parent_display) else: table_for_exec = _fmt_table(t.get("table"), executor) if not isinstance(table_for_exec, str) or not table_for_exec: @@ -357,17 +433,22 @@ def _format_params_for_summary(kind: str, params: dict[str, Any]) -> str: """Format a short, readable parameter snippet for the summary line.""" if not params: return "" + hidden_keys = {"type", "table", "severity", "tags", "name"} + + def _skip(key: str) -> bool: + return key in hidden_keys or key.startswith("_") + # Common keys first for stable display keys = [] - if "column" in params: + if "column" in params and not _skip("column"): keys.append("column") - if "values" in 
params: + if "values" in params and not _skip("values"): keys.append("values") - if "where" in params: + if "where" in params and not _skip("where"): keys.append("where") # Add remaining keys deterministically for k in sorted(params.keys()): - if k not in keys and k not in ("type", "table", "severity", "tags"): + if k not in keys and not _skip(k): keys.append(k) parts: list[str] = [] for k in keys: diff --git a/src/fastflowtransform/testing/base.py b/src/fastflowtransform/testing/base.py index 7916716..25f71a3 100644 --- a/src/fastflowtransform/testing/base.py +++ b/src/fastflowtransform/testing/base.py @@ -306,23 +306,20 @@ def _compute_delay_minutes( # Spark / Databricks: unix_timestamp over timestamps sql_spark = ( - "select " - f"(unix_timestamp(current_timestamp()) - unix_timestamp(max({ts_col}))) / 60.0 " + f"select (unix_timestamp(current_timestamp()) - unix_timestamp(max({ts_col}))) / 60.0 " f"as delay_min from {table}" ) # BigQuery: TIMESTAMP_DIFF returns integer minutes; keep float compatibility sql_bigquery = ( - "select cast(" - f"TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), max({ts_col}), MINUTE) as float64" - ") as delay_min " - f"from {table}" + f"select cast(TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), max({ts_col}), MINUTE) as float64) " + f"as delay_min from {table}" ) # Snowflake: DATEDIFF on minutes; cast to float to align with other engines sql_snowflake = ( - "select DATEDIFF('minute', max({ts_col}), CURRENT_TIMESTAMP())::float " - f"as delay_min from {table}" + f"select DATEDIFF('minute', max({ts_col}), " + f"CURRENT_TIMESTAMP())::float as delay_min from {table}" ) delay: float | None = None @@ -528,3 +525,38 @@ def reconcile_coverage( dprint("reconcile_coverage:", sql, "=>", missing) if missing and missing != 0: raise TestFailure(f"Coverage failed: {missing} source keys missing in target") + + +def relationships( + con: Any, + table: str, + field: str, + to_table: str, + to_field: str, + *, + where: str | None = None, + to_where: str | None = None, +) -> 
None: + """ + Assert that every value from child `table.field` exists in parent `to_table.to_field`. + Anti-join count of missing keys; NULL child keys never match, so they count as orphans. + """ + child_where = f" where {where}" if where else "" + parent_where = f" where {to_where}" if to_where else "" + sql = f""" + with child as (select {field} as k from {table}{child_where}), + parent as (select {to_field} as k from {to_table}{parent_where}) + select count(*) from child c + left join parent p on c.k = p.k + where p.k is null + """ + try: + missing = _scalar(con, sql) + except Exception as e: + raise _wrap_db_error("relationships", table, field, sql, e) from e + dprint("relationships:", sql, "=>", missing) + if missing and missing != 0: + raise TestFailure( + f"[relationships] {table}.{field} has {missing} orphan key(s) " + f"missing in {to_table}.{to_field}" + ) diff --git a/src/fastflowtransform/testing/registry.py b/src/fastflowtransform/testing/registry.py index fbb2653..950e4fc 100644 --- a/src/fastflowtransform/testing/registry.py +++ b/src/fastflowtransform/testing/registry.py @@ -261,6 +261,30 @@ def _example_coverage_sql( """.strip() +def _example_relationship_sql( + child_table: str, + child_field: str, + parent_table: str | None, + parent_field: str, + child_where: str | None, + parent_where: str | None, +) -> str: + """Render an example SQL snippet for relationships (foreign key) checks.""" + ct = child_table or "" + cf = child_field or "" + pt = parent_table or "" + pf = parent_field or "" + cw = f" where {child_where}" if child_where else "" + pw = f" where {parent_where}" if parent_where else "" + return f""" +with child as (select {cf} as k from {ct}{cw}), + parent as (select {pf} as k from {pt}{pw}) +select count(*) from child c +left join parent p on c.k = p.k +where p.k is null +""".strip() + + # --------------------------------------------------------------------------- # Reconcile tests # 
--------------------------------------------------------------------------- @@ -384,6 +408,40 @@ def run_reconcile_coverage( return False, str(e), example +def run_relationships( + con: Any, table: str, column: str | None, params: dict[str, Any] +) -> tuple[bool, str | None, str | None]: + """Run testing.relationships (FK-style anti join); returns (ok, message, example_sql).""" + field = params.get("field") or column + to_table = params.get("_to_relation") or params.get("to") + to_field = params.get("to_field") or "id" + where = params.get("where") + to_where = params.get("to_where") + + example = _example_relationship_sql( + table, field or "", to_table, to_field, where, to_where + ) + + if not field: + return False, "missing required parameter: field (or column)", example + if not to_table: + return False, "missing required parameter: to", example + + try: + testing.relationships( + con, + table=table, + field=field, + to_table=to_table, + to_field=to_field, + where=where, + to_where=to_where, + ) + return True, None, example + except testing.TestFailure as e: + return False, str(e), example + + # --------------------------------------------------------------------------- # Optional param-schema registry for custom tests # --------------------------------------------------------------------------- @@ -407,6 +465,7 @@ def run_reconcile_coverage( "non_negative_sum": run_non_negative_sum, "row_count_between": run_row_count_between, "freshness": run_freshness, + "relationships": run_relationships, # Reconcile tests "reconcile_equal": run_reconcile_equal, "reconcile_ratio_within": run_reconcile_ratio_within, diff --git a/tests/integration/testing/registry/test_dispatch_integration.py b/tests/integration/testing/registry/test_dispatch_integration.py index edf350f..6708687 100644 --- a/tests/integration/testing/registry/test_dispatch_integration.py +++ b/tests/integration/testing/registry/test_dispatch_integration.py @@ -18,3 +18,32 @@ def test_registry_not_null_and_unique_and_params_and_sql(): 
ok2, msg2, sql2 = TESTS["unique"](ex.con, "t", "id", {}) assert not ok2 and "duplicate" in (msg2 or "").lower() assert "group by 1 having count(*) > 1" in (sql2 or "").lower() + + +@pytest.mark.integration +@pytest.mark.duckdb +def test_registry_relationships_runner(): + ex = DuckExecutor(":memory:") + ex.con.execute("create table dim_users(id int)") + ex.con.execute("create table fact_users(user_id int)") + ex.con.execute("insert into dim_users values (1)") + ex.con.execute("insert into fact_users values (1),(2)") + + ok, msg, sql = TESTS["relationships"]( + ex.con, + "fact_users", + "user_id", + {"to": "dim_users"}, + ) + assert not ok + assert "orphan" in (msg or "").lower() + assert "left join" in (sql or "").lower() + + ex.con.execute("delete from fact_users where user_id = 2") + ok2, msg2, _ = TESTS["relationships"]( + ex.con, + "fact_users", + "user_id", + {"to": "dim_users"}, + ) + assert ok2 and msg2 is None diff --git a/tests/unit/test_testing_unit.py b/tests/unit/test_testing_unit.py index f4330e0..837ab19 100644 --- a/tests/unit/test_testing_unit.py +++ b/tests/unit/test_testing_unit.py @@ -20,6 +20,7 @@ reconcile_diff_within, reconcile_equal, reconcile_ratio_within, + relationships, row_count_between, sql_list, unique, @@ -587,6 +588,54 @@ def execute(self, sql): ) +@pytest.mark.unit +def test_relationships_ok(): + class FakeCon: + def execute(self, sql): + return _FakeResult([(0,)]) + + relationships( + FakeCon(), + table="fact_events", + field="user_id", + to_table="dim_users", + to_field="id", + ) + + +@pytest.mark.unit +def test_relationships_fails_on_orphans(): + class FakeCon: + def execute(self, sql): + return _FakeResult([(5,)]) + + with pytest.raises(TestFailure): + relationships( + FakeCon(), + table="fact_events", + field="user_id", + to_table="dim_users", + to_field="id", + ) + + +@pytest.mark.unit +def test_relationships_wraps_db_errors(): + class FakeCon: + def execute(self, sql): + raise RuntimeError("no such column") + + with 
pytest.raises(TestFailure) as exc: + relationships( + FakeCon(), + table="fact_events", + field="user_id", + to_table="dim_users", + to_field="id", + ) + assert "[relationships]" in str(exc.value) + + # --------------------------------------------------------------------------- # _fail # --------------------------------------------------------------------------- diff --git a/uv.lock b/uv.lock index f5d4896..c462d99 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.6.5" +version = "0.6.6" source = { editable = "." } dependencies = [ { name = "duckdb" },