From 7721987d1e3321b3322fb9365a808c5e4dc0e14f Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Tue, 18 Nov 2025 16:09:20 +0100 Subject: [PATCH 1/2] Added custom data quality tests in SQL and python --- docs/Data_Quality_Tests.md | 158 ++++++++++++ docs/examples/DQ_Demo.md | 156 +++++++++++- examples/dq_demo/project.yml | 15 ++ .../dq_demo/tests/dq/min_positive_share.ff.py | 70 ++++++ .../dq_demo/tests/dq/no_future_orders.ff.sql | 17 ++ pyproject.toml | 2 +- src/fastflowtransform/__init__.py | 3 +- src/fastflowtransform/cli/test_cmd.py | 176 ++++++------- src/fastflowtransform/config/project.py | 43 +++- src/fastflowtransform/decorators.py | 68 +++++ src/fastflowtransform/testing/__init__.py | 8 +- src/fastflowtransform/testing/discovery.py | 234 ++++++++++++++++++ src/fastflowtransform/testing/registry.py | 194 ++++++++++++--- uv.lock | 2 +- 14 files changed, 999 insertions(+), 147 deletions(-) create mode 100644 examples/dq_demo/tests/dq/min_positive_share.ff.py create mode 100644 examples/dq_demo/tests/dq/no_future_orders.ff.sql create mode 100644 src/fastflowtransform/testing/discovery.py diff --git a/docs/Data_Quality_Tests.md b/docs/Data_Quality_Tests.md index d6f5a31..a92fa45 100644 --- a/docs/Data_Quality_Tests.md +++ b/docs/Data_Quality_Tests.md @@ -18,6 +18,10 @@ The following values are currently supported for `type`: - `reconcile_diff_within` - `reconcile_coverage` +In addition, you can register **custom tests** (Python or SQL) with any logical +name (e.g. `min_positive_share`, `no_future_orders`) and use that name in +`project.yml → tests:`. See [Custom DQ Tests (Python & SQL)](#custom-dq-tests-python--sql). + ## Usage Overview ```yaml @@ -234,6 +238,160 @@ Reconciliation checks compare aggregates or keys across two relations. Their con * `target_where` *(str, optional)* — filter applied to the target. * **Failure:** Reports the number of missing keys. +## Custom DQ Tests (Python & SQL) + +FastFlowTransform lets you plug in your own test logic and still reuse: + +* `project.yml → tests:` configuration, +* the same `fft test` command, +* the standard summary output and exit codes. + +Custom tests come in two flavours: + +1. **Python-based tests** registered via `@dq_test(...)` in `tests/**/*.ff.py` +2. **SQL-based tests** defined in `tests/**/*.ff.sql` with a small `{{ config(...) }}` header. + +Both kinds participate in the same Pydantic validation pipeline as built-in +tests: built-ins use the strict `ProjectTestConfig` union, while custom tests +are validated against a generated parameter model derived from their `config(...)`. + +### Python-based custom tests + +Create a file like `tests/dq/min_positive_share.ff.py`: + +```python +from __future__ import annotations + +from typing import Any + +from fastflowtransform.decorators import dq_test +from fastflowtransform.testing import base as testing + + +@dq_test("min_positive_share") +def min_positive_share( + con: Any, + table: str, + column: str | None, + params: dict[str, Any], +) -> tuple[bool, str | None, str | None]: + """ + Custom DQ test: require that at least `min_share` of rows have column > 0. + + Parameters (from project.yml → tests → params): + - min_share: float in [0,1], e.g. 0.75 + - where: optional filter (string) to restrict the population + """ + if column is None: + example = f"select count(*) from {table} where > 0" + return False, "min_positive_share requires a 'column' parameter", example + + # For project.yml tests the user payload lives under params["params"] + cfg: dict[str, Any] = params.get("params") or params + min_share: float = cfg["min_share"] + where: str | None = cfg.get("where") + + where_clause = f" where {where}" if where else "" + + total_sql = f"select count(*) from {table}{where_clause}" + if where: + pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0" + else: + pos_sql = f"select count(*) from {table} where {column} > 0" + + total = testing._scalar(con, total_sql) + positives = testing._scalar(con, pos_sql) + + example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total" + + if not total: + return False, f"min_positive_share: table {table} is empty", example_sql + + share = float(positives or 0) / float(total) + if share < min_share: + msg = ( + f"min_positive_share failed: positive share {share:.4f} " + f"< required {min_share:.4f} " + f"({positives} of {total} rows have {column} > 0)" + ) + return False, msg, example_sql + + return True, None, example_sql +``` + +The decorator `@dq_test("min_positive_share")` registers the function under that +logical name. You can then reference it from `project.yml`: + +```yaml +tests: + - type: min_positive_share + table: orders + column: amount + params: + min_share: 0.75 + where: "amount <> 0" + tags: [batch] +``` + +Notes: + +* The function **must** return `(ok: bool, message: str | None, example_sql: str | None)`. +* The signature is the same as built-in runners: `(con, table, column, params)`. +* For project-level tests, the full YAML dict is passed as `params`; by + convention, custom tests read their own options from `params["params"]`. + +### SQL-based custom tests + +Create a file like `tests/dq/no_future_orders.ff.sql`: + +```sql +{{ config( + type="no_future_orders", + params=["where"] +) }} + +-- Custom DQ test: fail if any row has a timestamp in the future. +-- +-- Conventions: +-- - {{ table }} : table name (e.g. "orders") +-- - {{ column }} : timestamp column (e.g. "order_ts") +-- - {{ where }} : optional filter, passed via params["where"] + +select count(*) as failures +from {{ table }} +where {{ column }} > current_timestamp + {%- if where %} and ({{ where }}){%- endif %} +``` + +The `config(...)` header declares: + +* `type`: logical name used in `project.yml → tests:`. +* `params`: list of allowed parameter keys for this test. + +FFT turns this into a small Pydantic model and validates your YAML config +against it. Any unknown key under `params:` results in a clear error at +**config-load time**, before executing SQL. + +Hook it up in `project.yml`: + +```yaml +tests: + - type: no_future_orders + table: orders + column: order_ts + params: + where: "amount <> 0" + tags: [batch] +``` + +At runtime FFT: + +1. discovers `tests/**/*.ff.sql`, +2. registers each file as a test of the given `type`, +3. validates `params:` using the `params=[...]` schema, +4. renders and executes the SQL as a “violation count” query + (`0` = pass, `> 0` = fail). + ## Severity & Tags * `severity: error` (default) makes failures stop the test run with exit code 1. diff --git a/docs/examples/DQ_Demo.md b/docs/examples/DQ_Demo.md index 8bbbde0..881798d 100644 --- a/docs/examples/DQ_Demo.md +++ b/docs/examples/DQ_Demo.md @@ -1,6 +1,6 @@ # Data Quality Demo Project -The **Data Quality Demo** shows how to use **all built-in FFT data quality tests** on a small, understandable model: +The **Data Quality Demo** shows how to use **all built-in FFT data quality tests** plus **custom DQ tests (Python & SQL)** on a small, understandable model: * Column checks: @@ -18,6 +18,11 @@ The **Data Quality Demo** shows how to use **all built-in FFT data quality tests * `reconcile_diff_within` * `reconcile_coverage` +* Custom tests (demo): + + * `min_positive_share` (Python-based) + * `no_future_orders` (SQL-based) + It uses a simple **customers / orders / mart** setup so you can see exactly what each test does and how it fails when something goes wrong. --- @@ -66,6 +71,11 @@ examples/dq_demo/ orders.ff.sql marts/ mart_orders_agg.ff.sql + + tests/ + dq/ + min_positive_share.ff.py + no_future_orders.ff.sql ``` ### Seeds @@ -218,6 +228,23 @@ tests: column: last_order_ts max_delay_minutes: 100000000 tags: [example:dq_demo, batch] + + # 7) Custom Python test: ensure at least a given share of positive amounts + - type: min_positive_share + table: orders + column: amount + params: + min_share: 0.75 + where: "amount <> 0" + tags: [example:dq_demo, batch] + + # 8) Custom SQL test: no future orders allowed + - type: no_future_orders + table: orders + column: order_ts + params: + where: "amount <> 0" + tags: [example:dq_demo, batch] ``` ### Cross-table reconciliations @@ -276,6 +303,133 @@ This set of tests touches **all available test types** and ties directly back to --- +## Custom DQ tests (Python & SQL) + +The demo also shows how to define **custom data quality tests** that integrate with: + +* the `project.yml → tests:` block, +* the `fft test` CLI, +* and the same summary output as built-in tests. + +### Python-based test: `min_positive_share` + +File: `examples/dq_demo/tests/dq/min_positive_share.ff.py` + +```python +from __future__ import annotations + +from typing import Any + +from fastflowtransform.decorators import dq_test +from fastflowtransform.testing import base as testing + + +@dq_test("min_positive_share") +def min_positive_share( + con: Any, + table: str, + column: str | None, + params: dict[str, Any], +) -> tuple[bool, str | None, str | None]: + """ + Custom DQ test: require that at least `min_share` of rows have column > 0. + + Parameters (from project.yml → tests → params): + - min_share: float in [0,1], e.g. 0.75 + - where: optional filter (string) to restrict the population + """ + if column is None: + example = f"select count(*) from {table} where > 0" + return False, "min_positive_share requires a 'column' parameter", example + + # Params come from project.yml under `params:` + cfg: dict[str, Any] = params.get("params") or params # project.yml wrapper + min_share: float = cfg["min_share"] + where: str | None = cfg.get("where") + + where_clause = f" where {where}" if where else "" + + total_sql = f"select count(*) from {table}{where_clause}" + if where: + pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0" + else: + pos_sql = f"select count(*) from {table} where {column} > 0" + + total = testing._scalar(con, total_sql) + positives = testing._scalar(con, pos_sql) + + example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total" + + if not total: + return False, f"min_positive_share: table {table} is empty", example_sql + + share = float(positives or 0) / float(total) + if share < min_share: + msg = ( + f"min_positive_share failed: positive share {share:.4f} " + f"< required {min_share:.4f} " + f"({positives} of {total} rows have {column} > 0)" + ) + return False, msg, example_sql + + return True, None, example_sql +```` + +This test is wired up from `project.yml` like this: + +```yaml +- type: min_positive_share + table: orders + column: amount + params: + min_share: 0.75 + where: "amount <> 0" + tags: [example:dq_demo, batch] +``` + +### SQL-based test: `no_future_orders` + +File: `examples/dq_demo/tests/dq/no_future_orders.ff.sql` + +```sql +{{ config( + type="no_future_orders", + params=["where"] +) }} + +-- Custom DQ test: fail if any row has a timestamp in the future. +-- +-- Conventions: +-- - {{ table }} : table name (e.g. "orders") +-- - {{ column }} : timestamp column (e.g. "order_ts") +-- - {{ where }} : optional filter, passed via params["where"] + +select count(*) as failures +from {{ table }} +where {{ column }} > current_timestamp + {%- if where %} and ({{ where }}){%- endif %} +``` + +And the corresponding `project.yml` test: + +```yaml +- type: no_future_orders + table: orders + column: order_ts + params: + where: "amount <> 0" + tags: [example:dq_demo, batch] +``` + +At runtime: + +* The SQL file is discovered under `tests/**/*.ff.sql`. +* `{{ config(...) }}` tells FFT the logical `type` and allowed `params`. +* `fft test` validates your `params:` from `project.yml` against this schema and + then executes the rendered SQL as a “violation count” query (`0` = pass, `>0` = fail). + +--- + ## Running the demo Assuming you are in the repo root and using DuckDB as a starting point: diff --git a/examples/dq_demo/project.yml b/examples/dq_demo/project.yml index 9432e51..750e755 100644 --- a/examples/dq_demo/project.yml +++ b/examples/dq_demo/project.yml @@ -114,3 +114,18 @@ tests: target: table: customers key: "customer_id" + + # --- Custom tests -------------------------------------------------- + - type: no_future_orders + table: orders + column: order_ts + where: "order_ts is not null" + tags: [example:dq_demo, batch] + + - type: min_positive_share + table: orders + column: amount + params: + min_share: 0.75 + where: "amount <> 0" + tags: [example:dq_demo, batch] diff --git a/examples/dq_demo/tests/dq/min_positive_share.ff.py b/examples/dq_demo/tests/dq/min_positive_share.ff.py new file mode 100644 index 0000000..47c054d --- /dev/null +++ b/examples/dq_demo/tests/dq/min_positive_share.ff.py @@ -0,0 +1,70 @@ +# examples/dq_demo/tests/dq/min_positive_share.ff.py +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, ConfigDict + +from fastflowtransform.decorators import dq_test +from fastflowtransform.testing import base as testing + + +class MinPositiveShareParams(BaseModel): + """ + Params for the min_positive_share test. + + - min_share: required minimum share of positive values in [0, 1] + - where: optional WHERE predicate to filter rows + """ + + model_config = ConfigDict(extra="forbid") + + min_share: float = 0.5 + where: str | None = None + + +@dq_test("min_positive_share", params_model=MinPositiveShareParams) +def min_positive_share( + con: Any, + table: str, + column: str | None, + params: dict[str, Any], +) -> tuple[bool, str | None, str | None]: + """ + Require that at least `min_share` of rows have column > 0. + """ + if column is None: + example = f"select count(*) from {table} where > 0" + return False, "min_positive_share requires a 'column' parameter", example + + min_share: float = params["min_share"] + where: str | None = params.get("where") + + where_clause = f" where {where}" if where else "" + + total_sql = f"select count(*) from {table}{where_clause}" + if where: + pos_sql = f"{total_sql} and {column} > 0" + else: + pos_sql = f"select count(*) from {table} where {column} > 0" + + total = testing._scalar(con, total_sql) + positives = testing._scalar(con, pos_sql) + + example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total" + + if not total: + return False, f"min_positive_share: table {table} is empty", example_sql + + share = float(positives or 0) / float(total) + if share < min_share: + msg = ( + f"min_positive_share failed: positive share {share:.4f} " + f"< required {min_share:.4f} " + f"({positives} of {total} rows have {column} > 0" + + (f" where {where}" if where else "") + + ")" + ) + return False, msg, example_sql + + return True, None, example_sql diff --git a/examples/dq_demo/tests/dq/no_future_orders.ff.sql b/examples/dq_demo/tests/dq/no_future_orders.ff.sql new file mode 100644 index 0000000..c45a059 --- /dev/null +++ b/examples/dq_demo/tests/dq/no_future_orders.ff.sql @@ -0,0 +1,17 @@ +{{ config( + type="no_future_orders", + params=["where"] +) }} + +-- Custom DQ test: fail if any row has a timestamp in the future. +-- +-- Context variables injected by the runner: +-- {{ table }} : table name (e.g. "orders") +-- {{ column }} : timestamp column (e.g. "order_ts") +-- {{ where }} : optional filter (string), from params["where"] +-- {{ params }} : full params dict (validated), if you ever need it + +select count(*) as failures +from {{ table }} +where {{ column }} > current_timestamp + {%- if where %} and ({{ where }}){%- endif %} diff --git a/pyproject.toml b/pyproject.toml index dd39980..ee21faf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.5.15" +version = "0.6.1" description = "ython framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/__init__.py b/src/fastflowtransform/__init__.py index 07a16ed..85d5ed5 100644 --- a/src/fastflowtransform/__init__.py +++ b/src/fastflowtransform/__init__.py @@ -15,7 +15,7 @@ try: from fastflowtransform.core import REGISTRY, Node, relation_for from fastflowtransform.dag import levels, mermaid, topo_sort - from fastflowtransform.decorators import engine_model, model + from fastflowtransform.decorators import dq_test, engine_model, model from fastflowtransform.fingerprint import ( EnvCtx, build_env_ctx, @@ -35,6 +35,7 @@ "Node", "__version__", "build_env_ctx", + "dq_test", "engine_model", "fingerprint_py", "fingerprint_sql", diff --git a/src/fastflowtransform/cli/test_cmd.py b/src/fastflowtransform/cli/test_cmd.py index 7feb493..8701e9d 100644 --- a/src/fastflowtransform/cli/test_cmd.py +++ b/src/fastflowtransform/cli/test_cmd.py @@ -9,7 +9,6 @@ from typing import Any import typer -import yaml from fastflowtransform.cli.bootstrap import _get_test_con, _prepare_context from fastflowtransform.cli.options import ( @@ -21,12 +20,20 @@ VarsOpt, ) from fastflowtransform.cli.selectors import _compile_selector +from fastflowtransform.config.project import ( + BaseProjectTestConfig, + parse_project_yaml_config, +) from fastflowtransform.core import REGISTRY from fastflowtransform.dag import topo_sort from fastflowtransform.errors import ModelExecutionError from fastflowtransform.logging import echo from fastflowtransform.schema_loader import Severity, TestSpec, load_schema_tests -from fastflowtransform.testing.registry import TESTS +from fastflowtransform.testing.discovery import ( + discover_python_tests, + discover_sql_tests, +) +from fastflowtransform.testing.registry import TESTS, Runner @dataclass @@ -96,12 +103,17 @@ def _run_models( _execute_models(order, run_sql, run_py, before=before, on_error=on_error) -def _load_tests(proj: Path) -> list[dict]: +def _load_tests(proj: Path) -> list[Any]: + """ + Load project-level tests from project.yml and validate them via Pydantic + (ProjectConfig.tests → list[ProjectTestConfig]). + """ cfg_path = proj / "project.yml" if not cfg_path.exists(): return [] - cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {} - return cfg.get("tests") or [] + proj_cfg = parse_project_yaml_config(proj) + # proj_cfg.tests is already a list[ProjectTestConfig] + return list(proj_cfg.tests or []) def _is_legacy_test_token(tokens: list[str]) -> bool: @@ -116,13 +128,16 @@ def _apply_legacy_tag_filter( legacy_tag = tokens[0] def has_tag(t: Any) -> bool: - # Dict (old format) + # Dict (old format; kept for backwards compatibility) if isinstance(t, dict): tags = t.get("tags") or [] return (legacy_tag in tags) if isinstance(tags, list) else (legacy_tag == tags) - # TestSpec (new Schema) + # Schema YAML tests if isinstance(t, TestSpec): return legacy_tag in (t.tags or []) + # Project.yml tests validated via Pydantic + if isinstance(t, BaseProjectTestConfig): + return legacy_tag in (t.tags or []) return False return [t for t in tests if has_tag(t)] @@ -178,7 +193,16 @@ def _prepare_test_from_mapping( _sev = str(t.get("severity", "error")).lower() severity: Severity = "warn" if _sev == "warn" else "error" - params: dict[str, Any] = dict(t) + # Prefer nested `params:` block (for custom tests), fall back to flat fields + if isinstance(t.get("params"), Mapping): + params: dict[str, Any] = dict(t["params"]) + else: + params = dict(t) + + # Strip meta fields that belong to the test spec, not the runner + META_KEYS = {"type", "table", "column", "severity", "tags", "name"} + for k in META_KEYS: + params.pop(k, None) col = t.get("column") if kind.startswith("reconcile_"): @@ -212,10 +236,20 @@ def _prepare_test( raw_test: Any, executor: Any ) -> tuple[str, Any, Severity, dict[str, Any], Any, Any]: """ - Dispatcher that normalizes both TestSpec and mapping-based tests. + Dispatcher that normalizes: + - TestSpec (schema.yml) + - ProjectTestConfig (Pydantic from project.yml) + - dict-like legacy tests """ if isinstance(raw_test, TestSpec): return _prepare_test_from_spec(raw_test, executor) + + if isinstance(raw_test, BaseProjectTestConfig): + # Convert to plain dict and reuse the existing mapping-based logic. + data = raw_test.model_dump(exclude_none=True) + return _prepare_test_from_mapping(data, executor) + + # Fallback: old dict-style tests (if any remain) return _prepare_test_from_mapping(raw_test, executor) @@ -233,9 +267,33 @@ def _run_dq_tests(con: Any, tests: Iterable[Any], executor: Any) -> list[DQResul ) = _prepare_test(raw_test, executor) t0 = time.perf_counter() - ok, msg, example = TESTS[kind](con, table_for_exec, col, params) - ms = int((time.perf_counter() - t0) * 1000) + runner: Runner | None = TESTS.get(kind) + if runner is None: + # Unknown test type → treat as configuration failure + err_msg = ( + f"Unknown test type {kind!r}. " + "Register a custom runner or fix the 'type' in project.yml/schema.yml." + ) + ms = int((time.perf_counter() - t0) * 1000) + param_str = _format_params_for_summary(kind, params) + results.append( + DQResult( + kind=kind, + table=str(display_table), + column=col, + ok=False, + msg=err_msg, + ms=ms, + severity=severity, + param_str=param_str, + example_sql=None, + ) + ) + continue + + ok, msg, example = runner(con, table_for_exec, col, params) + ms = int((time.perf_counter() - t0) * 1000) param_str = _format_params_for_summary(kind, params) results.append( @@ -255,98 +313,6 @@ def _run_dq_tests(con: Any, tests: Iterable[Any], executor: Any) -> list[DQResul return results -# def _run_dq_tests(con: Any, tests: Iterable[Any], executor: Any) -> list[DQResult]: -# results: list[DQResult] = [] - -# def _fmt_table(value: Any) -> Any: -# if executor is None or not hasattr(executor, "_format_test_table"): -# return value -# return executor._format_test_table(value) - -# def _fmt_reconcile_side(side: Any) -> Any: -# if not isinstance(side, dict): -# return side -# side_fmt = dict(side) -# tbl = side_fmt.get("table") -# if tbl is not None: -# side_fmt["table"] = _fmt_table(tbl) -# return side_fmt - -# for t in tests: -# severity: Severity -# if isinstance(t, TestSpec): -# kind = t.type -# col = t.column -# severity = t.severity -# params: dict[str, Any] = t.params or {} -# display_table = t.table -# table_for_exec = _fmt_table(t.table) -# if kind.startswith("reconcile_"): -# params = dict(params) -# if isinstance(params.get("left"), dict): -# params["left"] = _fmt_reconcile_side(params["left"]) -# if isinstance(params.get("right"), dict): -# params["right"] = _fmt_reconcile_side(params["right"]) -# if isinstance(params.get("source"), dict): -# params["source"] = _fmt_reconcile_side(params["source"]) -# if isinstance(params.get("target"), dict): -# params["target"] = _fmt_reconcile_side(params["target"]) -# else: -# kind = t["type"] -# _sev = str(t.get("severity", "error")).lower() -# severity = "warn" if _sev == "warn" else "error" -# params = dict(t) -# col = t.get("column") -# if kind.startswith("reconcile_"): -# if isinstance(t.get("left"), dict) and isinstance(t.get("right"), dict): -# lt = (t.get("left") or {}).get("table") -# rt = (t.get("right") or {}).get("table") -# display_table = f"{lt} ⇔ {rt}" -# elif isinstance(t.get("source"), dict) and isinstance(t.get("target"), dict): -# st = (t.get("source") or {}).get("table") -# tt = (t.get("target") or {}).get("table") -# display_table = f"{st} ⇒ {tt}" -# else: -# display_table = "" -# table_for_exec = _fmt_table(t.get("table")) -# if isinstance(params.get("left"), dict): -# params["left"] = _fmt_reconcile_side(params["left"]) -# if isinstance(params.get("right"), dict): -# params["right"] = _fmt_reconcile_side(params["right"]) -# if isinstance(params.get("source"), dict): -# params["source"] = _fmt_reconcile_side(params["source"]) -# if isinstance(params.get("target"), dict): -# params["target"] = _fmt_reconcile_side(params["target"]) -# else: -# table_for_exec = _fmt_table(t.get("table")) -# if not isinstance(table_for_exec, str) or not table_for_exec: -# raise typer.BadParameter("Missing or invalid 'table' in test config") -# display_table = table_for_exec - -# # Dispatch via registry -# t0 = time.perf_counter() -# ok, msg, example = TESTS[kind](con, table_for_exec, col, params) -# ms = int((time.perf_counter() - t0) * 1000) - -# # Build short parameter display for the summary line -# param_str = _format_params_for_summary(kind, params) - -# results.append( -# DQResult( -# kind=kind, -# table=str(display_table), -# column=col, -# ok=ok, -# msg=msg, -# ms=ms, -# severity=severity, -# param_str=param_str, -# example_sql=example, -# ) -# ) -# return results - - def _print_summary(results: list[DQResult]) -> None: passed = sum(1 for r in results if r.ok) failed = sum((not r.ok) and (r.severity != "warn") for r in results) @@ -424,6 +390,10 @@ def test( if not skip_build: _run_models(model_pred, run_sql, run_py) + # Discover custom DQ tests (SQL + Python) under project/tests/ + discover_sql_tests(ctx.project) + discover_python_tests(ctx.project) + # 1) project.yml tests tests: list[Any] = _load_tests(ctx.project) # 2) schema YAML tests diff --git a/src/fastflowtransform/config/project.py b/src/fastflowtransform/config/project.py index f12a710..de9b20b 100644 --- a/src/fastflowtransform/config/project.py +++ b/src/fastflowtransform/config/project.py @@ -3,7 +3,7 @@ from collections.abc import Sequence from pathlib import Path -from typing import Annotated, Any, Literal +from typing import Any, Literal import yaml from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator @@ -439,7 +439,40 @@ class ReconcileCoverageTestConfig(BaseProjectTestConfig): target_where: str | None = None -ProjectTestConfig = Annotated[ +class CustomProjectTestConfig(BaseProjectTestConfig): + """ + Catch-all config for user-defined tests declared in project.yml under `tests:`. + + - `type`: any non-empty string (must NOT match a built-in test type if you want + this class to be used; built-ins still win first). + - `table` / `column`: optional; for tests that don't need them, you can omit. + - Extra keys are allowed and preserved (e.g. threshold, pattern, window_days). + """ + + # Allow arbitrary extra keys; they'll be visible in model_dump(). + model_config = ConfigDict(extra="allow") + + type: str + table: str | None = None + column: str | None = None + + +# ProjectTestConfig = Annotated[ +# NotNullTestConfig +# | UniqueTestConfig +# | AcceptedValuesTestConfig +# | GreaterEqualTestConfig +# | NonNegativeSumTestConfig +# | RowCountBetweenTestConfig +# | FreshnessTestConfig +# | ReconcileEqualTestConfig +# | ReconcileRatioWithinTestConfig +# | ReconcileDiffWithinTestConfig +# | ReconcileCoverageTestConfig, +# Field(discriminator="type"), +# ] + +ProjectTestConfig = ( NotNullTestConfig | UniqueTestConfig | AcceptedValuesTestConfig @@ -450,9 +483,9 @@ class ReconcileCoverageTestConfig(BaseProjectTestConfig): | ReconcileEqualTestConfig | ReconcileRatioWithinTestConfig | ReconcileDiffWithinTestConfig - | ReconcileCoverageTestConfig, - Field(discriminator="type"), -] + | ReconcileCoverageTestConfig + | CustomProjectTestConfig +) # --------------------------------------------------------------------------- diff --git a/src/fastflowtransform/decorators.py b/src/fastflowtransform/decorators.py index e0f62ad..1346e98 100644 --- a/src/fastflowtransform/decorators.py +++ b/src/fastflowtransform/decorators.py @@ -7,8 +7,11 @@ from pathlib import Path from typing import Any, ParamSpec, Protocol, TypeVar, cast +from pydantic import BaseModel + from fastflowtransform.core import REGISTRY, relation_for from fastflowtransform.errors import ModuleLoadError +from fastflowtransform.testing.registry import DQParamsBase, Runner, register_python_test P = ParamSpec("P") R_co = TypeVar("R_co", covariant=True) @@ -176,3 +179,68 @@ def deco(fn: Callable[P, R_co]) -> HasFFMeta[P, R_co] | Callable[P, R_co]: return fn return deco + + +def dq_test( + name: str | None = None, + *, + overwrite: bool = False, + params_model: type[BaseModel] | None = None, +) -> Callable[[Callable[..., Any]], Runner]: + """ + Decorator to register a custom data-quality test runner. + + Usage: + + from fastflowtransform import dq_test + + @dq_test("email_domain_allowed") + def email_domain_allowed(con, table, column, params): + ... + return True, None, "select ..." + + If `name` is omitted, the function name is used: + + @dq_test() + def email_sanity(con, table, column, params): + ... + + # In project.yml / schema.yml: type: email_sanity + + Params model: + + class EmailTestParams(DQParamsBase): + allowed_domains: list[str] + + @dq_test("email_domain_allowed", params_model=EmailTestParams) + def email_domain_allowed(con, table, column, params: EmailTestParams): + ... + + Args: + name: Optional explicit test name. If None, fn.__name__ is used. + overwrite: If True, allow overriding an existing test name. + params_model: Optional Pydantic model to validate `params`. + If omitted, DQParamsBase (extra='forbid') is used. + """ + + def decorator(fn: Callable[..., Any]) -> Runner: + # Prefer attribute __name__ when available; fallback is just a placeholder. + if name is not None: + reg_name: str = name + else: + reg_name = cast(str, getattr(fn, "__name__", "")) + + pm = params_model or DQParamsBase + + # Central registration so test_cmd can pick up the params schema + register_python_test(reg_name, fn, params_model=pm, overwrite=overwrite) + + # Attach a bit of metadata (not required, but can be handy for debugging/introspection) + fn_any = cast(Any, fn) + fn_any.__ff_test_name__ = reg_name + fn_any.__ff_test_params_model__ = pm + + # Type-wise, fn already matches Runner's call signature at runtime + return cast(Runner, fn) + + return decorator diff --git a/src/fastflowtransform/testing/__init__.py b/src/fastflowtransform/testing/__init__.py index 1eb3ee2..f937cc2 100644 --- a/src/fastflowtransform/testing/__init__.py +++ b/src/fastflowtransform/testing/__init__.py @@ -1,9 +1,5 @@ from __future__ import annotations -from .registry import TESTS, Runner, register_test +from .registry import TESTS, Runner -__all__ = [ - "TESTS", - "Runner", - "register_test", -] +__all__ = ["TESTS", "Runner"] diff --git a/src/fastflowtransform/testing/discovery.py b/src/fastflowtransform/testing/discovery.py new file mode 100644 index 0000000..e958868 --- /dev/null +++ b/src/fastflowtransform/testing/discovery.py @@ -0,0 +1,234 @@ +# fastflowtransform/testing/discovery.py + +from __future__ import annotations + +import ast +import re +from pathlib import Path +from typing import Any, cast + +from pydantic import BaseModel, create_model + +from fastflowtransform.core import REGISTRY +from fastflowtransform.errors import ModelConfigError, ModuleLoadError +from fastflowtransform.logging import get_logger +from fastflowtransform.testing.registry import DQParamsBase, register_sql_test + +logger = get_logger("dq_discovery") + + +# --------------------------------------------------------------------------- +# Helper: parse leading {{ config(...) }} for test SQL files +# --------------------------------------------------------------------------- + + +def _parse_test_config(text: str, path: Path) -> dict[str, Any]: + """ + Parse the leading `{{ config(...) }}` header from a SQL-based DQ test file. + + Expected pattern (subset of the model config parser): + + {{ config( + type="no_future_orders", + params=["where"] + ) }} + + Returns a plain dict with at least 'type', optionally 'params'. + Raises ModelConfigError on malformed config. + """ + head = text[:2000] + + m = re.search( + r"^\s*\{\{\s*config\s*\((?P.*?)\)\s*\}\}", + head, + flags=re.IGNORECASE | re.DOTALL, + ) + if not m: + return {} + args = m.group("args").strip() + if not args: + return {} + src = f"__CFG__({args})" + + try: + node = ast.parse(src, mode="eval") + if not isinstance(node.body, ast.Call): + return {} + except Exception as exc: + raise ModelConfigError( + f"invalid syntax in test config: {exc}", + path=str(path), + field=None, + hint="Ensure {{ config(...) }} contains comma-separated key=value literals.", + ) from exc + + cfg: dict[str, Any] = {} + for kw in node.body.keywords: + if kw.arg is None: + val_src = ast.get_source_segment(src, kw.value) or "" + raise ModelConfigError( + f"unsupported **kwargs (got {val_src})", + path=str(path), + field="**kwargs", + hint="Use explicit key=value pairs; test configs must use literals.", + ) + field = kw.arg + try: + cfg[field] = ast.literal_eval(kw.value) + except Exception as err: + val_src = ast.get_source_segment(src, kw.value) or "" + raise ModelConfigError( + f"invalid literal (quote strings, no expressions): {val_src}", + path=str(path), + field=field, + hint=( + "All values must be JSON/Python literals (e.g. 'no_future_orders', ['where'])." + ), + ) from err + return cfg + + +# --------------------------------------------------------------------------- +# SQL-based tests +# --------------------------------------------------------------------------- + + +def _build_params_model_from_config( + test_type: str, + path: Path, + cfg: dict[str, Any], +) -> type[BaseModel] | None: + """ + Build a Pydantic model from config(params=[...]) in the SQL test file. + + Example in .ff.sql: + + {{ config( + type="no_future_orders", + params=["where"] + ) }} + + This becomes a model roughly equivalent to: + + class DQTestParams_no_future_orders(DQParamsBase): + where: Any | None = None + + DQParamsBase enforces extra="forbid", so: + - unknown keys are rejected (typos → clear error), + - declared keys are typed as Any, + - all declared params are optional (default None). + """ + raw_params = cfg.get("params") + if not raw_params: + return None + + if not isinstance(raw_params, (list, tuple)) or not all(isinstance(p, str) for p in raw_params): + raise ModelConfigError( + "config(params=...) must be a list of strings", + path=str(path), + field="params", + hint="Example: params=['where', 'min_amount']", + ) + + params: list[str] = [str(p) for p in raw_params] + + # Build a dynamic model with optional Any fields and extra='forbid' + field_defs: dict[str, tuple[Any, Any]] = {} + for name in params: + field_defs[name] = (Any, None) + model_name = f"DQTestParams_{test_type}" + + ParamsModel = create_model( + model_name, + __base__=DQParamsBase, + # cast to satisfy the type checker; runtime behaviour is correct + **cast(dict[str, Any], field_defs), + ) + + return ParamsModel + + +def discover_sql_tests(project_dir: Path) -> None: + """ + Discover SQL-based DQ tests under tests/**/*.ff.sql. + + Each file must: + - start with a {{ config(type="...", params=[...]) }} block + - contain exactly one SELECT that returns a scalar "violation count". + + For each test we: + - parse config(type=..., params=[...]) + - build a Pydantic params model from `params` (if provided) + - register the test via `register_sql_test(...)` + + Runtime behaviour: + - params from project.yml are validated against that model (unknown keys → error) + - Jinja template is rendered with: + table, column, params, where + (where = params.get('where'), always present) + - the SQL is executed and interpreted as "violation count". + """ + tests_dir = project_dir / "tests" + if not tests_dir.exists(): + return + + # Ensure the Jinja env is initialized, even though register_sql_test + # will call REGISTRY.get_env() again internally. + REGISTRY.get_env() + + for path in sorted(tests_dir.rglob("*.ff.sql")): + try: + text = path.read_text(encoding="utf-8") + except Exception as exc: + logger.error("Failed to read SQL test file %s: %s", path, exc) + continue + + cfg = _parse_test_config(text, path) + tname = cfg.get("type") + if not tname or not isinstance(tname, str): + logger.warning( + "%s: SQL test file missing config(type='...'); skipping", + path, + ) + continue + + try: + params_model = _build_params_model_from_config(tname, path, cfg) + except ModelConfigError: + # Config errors should be fatal, like for models + raise + + register_sql_test( + kind=tname, + path=path, + params_model=params_model, + ) + logger.debug("Registered SQL DQ test '%s' from %s", tname, path) + + +# --------------------------------------------------------------------------- +# Python-based tests +# --------------------------------------------------------------------------- + + +def discover_python_tests(project_dir: Path) -> None: + """ + Discover Python-based DQ tests under tests/**/*.ff.py. + + The files should define functions decorated with @dq_test("type_name"). + Importing the module is enough; the decorator will register the runner. + """ + tests_dir = project_dir / "tests" + if not tests_dir.exists(): + return + + for path in sorted(tests_dir.rglob("*.ff.py")): + # Reuse the Registry's module loader so we get consistent behaviour. + try: + REGISTRY._load_py_module(path) + except ModuleLoadError: + # Fail fast: broken DQ test modules should not be silently ignored. + raise + except Exception as exc: + # Other exceptions: surface as a clear message + raise ModuleLoadError(f"Failed to import DQ test module {path}: {exc}") from exc diff --git a/src/fastflowtransform/testing/registry.py b/src/fastflowtransform/testing/registry.py index 28fd88b..fbb2653 100644 --- a/src/fastflowtransform/testing/registry.py +++ b/src/fastflowtransform/testing/registry.py @@ -1,9 +1,17 @@ # fastflowtransform/testing/registry.py from __future__ import annotations +from pathlib import Path from typing import Any, Protocol +from pydantic import BaseModel, ConfigDict, ValidationError + +from fastflowtransform.core import REGISTRY +from fastflowtransform.logging import get_logger from fastflowtransform.testing import base as testing +from fastflowtransform.testing.base import _scalar + +logger = get_logger("dq_registry") class Runner(Protocol): @@ -15,6 +23,8 @@ class Runner(Protocol): example_sql (str | None): Optional example SQL (shown in summary on failure). """ + __name__: str + def __call__( self, con: Any, table: str, column: str | None, params: dict[str, Any] ) -> tuple[bool, str | None, str | None]: ... @@ -30,6 +40,31 @@ def _example_where(where: str | None) -> str: return f" where ({where})" if where else "" +def _format_param_validation_error( + kind: str, + origin: str | None, + exc: ValidationError, +) -> str: + """Build a human-friendly error message when params don't match the schema.""" + lines: list[str] = [] + header = f"[{kind}] Invalid test configuration" + if origin: + header += f" for {origin}" + lines.append(header + ":") + for err in exc.errors(): + loc = ".".join(str(p) for p in err.get("loc", ())) + msg = err.get("msg", "invalid value") + if loc: + lines.append(f" • {loc}: {msg}") + else: + lines.append(f" • {msg}") + lines.append( + "Hint: Update project.yml → tests: entry for this test so that its parameters " + "match the expected schema." + ) + return "\n".join(lines) + + # --------------------------------------------------------------------------- # Basic column-level tests # --------------------------------------------------------------------------- @@ -349,6 +384,16 @@ def run_reconcile_coverage( return False, str(e), example +# --------------------------------------------------------------------------- +# Optional param-schema registry for custom tests +# --------------------------------------------------------------------------- + +# kind -> Pydantic model used to validate params +TEST_PARAM_MODELS: dict[str, type[BaseModel]] = {} +# kind -> origin info (for nicer messages) +TEST_ORIGINS: dict[str, str] = {} # e.g. "tests/dq/no_future_orders.ff.sql" + + # --------------------------------------------------------------------------- # Registry # --------------------------------------------------------------------------- @@ -375,42 +420,133 @@ def run_reconcile_coverage( # --------------------------------------------------------------------------- -def register_test(name: str, runner: Runner, *, overwrite: bool = False) -> None: +class DQParamsBase(BaseModel): + """ + Base for all dynamically created DQ params models. + Forbids unknown keys by default. """ - Register (or override) a data-quality test runner. - - Usage: - from fastflowtransform.testing import register_test + model_config = ConfigDict(extra="forbid") - def my_runner(con, table, column, params): - ... - return True, None, None - register_test("my_custom_test", my_runner) +def register_python_test( + kind: str, + runner: Runner, + *, + params_model: type[BaseModel] | None = None, + origin: str | None = None, + overwrite: bool = False, +) -> None: + """ + Register a custom Python test. Args: - name: Name of the test as used in project.yml / schema.yml (`type:` field). - runner: Callable implementing the Runner protocol. - overwrite: If False (default), attempting to override an existing name - raises ValueError. Set True to replace built-ins or earlier - registrations. - - Raises: - ValueError: If name is empty or already registered (and overwrite=False). - TypeError: If runner is not callable. + kind: logical test type (e.g. "no_future_orders"). + runner: callable implementing Runner. + params_model: optional Pydantic model for the params dict. + origin: string used in error messages (e.g. module path). + overwrite: if True, replace an existing test with the same kind. """ - if not isinstance(name, (str, bytes)) or not str(name).strip(): - raise ValueError("Test name must be a non-empty string") - - if not callable(runner): - raise TypeError("runner must be callable") - - key = str(name).strip() - if key in TESTS and not overwrite: + if kind in TESTS and not overwrite: raise ValueError( - f"Test '{key}' is already registered. " - "Pass overwrite=True to replace the existing runner." + f"Test type {kind!r} is already registered " + f"(origin={TEST_ORIGINS.get(kind, '')!r})" ) - TESTS[key] = runner + if kind in TESTS and overwrite: + logger.warning( + "Overwriting DQ test %r (previous origin=%r, new origin=%r)", + kind, + TEST_ORIGINS.get(kind), + origin, + ) + + TESTS[kind] = runner + + if params_model is not None: + TEST_PARAM_MODELS[kind] = params_model + # If overwriting and no new params_model is given, keep the old one if present. + elif overwrite and kind in TEST_PARAM_MODELS: + pass + else: + # Default to a generic params model if you have one, or leave it unset + TEST_PARAM_MODELS.pop(kind, None) + + if origin is not None: + TEST_ORIGINS[kind] = origin + elif overwrite: + # if overwriting without explicit origin, don't change existing origin + pass + + +def register_sql_test( + kind: str, + path: Path, + *, + params_model: type[BaseModel] | None = None, + overwrite: bool = False, +) -> None: + """ + Register a custom SQL-based test from a *.ff.sql file. + + kind: logical test type (e.g. "no_future_orders"). + path: filesystem path to the template. + params_model: optional Pydantic model for params. + overwrite: if True, allow overriding an existing test of the same kind. + """ + origin = str(path) + META_KEYS = {"type", "table", "column", "severity", "tags", "name"} + + def _runner( + con: Any, table: str, column: str | None, params: dict[str, Any] + ) -> tuple[bool, str | None, str | None]: + # 1) Strip generic test metadata and validate params if a schema is provided + raw_params: dict[str, Any] = dict(params or {}) + core_params: dict[str, Any] = {k: v for k, v in raw_params.items() if k not in META_KEYS} + + if params_model is not None: + try: + cfg = params_model.model_validate(core_params) + except ValidationError as exc: + err_msg = _format_param_validation_error(kind, origin, exc) + raise testing.TestFailure(err_msg) from exc + # Use normalized params (e.g. converted types, defaults) + params_validated = cfg.model_dump(exclude_none=True) + else: + params_validated = core_params + + # 2) Render the SQL template with a stable context + env = REGISTRY.get_env() + raw = path.read_text(encoding="utf-8") + tmpl = env.from_string(raw) + + ctx: dict[str, Any] = { + "kind": kind, + "table": table, + "column": column, + "params": params_validated, + # always present, so templates can safely do `{% if where %}` + "where": params_validated.get("where"), + } + + try: + sql = tmpl.render(**ctx) + except Exception as exc: + raise testing.TestFailure( + f"[{kind}] Failed to render SQL template for {origin}: {exc}" + ) from exc + + # 3) Execute the SQL: convention here is "fail if count(*) > 0" + n = _scalar(con, sql) + ok = int(n or 0) == 0 + msg: str | None = None if ok else f"{kind} failed: {n} offending row(s)" + example_sql = sql + return ok, msg, example_sql + + register_python_test( + kind, + _runner, + params_model=params_model, + origin=origin, + overwrite=overwrite, + ) diff --git a/uv.lock b/uv.lock index b01fb07..d9815cc 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.5.15" +version = "0.6.1" source = { editable = "." } dependencies = [ { name = "duckdb" }, From 81c62457da05737d82dc47b8b64e5d1d981c83fb Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Tue, 18 Nov 2025 17:27:05 +0100 Subject: [PATCH 2/2] Made api demo more robust --- examples/cache_demo/Makefile | 2 ++ pyproject.toml | 6 +++++- pytest.ini | 1 + src/fastflowtransform/api/http.py | 19 ++++++++++++++++--- .../examples/test_examples_matrix.py | 1 + uv.lock | 15 +++++++++++++++ 6 files changed, 40 insertions(+), 4 deletions(-) diff --git a/examples/cache_demo/Makefile b/examples/cache_demo/Makefile index c2cf2f9..b57f5a6 100644 --- a/examples/cache_demo/Makefile +++ b/examples/cache_demo/Makefile @@ -9,6 +9,7 @@ BQ_FRAME ?= bigframes PROJECT ?= . UV ?= uv DB ?= .local/cache_demo.duckdb +HTTP_TIMEOUT ?= 60 ifeq ($(ENGINE),duckdb) PROFILE_ENV = dev_duckdb @@ -39,6 +40,7 @@ ifeq ($(ENGINE),bigquery) BASE_ENV := $(BASE_ENV) FF_ENGINE_VARIANT=$(BQ_FRAME) endif RUN_ENV = $(BASE_ENV) +RUN_ENV := $(RUN_ENV) FF_HTTP_TIMEOUT=$(HTTP_TIMEOUT) SELECT_ALL = --select tag:example:cache_demo --select tag:$(ENGINE_TAG) diff --git a/pyproject.toml b/pyproject.toml index ee21faf..b7ace1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" version = "0.6.1" -description = "ython framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" +description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } authors = [ { name = "Marko Lekic", email = "you@example.com" } ] @@ -84,6 +84,7 @@ full = [ dev = [ "pytest==8.4.*", "pytest-cov==7.0.*", + "pytest-rerunfailures==14.0.*", "ruff==0.14.*", "mypy==1.18.*", "pre-commit==3.*", @@ -195,6 +196,9 @@ repository = "pypi" [tool.pytest.ini_options] addopts = "-q --cov=src/fastflowtransform --cov-report=term-missing --cov-report=xml --cov-report=html" testpaths = ["tests"] +markers = [ + "flaky: tests that may be rerun on failure", +] [tool.coverage.run] branch = true diff --git a/pytest.ini b/pytest.ini index 407b084..24520a0 100644 --- a/pytest.ini +++ b/pytest.ini @@ -11,3 +11,4 @@ markers = unit: marks unit tests integration: integration tests example: run the examples as tests + flaky: tests that may be rerun on failure diff --git a/src/fastflowtransform/api/http.py b/src/fastflowtransform/api/http.py index 9f53773..75cab55 100644 --- a/src/fastflowtransform/api/http.py +++ b/src/fastflowtransform/api/http.py @@ -189,9 +189,22 @@ def _one(method: str, url_: str, params_: dict | None) -> tuple[bytes, dict]: tries = max(_DEF_MAX_RETRIES, 1) for i in range(tries): - status, resp_headers, resp_body = _http_request( - method, url_, params=params_, headers=headers, timeout=timeout - ) + try: + status, resp_headers, resp_body = _http_request( + method, url_, params=params_, headers=headers, timeout=timeout + ) + except _HTTP.TimeoutException as exc: + if i < tries - 1: + _backoff_sleep(i) + continue + raise RuntimeError( + f"HTTP timeout after {timeout or _DEF_TIMEOUT}s for {url_}" + ) from exc + except _HTTP.RequestError as exc: + if i < tries - 1: + _backoff_sleep(i) + continue + raise RuntimeError(f"HTTP request error for {url_}: {exc}") from exc if status in (429, 500, 502, 503, 504) and i < tries - 1: # honor Retry-After (seconds) if present ra = resp_headers.get("Retry-After") diff --git a/tests/integration/examples/test_examples_matrix.py b/tests/integration/examples/test_examples_matrix.py index 32df40e..d6affe5 100644 --- a/tests/integration/examples/test_examples_matrix.py +++ b/tests/integration/examples/test_examples_matrix.py @@ -56,6 +56,7 @@ def _run_cmd(cmd: list[str], cwd: Path, extra_env: dict[str, str] | None = None) @pytest.mark.integration @pytest.mark.example +@pytest.mark.flaky(reruns=2, reruns_delay=2) @pytest.mark.parametrize("example,engine", EXAMPLE_ENGINE_PARAMS) def test_examples_with_all_engines(example, engine, request): fixture_name = ENGINE_ENV_FIXTURE[engine] diff --git a/uv.lock b/uv.lock index d9815cc..26a07e1 100644 --- a/uv.lock +++ b/uv.lock @@ -762,6 +762,7 @@ dev = [ { name = "pre-commit" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-rerunfailures" }, { name = "ruff" }, { name = "types-pyyaml" }, ] @@ -832,6 +833,7 @@ requires-dist = [ { name = "pyspark", marker = "extra == 'spark'", specifier = ">=4.0.1" }, { name = "pytest", marker = "extra == 'dev'", specifier = "==8.4.*" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.0.*" }, + { name = "pytest-rerunfailures", marker = "extra == 'dev'", specifier = "==14.0.*" }, { name = "python-dotenv", specifier = ">=1.0" }, { name = "pyyaml", specifier = ">=6.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = "==0.14.*" }, @@ -2976,6 +2978,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, ] +[[package]] +name = "pytest-rerunfailures" +version = "14.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packaging" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cc/a4/6de45fe850759e94aa9a55cda807c76245af1941047294df26c851dfb4a9/pytest-rerunfailures-14.0.tar.gz", hash = "sha256:4a400bcbcd3c7a4ad151ab8afac123d90eca3abe27f98725dc4d9702887d2e92", size = 21350, upload-time = "2024-03-13T08:21:39.444Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/dc/e7/e75bd157331aecc190f5f8950d7ea3d2cf56c3c57fb44da70e60b221133f/pytest_rerunfailures-14.0-py3-none-any.whl", hash = "sha256:4197bdd2eaeffdbf50b5ea6e7236f47ff0e44d1def8dae08e409f536d84e7b32", size = 12709, upload-time = "2024-03-13T08:21:37.199Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"