From 217b158d5678113280b389152b7c717264dd7ac1 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Sat, 18 Oct 2025 16:51:25 +0200 Subject: [PATCH 01/16] Implemented tickets T1 and T2 --- .flowforge/cache/dev-duckdb.json | 6 + .github/workflows/ci.yml | 3 + Makefile.pipeline | 5 +- .../.flowforge/cache/stg-postgres.json | 12 + .../.flowforge/cache/dev-duckdb.json | 15 + pyproject.toml | 11 +- src/flowforge/__init__.py | 50 +++- src/flowforge/_version.py | 54 ++++ src/flowforge/cache.py | 150 ++++++++++ src/flowforge/cli.py | 280 ++++++++++++++---- src/flowforge/dag.py | 46 +++ src/flowforge/executors/duckdb_exec.py | 9 + src/flowforge/fingerprint.py | 262 ++++++++++++++++ src/flowforge/run_executor.py | 98 ++++++ tests/unit/test_cache_skip_logic.py | 68 +++++ tests/unit/test_cache_store.py | 20 ++ tests/unit/test_fingerprint.py | 91 ++++++ uv.lock | 2 +- 18 files changed, 1111 insertions(+), 71 deletions(-) create mode 100644 .flowforge/cache/dev-duckdb.json create mode 100644 examples/postgres/.flowforge/cache/stg-postgres.json create mode 100644 examples/simple_duckdb/.flowforge/cache/dev-duckdb.json create mode 100644 src/flowforge/_version.py create mode 100644 src/flowforge/cache.py create mode 100644 src/flowforge/fingerprint.py create mode 100644 src/flowforge/run_executor.py create mode 100644 tests/unit/test_cache_skip_logic.py create mode 100644 tests/unit/test_cache_store.py create mode 100644 tests/unit/test_fingerprint.py diff --git a/.flowforge/cache/dev-duckdb.json b/.flowforge/cache/dev-duckdb.json new file mode 100644 index 0000000..e28ef33 --- /dev/null +++ b/.flowforge/cache/dev-duckdb.json @@ -0,0 +1,6 @@ +{ + "engine": "duckdb", + "entries": {}, + "profile": "dev", + "version": 1 +} \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86f9cbd..f89d585 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,8 @@ name: CI +env: + ACT: "" + on: push: branches: [ main ] diff --git a/Makefile.pipeline b/Makefile.pipeline index 984b913..2f2ac2a 100644 --- a/Makefile.pipeline +++ b/Makefile.pipeline @@ -10,7 +10,10 @@ seed: # Run/DAG/Test reuse the same duckdb path (FF_ENV can switch engine) run: - $(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)" + $(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)" --jobs=1 + +run-parallel: + $(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)" --jobs=4 --keep-going dag: $(FLOWFORGE) dag "$(FF_PROJECT)" --env "$(FF_ENV)" --html diff --git a/examples/postgres/.flowforge/cache/stg-postgres.json b/examples/postgres/.flowforge/cache/stg-postgres.json new file mode 100644 index 0000000..b1a6262 --- /dev/null +++ b/examples/postgres/.flowforge/cache/stg-postgres.json @@ -0,0 +1,12 @@ +{ + "engine": "postgres", + "entries": { + "mart_orders_enriched": "fc41294d6967cfcf3c9b7d2c5405210d9383e5538747f7f13bc16c96cc8754c5", + "mart_users.ff": "6a61e68266d9151e9c473340ee93ccb70146b0079371bae889e4c8313b40a8b8", + "orders.ff": "b45347dd5ad3adbf1637e637fb32e27b766995549c7b1ae4d9412a8ff1b0d375", + "users.ff": "68dbd147dcca21a36d04f031499eb8977a6fae8659873189b2a3169e560cb81e", + "users_enriched": "cf5157127bd1c72c6942a54049acd61ee8817782920534c274b4261783ceda4b" + }, + "profile": "stg", + "version": 1 +} \ No newline at end of file diff --git a/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json b/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json new file mode 100644 index 0000000..821ffb1 --- /dev/null +++ b/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json @@ -0,0 +1,15 @@ +{ + "engine": "duckdb", + "entries": { + "ephemeral_ids.ff": "61534518fe8ceb722140f1c6b6429362222672c27fd5aa4136cfbb03cd38654e", + "mart_orders_enriched": "da281dd1d5b462bb3e2dc632fd203d0528a3ab9527feedb586527551705e620d", + "mart_users.ff": "02f49fe6c1d761de1795f8e707507d6555f6157765c0424c4aebc78108e63c54", + "orders.ff": "cc9e389d5252c14d658314722787c6196cc1fa31478f560a53f0e60702956ede", + "users.ff": "4e0bfd811f1b17f20a3eb2605746bacf2e76e5bc2e7227f73e9f3f9b36521bb5", + "users_enriched": "3767af19bcb0b93e7fef32a668de2b7d9cb7ba9523da0b2c8c12986a9a51d1cb", + "v_users.ff": "032761584e627b5f7e2d2dd77eb885cfc3fc8e7e8d7e35f6f6fda45a2e4e8e73", + "v_users_enriched.ff": "a15c32702a0efeafc32d1a9f2a6106a076f8b2f4ce82c5eb99010e7221c59490" + }, + "profile": "dev", + "version": 1 +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index d657d46..2983546 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,11 @@ build-backend = "hatchling.build" [project] name = "flowforge" -version = "0.1.0" +version = "0.3.0" description = "FlowForge PoC: SQL+Python Models, DAG, DuckDB executor" readme = "README.md" license = { text = "Apache-2.0" } -authors = [ { name = "Your Name", email = "you@example.com" } ] +authors = [ { name = "Marko Lekic", email = "you@example.com" } ] requires-python = ">=3.12" keywords = ["data", "etl", "elt", "analytics", "dbt", "streaming", "dag"] classifiers = [ @@ -49,9 +49,9 @@ dev = [ ] [project.urls] -Homepage = "https://github.com//" -Issues = "https://github.com///issues" -Repository = "https://github.com//.git" +Homepage = "https://github.com/MirrorsAndMisdirections/FlowForge" +Issues = "https://github.com/MirrorsAndMisdirections/FlowForge/issues" +Repository = "https://github.com/MirrorsAndMisdirections/FlowForge.git" [project.scripts] flowforge = "flowforge.cli:app" @@ -96,6 +96,7 @@ combine-as-imports = true [tool.ruff.lint.pylint] max-args = 7 +max-returns = 8 # Keep __init__.py tidy without false positives on re-exports. [tool.ruff.lint.per-file-ignores] diff --git a/src/flowforge/__init__.py b/src/flowforge/__init__.py index 4613074..c0a9d2e 100644 --- a/src/flowforge/__init__.py +++ b/src/flowforge/__init__.py @@ -1,5 +1,47 @@ -__all__ = ["__version__"] -__version__ = "0.1.0" +# src/flowforge/__init__.py +""" +FlowForge package entry point. -from .core import Node, Registry, relation_for -from .decorators import model +Expose the package version and a few commonly-used types/APIs for convenience. +Importing here is intentionally lightweight to avoid circular imports with CLI code. +""" + +from __future__ import annotations + +from ._version import __version__ # re-export for `from flowforge import __version__` + +# Optional convenience re-exports (safe, low-risk imports). +# If you prefer a minimal surface, you can remove the block below. +try: + from .core import REGISTRY, Node, relation_for + from .dag import levels, mermaid, topo_sort + from .decorators import model + from .fingerprint import ( + EnvCtx, + build_env_ctx, + fingerprint_py, + fingerprint_sql, + get_function_source, + normalized_sources_blob, + ) +except Exception: + # Keep import-time robustness; the CLI only needs __version__ at import time. + # Other symbols remain available when modules are importable. + pass + +__all__ = [ + "REGISTRY", + "EnvCtx", + "Node", + "__version__", + "build_env_ctx", + "fingerprint_py", + "fingerprint_sql", + "get_function_source", + "levels", + "mermaid", + "model", + "normalized_sources_blob", + "relation_for", + "topo_sort", +] diff --git a/src/flowforge/_version.py b/src/flowforge/_version.py new file mode 100644 index 0000000..433c901 --- /dev/null +++ b/src/flowforge/_version.py @@ -0,0 +1,54 @@ +# src/flowforge/_version.py +from __future__ import annotations + +import os +from importlib.metadata import PackageNotFoundError, version as pkg_version +from pathlib import Path + +""" +Lightweight version helper. + +The public contract is a single constant: + + __version__: str + +Resolution order (first hit wins): +1) Environment variable FLOWFORGE_VERSION +2) Installed package metadata (importlib.metadata) +3) Optional sidecar file next to this module named "_VERSION" +4) Fallback: "0.0.0+dev" +""" + + +def _resolve_version() -> str: + # 1) Explicit override via environment (useful in CI/CD) + env = os.getenv("FLOWFORGE_VERSION") + if env: + return env + + # 2) Installed package metadata (works for normal + editable installs) + try: + v = pkg_version("flowforge") + if v and isinstance(v, str): + return v + except PackageNotFoundError: + pass + except Exception: + # Ignore unexpected metadata issues + pass + + # 3) Optional sidecar file (can be written by release tooling) + sidecar = Path(__file__).with_name("_VERSION") + try: + if sidecar.exists(): + txt = sidecar.read_text(encoding="utf-8").strip() + if txt: + return txt + except Exception: + pass + + # 4) Last resort + return "0.0.0+dev" + + +__version__: str = _resolve_version() diff --git a/src/flowforge/cache.py b/src/flowforge/cache.py new file mode 100644 index 0000000..821ca23 --- /dev/null +++ b/src/flowforge/cache.py @@ -0,0 +1,150 @@ +# src/flowforge/cache.py +from __future__ import annotations + +import json +import os +import tempfile +from collections.abc import Mapping +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from sqlalchemy import text + +from .core import relation_for + + +@dataclass +class FingerprintCache: + """ + Lightweight, project-scoped fingerprint store. + + The cache is persisted under: + /.flowforge/cache/-.json + + Schema: + { + "version": 1, + "engine": "", + "profile": "", + "entries": { "": "", ... } + } + """ + + project_dir: Path + profile: str + engine: str + version: int = 1 + entries: dict[str, str] = field(default_factory=dict) + + @property + def path(self) -> Path: + base = self.project_dir / ".flowforge" / "cache" + base.mkdir(parents=True, exist_ok=True) + filename = f"{self.profile}-{self.engine}.json" + return base / filename + + def load(self) -> None: + """Load cache file if present; silently do nothing when missing or corrupt.""" + try: + raw = json.loads(self.path.read_text(encoding="utf-8")) + if isinstance(raw, dict) and raw.get("version") == self.version: + self.entries = dict(raw.get("entries") or {}) + except Exception: + # On any error, start with an empty cache + self.entries = {} + + def save(self) -> None: + """Persist cache atomically.""" + payload = { + "version": self.version, + "engine": self.engine, + "profile": self.profile, + "entries": self.entries, + } + tmp_fd, tmp_name = tempfile.mkstemp(prefix=".ff-cache-", dir=str(self.path.parent)) + try: + with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh: + json.dump(payload, fh, ensure_ascii=False, sort_keys=True, indent=2) + os.replace(tmp_name, self.path) + finally: + try: + if os.path.exists(tmp_name): + os.remove(tmp_name) + except Exception: + pass + + def get(self, node_name: str) -> str | None: + """Return cached fingerprint for a node or None.""" + return self.entries.get(node_name) + + def set(self, node_name: str, fingerprint: str) -> None: + """Set cached fingerprint for a node name.""" + self.entries[node_name] = fingerprint + + def update_many(self, fps: Mapping[str, str]) -> None: + """Bulk update cache entries.""" + for k, v in fps.items(): + self.entries[k] = v + + +# ------------------------ artifact existence helpers ------------------------ + + +def relation_exists(executor: Any, relation: str) -> bool: + """ + best-effort existence check of a materialized relation on the current engine. + Returns True if the relation appears to exist, False if definitely missing. + For unknown engines, returns True (do not block execution). + """ + try: + # DuckDB + con = getattr(executor, "con", None) + if con is not None and hasattr(con, "execute"): + rows = con.execute( + "select 1 from information_schema.tables " + "where table_schema in ('main','temp') and table_name = ?", + [relation], + ).fetchall() + return bool(rows) + + # Postgres (via SQLAlchemy engine) + engine = getattr(executor, "engine", None) + if engine is not None and hasattr(engine, "begin"): + with engine.begin() as conn: + rows = conn.execute( + text( + "select 1 from information_schema.tables " + "where table_schema = current_schema() and table_name = :t" + ), + {"t": relation}, + ).fetchall() + return bool(rows) + + # Other engines: assume relation is present if cache says so + return True + except Exception: + # Be permissive on errors: don't block execution with a false negative + return True + + +def can_skip_node( + *, + node_name: str, + new_fp: str, + cache: FingerprintCache, + executor: Any, + materialized: str, +) -> bool: + """ + Decide whether a node can be skipped based on: + - identical fingerprint to cached entry + - and existing materialized relation (unless ephemeral) + """ + old = cache.get(node_name) + if old is None or old != new_fp: + return False + if materialized == "ephemeral": + return True + rel = relation_for(node_name) + return relation_exists(executor, rel) diff --git a/src/flowforge/cli.py b/src/flowforge/cli.py index db57807..c478b52 100644 --- a/src/flowforge/cli.py +++ b/src/flowforge/cli.py @@ -5,10 +5,11 @@ import logging import os import textwrap +import threading import time import traceback from collections.abc import Callable, Iterable -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Annotated, Any, NoReturn, cast @@ -28,11 +29,20 @@ from flowforge.executors.base import BaseExecutor from . import testing +from .cache import FingerprintCache, can_skip_node from .core import REGISTRY, relation_for -from .dag import mermaid, topo_sort +from .dag import levels as dag_levels, mermaid, topo_sort from .docs import render_site from .errors import DependencyNotFoundError, ProfileConfigError from .executors._shims import BigQueryConnShim, SAConnShim +from .fingerprint import ( + EnvCtx, + build_env_ctx, + fingerprint_py, + fingerprint_sql, + get_function_source, +) +from .run_executor import ScheduleResult, schedule from .seeding import seed_project from .settings import EngineType, EnvSettings, Profile, resolve_profile from .utest import discover_unit_specs, run_unit_specs @@ -361,13 +371,13 @@ def _execute_models( def _resolve_dag_out_dir(proj: Path, override: Path | None) -> Path: if override: return override.expanduser().resolve() - # project.yml optional lesen + # Optionally read project.yml cfg_path = proj / "project.yml" try: cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) if cfg_path.exists() else {} except Exception: cfg = {} - p = (cfg or {}).get("docs", {}).get("dag_dir") # z. B. "site/dag" oder "./build/dag" + p = (cfg or {}).get("docs", {}).get("dag_dir") # e.g. "site/dag" or "./build/dag" if p: return (proj / p).expanduser().resolve() # Default @@ -415,22 +425,22 @@ def _setup_logging(verbose: int, quiet: int) -> None: typer.Option("--vars", help="Override template vars: key=value"), ] -CaseOpt = Annotated[str | None, typer.Option("--case", help="Nur einen Case ausführen")] +CaseOpt = Annotated[str | None, typer.Option("--case", help="Run only a single case")] -EnvOpt = Annotated[str, typer.Option("--env", help="Profil-Umgebung")] +EnvOpt = Annotated[str, typer.Option("--env", help="Profile environment")] EngineOpt = Annotated[ EngineType | None, - typer.Option("--engine", help="duckdb|postgres|bigquery (überschreibt Profile)"), + typer.Option("--engine", help="duckdb|postgres|bigquery (overrides profile)"), ] PathOpt = Annotated[ - str | None, typer.Option("--path", help="Eine einzelne YAML-Datei statt Discovery") + str | None, typer.Option("--path", help="Single YAML file instead of discovery") ] -ProjectArg = Annotated[str, typer.Argument(help="Pfad zum Projekt (mit tests/unit/*.yml)")] +ProjectArg = Annotated[str, typer.Argument(help="Path to the project (with tests/unit/*.yml)")] -ModelOpt = Annotated[str | None, typer.Option("--model", help="Nur ein Modell testen")] +ModelOpt = Annotated[str | None, typer.Option("--model", help="Test a single model")] SelectOpt = Annotated[ list[str] | None, @@ -450,9 +460,29 @@ def _setup_logging(verbose: int, quiet: int) -> None: HtmlOpt = Annotated[ bool, - typer.Option("--html", help="Erzeuge HTML-DAG und Mini-Dokumentation"), + typer.Option("--html", help="Generate HTML DAG and mini documentation"), ] +JobsOpt = Annotated[ + int, + typer.Option( + "--jobs", + help="Max parallel executions per level (≥1).", + min=1, + show_default=True, + ), +] + +KeepOpt = Annotated[ + bool, + typer.Option( + "--keep-going", + help=( + "On errors within a level: do not cancel tasks already running in that level; " + "subsequent levels still do not start." + ), + ), +] # ──────────────────────────────────── CLI Root ─────────────────────────────────── @@ -463,14 +493,14 @@ def main( None, "--version", "-V", - help="Zeigt die Version und beendet.", + help="Show version and exit.", callback=_version_callback, is_eager=True, ), verbose: int = typer.Option( - 0, "--verbose", "-v", count=True, help="Mehr Ausgaben (-v: INFO, -vv: DEBUG)" + 0, "--verbose", "-v", count=True, help="Increase verbosity (-v: INFO, -vv: DEBUG)" ), - quiet: int = typer.Option(0, "--quiet", "-q", count=True, help="Weniger Ausgaben (-q: ERROR)"), + quiet: int = typer.Option(0, "--quiet", "-q", count=True, help="Reduce verbosity (-q: ERROR)"), ) -> None: _setup_logging(verbose, quiet) @@ -478,58 +508,188 @@ def main( # ──────────────────────────────────── Commands ─────────────────────────────────── -@app.command( - help=( - "Lädt das Projekt, baut den DAG und führt alle Modelle aus." - "\n\nBeispiel:\n flowforge run . --env dev" - ) -) -def run( - project: ProjectArg = ".", - env_name: EnvOpt = "dev", - engine: EngineOpt = None, - vars: VarsOpt = None, - select: SelectOpt = None, -) -> None: - ctx = _prepare_context(project, env_name, engine, vars) - _, pred = _compile_selector(select) - - if LOG.isEnabledFor(logging.INFO): - typer.echo(f"Profil: {env_name} | Engine: {ctx.profile.engine}") - _, run_sql, run_py = ctx.make_executor() - - def _before(name: str, node: Any) -> None: +@dataclass +class _RunEngine: + ctx: Any + env_name: str + pred: Callable[[Any], bool] + shared: tuple[Any, Callable, Callable] = field(init=False) + tls: threading.local = field(default_factory=threading.local, init=False) + cache: FingerprintCache = field(init=False) + env_ctx: EnvCtx = field(init=False) + computed_fps: dict[str, str] = field(default_factory=dict, init=False) + fps_lock: threading.Lock = field(default_factory=threading.Lock, init=False) + + def __post_init__(self) -> None: if LOG.isEnabledFor(logging.INFO): - typer.echo(f"→ Running {name} ({node.kind}) on {ctx.profile.engine}") + typer.echo(f"Profile: {self.env_name} | Engine: {self.ctx.profile.engine}") + self.shared = self.ctx.make_executor() + relevant_env = [k for k in os.environ if k.startswith("FF_")] + self.env_ctx = build_env_ctx( + engine=self.ctx.profile.engine, + profile_name=self.env_name, + relevant_env_keys=relevant_env, + sources=getattr(REGISTRY, "sources", {}), + ) + self.cache = FingerprintCache( + self.ctx.project, profile=self.env_name, engine=self.ctx.profile.engine + ) + self.cache.load() + + def _get_runner(self) -> tuple[Any, Callable, Callable]: + if getattr(self.tls, "runner", None) is None: + ex, run_sql_shared, run_py_shared = self.shared + run_sql_fn, run_py_fn = run_sql_shared, run_py_shared + if self.ctx.profile.engine == "duckdb" and hasattr(ex, "clone"): + try: + ex = ex.clone() + + def run_sql_fn(n): + return ex.run_sql(n, self.ctx.jinja_env) + + run_py_fn = ex.run_python + except Exception: + pass + self.tls.runner = (ex, run_sql_fn, run_py_fn) + return self.tls.runner + + def _maybe_fingerprint(self, node: Any, ex: Any) -> str | None: + supports_sql_fp = all( + hasattr(ex, a) for a in ("render_sql", "_resolve_ref", "_resolve_source") + ) + if not (supports_sql_fp or node.kind == "python"): + return None + with self.fps_lock: + dep_fps = { + d: self.computed_fps.get(d) or self.cache.get(d) or "" for d in (node.deps or []) + } + try: + if node.kind == "sql" and supports_sql_fp: + rendered = ex.render_sql( + node, + self.ctx.jinja_env, + ref_resolver=lambda nm: ex._resolve_ref(nm, self.ctx.jinja_env), + source_resolver=ex._resolve_source, + ) + return fingerprint_sql( + node=node, rendered_sql=rendered, env_ctx=self.env_ctx, dep_fps=dep_fps + ) + if node.kind == "python": + func = REGISTRY.py_funcs[node.name] + src = get_function_source(func) + return fingerprint_py( + node=node, func_src=src, env_ctx=self.env_ctx, dep_fps=dep_fps + ) + except Exception: + return None + return None - def _on_error(name: str, _node: Any, err: Exception) -> None: + def run_node(self, name: str) -> None: + node = REGISTRY.nodes[name] + ex, run_sql_fn, run_py_fn = self._get_runner() + cand_fp = self._maybe_fingerprint(node, ex) + if cand_fp is not None: + materialized = (getattr(node, "meta", {}) or {}).get("materialized", "table") + if can_skip_node( + node_name=name, + new_fp=cand_fp, + cache=self.cache, + executor=ex, + materialized=materialized, + ): + with self.fps_lock: + self.computed_fps[name] = cand_fp + if LOG.isEnabledFor(logging.INFO): + typer.echo(f"↻ Skipped {name} (cache hit)") + return + if LOG.isEnabledFor(logging.INFO): + typer.echo(f"→ Running {name} ({node.kind})") + (run_sql_fn if node.kind == "sql" else run_py_fn)(node) + if cand_fp is not None: + with self.fps_lock: + self.computed_fps[name] = cand_fp + + @staticmethod + def before(_name: str) -> None: + return + + @staticmethod + def on_error(name: str, err: BaseException) -> None: + _node = REGISTRY.get_node(name) if isinstance(err, KeyError): typer.echo( _error_block( f"Model failed: {name} (KeyError)", _pretty_exc(err), - "• Prüfe Spaltennamen deiner Upstream-Tabellen (Seeds/SQL).\n" - "• Bei >1 Deps: dict-Keys sind physische Relationen (relation_for), " - "z. B. 'orders'.\n" - "• (Optional) Eingabespalten im Executor vor dem Call loggen.", + "• Check column names in your upstream tables (seeds/SQL).\n" + "• For >1 deps: dict keys are physical relations (relation_for), " + "e.g. 'orders'.\n" + "• (Optional) Log input columns in the executor before the call.", ) ) raise typer.Exit(1) from err - body = _pretty_exc(err) if os.getenv("FLOWFORGE_TRACE") == "1": body += "\n\n" + "".join(traceback.format_exc()) - typer.echo(_error_block(f"Model failed: {name}", body, "• Siehe Ursache oben.")) + typer.echo(_error_block(f"Model failed: {name}", body, "• See cause above.")) raise typer.Exit(1) from err - _run_models(pred, run_sql, run_py, before=_before, on_error=_on_error) + def persist_on_success(self, result: ScheduleResult) -> None: + if not result.failed: + self.cache.update_many(self.computed_fps) + self.cache.save() + @staticmethod + def print_timings(result: ScheduleResult) -> None: + if not result.per_node_s: + return + typer.echo("\nRuntime per model") + typer.echo("─────────────────") + for name in sorted(result.per_node_s, key=lambda k: k): + ms = int(result.per_node_s[name] * 1000) + typer.echo(f"• {name:<30} {ms:>6} ms") + typer.echo(f"\nTotal runtime: {result.total_s:.3f}s") + + +@app.command( + help=( + "Loads the project, builds the DAG, and runs every model." + "\n\nExample:\n flowforge run . --env dev" + ) +) +def run( + project: ProjectArg = ".", + env_name: EnvOpt = "dev", + engine: EngineOpt = None, + vars: VarsOpt = None, + select: SelectOpt = None, + jobs: JobsOpt = 1, + keep_going: KeepOpt = False, +) -> None: + ctx = _prepare_context(project, env_name, engine, vars) + _, pred = _compile_selector(select) + engine_ = _RunEngine(ctx=ctx, env_name=env_name, pred=pred) + lvls_all = dag_levels(REGISTRY.nodes) + lvls = [[n for n in lvl if pred(REGISTRY.nodes[n])] for lvl in lvls_all] + lvls = [lvl for lvl in lvls if lvl] + result: ScheduleResult = schedule( + lvls, + jobs=jobs, + fail_policy="keep_going" if keep_going else "fail_fast", + run_node=engine_.run_node, + before=engine_.before, + on_error=engine_.on_error, + ) + if result.failed: + raise typer.Exit(1) + engine_.persist_on_success(result) + engine_.print_timings(result) typer.echo("✓ Done") @app.command( help=( - "Gibt den DAG als Mermaid oder erzeugt eine HTML-Seite.\n\nBeispiele:\n " + "Outputs the DAG as Mermaid text or generates an HTML page.\n\nExamples:\n " "flowforge dag .\n flowforge dag . --env dev --html" ) ) @@ -566,13 +726,13 @@ def dag( typer.echo(f"Mermaid DAG written to {dag_out}") if LOG.isEnabledFor(logging.INFO): - typer.echo(f"Profil: {env_name} | Engine: {ctx.profile.engine}") + typer.echo(f"Profile: {env_name} | Engine: {ctx.profile.engine}") @app.command( help=( - "Materialisiert Modelle und führt konfigurierte Datenqualitäts-Checks aus." - "\n\nBeispiel:\n flowforge test . --env dev --select batch" + "Materializes models and runs configured data-quality checks." + "\n\nExample:\n flowforge test . --env dev --select batch" ) ) def test( @@ -582,30 +742,30 @@ def test( vars: VarsOpt = None, select: SelectOpt = None, ) -> None: - # 0) Setup & Normalisierung + # 0) Setup & normalization ctx = _prepare_context(project, env_name, engine, vars) tokens, pred = _compile_selector(select) execu, run_sql, run_py = ctx.make_executor() - # 1) Shim/Marker (optional) + # 1) Shim/marker (optional) con = _get_test_con(execu) _maybe_print_marker(con) - # 2) Modelle in Topo-Reihenfolge (mit optionalem Filter) ausführen + # 2) Run models in topological order (with optional filter) _run_models(pred, run_sql, run_py) - # 3) Tests laden & ggf. legacy Tag-Filter anwenden + # 3) Load tests and optionally apply the legacy tag filter tests = _load_tests(ctx.project) tests = _apply_legacy_tag_filter(tests, tokens) if not tests: - typer.secho("Keine Tests konfiguriert.", fg="bright_black") + typer.secho("No tests configured.", fg="bright_black") raise typer.Exit(code=0) - # 4) Tests ausführen & Ergebnis zusammenfassen + # 4) Run tests and summarize the outcome results = _run_dq_tests(con, tests) _print_summary(results) - # 5) Exit-Code + # 5) Exit code failed = sum(not r.ok for r in results) raise typer.Exit(code=2 if failed > 0 else 0) @@ -639,8 +799,8 @@ def _load_tests(proj: Path) -> list[dict]: def _apply_legacy_tag_filter(tests: list[dict], tokens: list[str]) -> list[dict]: - # Falls genau EIN Token ohne Präfix (tag:/type:/kind:) angegeben wurde, - # als Legacy-DQ-Tag interpretieren. + # If exactly ONE token without a prefix (tag:/type:/kind:) was provided, + # interpret it as a legacy DQ tag. if len(tokens) != 1 or tokens[0].startswith(("tag:", "type:", "kind:")): return tests legacy_tag = tokens[0] @@ -670,7 +830,7 @@ def _run_dq_tests(con: Any, tests: Iterable[dict]) -> list[DQResult]: def _exec_test_kind(con: Any, kind: str, t: dict, table: Any, col: Any) -> tuple[bool, str | None]: - # Dispatch-Mapping statt großer if/elif-Kette + # Dispatch mapping instead of a large if/elif chain try_map = { "not_null": lambda: testing.not_null(con, table, col), "unique": lambda: testing.unique(con, table, col), @@ -716,7 +876,7 @@ def _print_summary(results: list[DQResult]) -> None: @app.command( help=( - "Seeds aus /seeds einspielen.\n\nBeispiele:\n flowforge seed . " + "Load seeds from /seeds into the target database.\n\nExamples:\n flowforge seed . " "--env dev\n flowforge seed examples/postgres --env stg" ) ) @@ -754,7 +914,7 @@ def utest( specs = discover_unit_specs(ctx.project, path=path, only_model=model) if not specs: - typer.echo("ℹ️ Keine Unit-Tests gefunden (tests/unit/*.yml).") # noqa: RUF001 + typer.echo("ℹ️ No unit tests found (tests/unit/*.yml).") # noqa: RUF001 raise typer.Exit(0) failures = run_unit_specs(specs, ex, ctx.jinja_env, only_case=case) diff --git a/src/flowforge/dag.py b/src/flowforge/dag.py index 9a09e84..a55d789 100644 --- a/src/flowforge/dag.py +++ b/src/flowforge/dag.py @@ -40,6 +40,52 @@ def topo_sort(nodes: dict[str, Node]) -> list[str]: return order +def levels(nodes: dict[str, Node]) -> list[list[str]]: + """ + Returns a level-wise topological ordering. + - Each inner list contains nodes with no prerequisites inside the remaining + graph (i.e. eligible to run in parallel). + - Ordering within a level is lexicographically stable. + - Validation for missing deps/cycles matches topo_sort. + """ + # Fehlende Deps einsammeln (nur Modell-Refs; sources sind keine Nodes) + missing = { + n.name: sorted({d for d in (n.deps or []) if d not in nodes}) + for n in nodes.values() + if any(d not in nodes for d in (n.deps or [])) + } + if missing: + raise DependencyNotFoundError(missing) + + indeg = {k: 0 for k in nodes} + out: dict[str, set[str]] = defaultdict(set) + for n in nodes.values(): + for d in set(n.deps or []): + out[d].add(n.name) + indeg[n.name] += 1 + + # Start-Level: alle 0-Indegree + current = sorted([k for k, deg in indeg.items() if deg == 0]) + lvls: list[list[str]] = [] + seen_count = 0 + + while current: + lvls.append(current) + next_zero: set[str] = set() + for u in current: + seen_count += 1 + for v in sorted(out.get(u, ())): + indeg[v] -= 1 + if indeg[v] == 0: + next_zero.add(v) + current = sorted(next_zero) + + if seen_count != len(nodes): + cyclic = [k for k, deg in indeg.items() if deg > 0] + raise ModelCycleError(f"Cycle detected among nodes: {', '.join(sorted(cyclic))}") + return lvls + + def _mm_id(name: str) -> str: s = re.sub(r"[^A-Za-z0-9_]", "_", name) return "_" + s if s and s[0].isdigit() else (s or "_node") diff --git a/src/flowforge/executors/duckdb_exec.py b/src/flowforge/executors/duckdb_exec.py index 0f9cf4d..7e70855 100644 --- a/src/flowforge/executors/duckdb_exec.py +++ b/src/flowforge/executors/duckdb_exec.py @@ -1,4 +1,6 @@ # flowforge/executors/duckdb_exec.py +from __future__ import annotations + from collections.abc import Iterable from contextlib import suppress from pathlib import Path @@ -22,8 +24,15 @@ def __init__(self, db_path: str = ":memory:"): if db_path and db_path != ":memory:" and "://" not in db_path: with suppress(Exception): Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self.db_path = db_path self.con = duckdb.connect(db_path) + def clone(self) -> DuckExecutor: + """ + Generates a new Executor instance with own connection for Thread-Worker. + """ + return DuckExecutor(self.db_path) + # ---- Frame hooks ---- def _read_relation(self, relation: str, node: Node, deps: Iterable[str]) -> pd.DataFrame: try: diff --git a/src/flowforge/fingerprint.py b/src/flowforge/fingerprint.py new file mode 100644 index 0000000..c1401f2 --- /dev/null +++ b/src/flowforge/fingerprint.py @@ -0,0 +1,262 @@ +# src/flowforge/fingerprint.py +from __future__ import annotations + +import hashlib +import inspect +import json +import os +import textwrap +from collections.abc import Iterable, Mapping +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from .core import Node, relation_for + +# ---------- Canonicalization helpers ---------- + + +def _as_primitive(obj: Any) -> Any: + """ + Convert complex Python objects into JSON-serializable primitives with stable ordering. + - dicts → {sorted keys} + - sets → sorted lists + - tuples → lists + - Path → string path + - Node → minimal stable representation (name/kind/path/dep names) + """ + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + if isinstance(obj, Path): + return str(obj) + if isinstance(obj, Node): + return { + "name": obj.name, + "kind": obj.kind, + "path": str(obj.path), + "deps": sorted(list(obj.deps or [])), + # meta intentionally omitted unless caller passes it explicitly + } + if isinstance(obj, (list, tuple)): + return [_as_primitive(x) for x in obj] + if isinstance(obj, set): + return sorted(_as_primitive(x) for x in obj) + if isinstance(obj, Mapping): + return {str(k): _as_primitive(v) for k, v in sorted(obj.items(), key=lambda kv: str(kv[0]))} + # Fallback: repr() is stable enough for primitives; for functions use dedicated source helper + return repr(obj) + + +def _stable_dumps(obj: Any) -> str: + """ + Serialize an object to a deterministic JSON string: + - keys sorted + - minimal separators + - non-ASCII preserved + """ + prim = _as_primitive(obj) + return json.dumps(prim, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + + +def _hash_hex(payload: str) -> str: + """Return a hex SHA-256 over the given payload string.""" + h = hashlib.sha256() + h.update(payload.encode("utf-8")) + return h.hexdigest() + + +def _normalize_sql(sql: str) -> str: + """ + Normalize SQL minimally for cross-platform consistency: + - Normalize line endings to '\n' + - Strip trailing whitespace on each line + NOTE: We intentionally DO NOT collapse spaces or comments; even small changes + should alter the fingerprint as per acceptance criteria. + """ + lines = sql.replace("\r\n", "\n").replace("\r", "\n").split("\n") + return "\n".join(line.rstrip() for line in lines).strip("\n") + + +# ---------- sources.yml normalization ---------- + + +def normalized_sources_blob(sources: Mapping[str, Any] | None) -> str: + """ + Return a stable JSON blob for a sources.yml mapping. + Keys are sorted recursively; absent input becomes "{}". + """ + return _stable_dumps(sources or {}) + + +# ---------- environment context ---------- + + +@dataclass(frozen=True) +class EnvCtx: + """ + Stable environment context used for fingerprinting. + Include only inputs that should invalidate compiled artifacts when they change. + """ + + engine: str + profile: str + env_vars: Mapping[str, str] + sources_json: str + + def to_payload(self) -> Mapping[str, Any]: + return { + "engine": self.engine, + "profile": self.profile, + "env": {k: self.env_vars.get(k, "") for k in sorted(self.env_vars.keys())}, + "sources": self.sources_json, + } + + +def build_env_ctx( + *, + engine: str, + profile_name: str, + relevant_env_keys: Iterable[str] = (), + sources: Mapping[str, Any] | None = None, +) -> EnvCtx: + """ + Construct an EnvCtx from engine/profile + a curated set of environment variables + and the (normalized) sources.yml mapping. + Only the provided environment keys are captured; all others are ignored. + """ + env_subset: dict[str, str] = {} + for key in sorted(set(relevant_env_keys)): + val = os.getenv(key) + if val is not None: + env_subset[key] = val + return EnvCtx( + engine=str(engine), + profile=str(profile_name), + env_vars=env_subset, + sources_json=normalized_sources_blob(sources), + ) + + +# ---------- robust function source retrieval ---------- + + +def get_function_source(func: Any) -> str: + """ + Return a best-effort, stable source string for a Python callable. + + Strategy (in order): + 1) inspect.getsource(func) → dedented string + 2) Read the defining file (co_filename) and slice starting at co_firstlineno + until the next top-level def/class (heuristic). Dedent as needed. + 3) Final fallback: combine qualified name and bytecode to ensure stability. + + This ensures fingerprinting works even for dynamically loaded modules, lambdas, + or environments where inspect cannot read the original file (e.g., zipimport). + """ + # 1) The happy path + try: + src = inspect.getsource(func) + return textwrap.dedent(src).strip() + except Exception: + pass + + # 2) Slice from file using code object hints + try: + code = getattr(func, "__code__", None) + if code and isinstance(code.co_firstlineno, int) and code.co_filename: + file_path = Path(code.co_filename) + # Read as binary + decode to be robust to odd encodings + with open(file_path, "rb") as fh: + raw = fh.read() + text = raw.decode("utf-8", errors="replace") + start = max(code.co_firstlineno - 1, 0) + + lines = text.splitlines() + # Heuristic: collect until the next top-level def/class (same or less indent) + buf: list[str] = [] + base_indent = None + for idx in range(start, len(lines)): + line = lines[idx] + buf.append(line) + # capture base indentation from the first non-empty line + if base_indent is None and line.strip(): + base_indent = len(line) - len(line.lstrip()) + # stop when we hit a new top-level def/class after the first line + if ( + idx > start + and line + and not line.startswith(" " * (base_indent or 0)) + and line.lstrip().startswith(("def ", "class ", "@")) + ): + buf.pop() # don't include the new top-level symbol + break + sliced = "\n".join(buf) + return textwrap.dedent(sliced).strip() + except Exception: + pass + + # 3) Last resort: qualname + bytecode hash + try: + qual = getattr(func, "__qualname__", getattr(func, "__name__", "anonymous")) + bc = getattr(getattr(func, "__code__", None), "co_code", b"") + payload = f"{qual}\nBYTECODE:{hashlib.sha256(bc).hexdigest()}" + return payload + except Exception: + return "UNKNOWN_FUNCTION" + + +# ---------- Fingerprint calculators ---------- + + +def fingerprint_sql( + *, + node: Node | str, + rendered_sql: str, + env_ctx: EnvCtx | Mapping[str, Any], + dep_fps: Mapping[str, str] | None = None, +) -> str: + """ + Compute a stable fingerprint for a SQL model. + Inputs: + - node : Node or node name for stable identity and relation + - rendered_sql : final SQL after templating (ref()/source() resolved as in executor) + - env_ctx : EnvCtx or compatible mapping (engine, profile, selected env vars, sources) + - dep_fps : mapping of dependency name → fingerprint (to invalidate downstream) + """ + n_name = node.name if isinstance(node, Node) else str(node) + payload = { + "kind": "sql", + "node": n_name, + "relation": relation_for(n_name), + "sql": _normalize_sql(rendered_sql), + "env": env_ctx.to_payload() if isinstance(env_ctx, EnvCtx) else _as_primitive(env_ctx), + "deps": _as_primitive(sorted((dep_fps or {}).items(), key=lambda kv: kv[0])), + } + return _hash_hex(_stable_dumps(payload)) + + +def fingerprint_py( + *, + node: Node | str, + func_src: str, + env_ctx: EnvCtx | Mapping[str, Any], + dep_fps: Mapping[str, str] | None = None, +) -> str: + """ + Compute a stable fingerprint for a Python model. + Inputs: + - node : Node or node name + - func_src : normalized function source (use get_function_source) + - env_ctx : EnvCtx or compatible mapping + - dep_fps : mapping of dependency name → fingerprint + """ + n_name = node.name if isinstance(node, Node) else str(node) + payload = { + "kind": "python", + "node": n_name, + "relation": relation_for(n_name), + "func_src": func_src.replace("\r\n", "\n").replace("\r", "\n").strip(), + "env": env_ctx.to_payload() if isinstance(env_ctx, EnvCtx) else _as_primitive(env_ctx), + "deps": _as_primitive(sorted((dep_fps or {}).items(), key=lambda kv: kv[0])), + } + return _hash_hex(_stable_dumps(payload)) diff --git a/src/flowforge/run_executor.py b/src/flowforge/run_executor.py new file mode 100644 index 0000000..f7c7380 --- /dev/null +++ b/src/flowforge/run_executor.py @@ -0,0 +1,98 @@ +# src/flowforge/run_executor.py +from __future__ import annotations + +import threading +from collections.abc import Callable +from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from contextlib import suppress +from dataclasses import dataclass +from time import perf_counter +from typing import Literal + +FailPolicy = Literal["fail_fast", "keep_going"] + + +@dataclass +class ScheduleResult: + per_node_s: dict[str, float] + total_s: float + failed: dict[str, BaseException] + + +def schedule( + levels: list[list[str]], + jobs: int, + fail_policy: FailPolicy, + run_node: Callable[[str], None], + before: Callable[[str], None] | None = None, + on_error: Callable[[str, BaseException], None] | None = None, +) -> ScheduleResult: + """ + Execute the provided levels sequentially; within each level up to `jobs` + nodes are started in parallel. If a level fails, subsequent levels are not + scheduled. + + - fail_fast: after the first error, cancel any not-yet-started tasks in the + level when possible. + - keep_going: let all already scheduled tasks within the level finish. + + Returns per-node durations, total runtime, and node-level exceptions. + """ + jobs = max(1, int(jobs)) + per_node: dict[str, float] = {} + failed: dict[str, BaseException] = {} + per_node_lock = threading.Lock() + + t_total0 = perf_counter() + + for lvl in levels: + if not lvl: + continue + + # Separate pool per level to guarantee the per-level concurrency limit. + with ThreadPoolExecutor(max_workers=jobs, thread_name_prefix="ff-worker") as pool: + futures: dict[Future[None], str] = {} + + def _task(name: str) -> None: + if before: + with suppress(Exception): + before(name) + t0 = perf_counter() + try: + run_node(name) + finally: + # Always record duration, even when an exception is raised + dt = perf_counter() - t0 + with per_node_lock: + per_node[name] = dt + + # Schedule tasks + for name in lvl: + fut = pool.submit(_task, name) + futures[fut] = name + + # Evaluate futures; on first error optionally cancel the remainder + level_had_error = False + for fut in as_completed(list(futures.keys())): + name = futures[fut] + try: + fut.result() + except BaseException as e: + level_had_error = True + failed[name] = e + if on_error: + with suppress(Exception): + on_error(name, e) + if fail_policy == "fail_fast": + # Try to cancel all futures that have not started yet + for f in futures: + if f is fut: + continue + f.cancel() + + # Only proceed to subsequent levels if the current one succeeded + if level_had_error: + break + + total = perf_counter() - t_total0 + return ScheduleResult(per_node_s=per_node, total_s=total, failed=failed) diff --git a/tests/unit/test_cache_skip_logic.py b/tests/unit/test_cache_skip_logic.py new file mode 100644 index 0000000..065d02e --- /dev/null +++ b/tests/unit/test_cache_skip_logic.py @@ -0,0 +1,68 @@ +# tests/unit/test_cache_skip_logic.py +from __future__ import annotations + +from flowforge.cache import FingerprintCache, can_skip_node + + +class _DummyExec: + def __init__(self, present: set[str]): + self._present = present + + # DuckDB-like `con.execute(...).fetchall()` shape + class _Con: + def __init__(self, present: set[str]): + self.present = present + + def execute(self, sql, params=None): + class _R: + def __init__(self, present, table): + self.present = present + self.table = table + + def fetchall(self): + return [(1,)] if self.table in self.present else [] + + # Return local class instance directly (do not reference as attribute of _Con) + return _R(self.present, (params or [None])[0]) + + @property + def con(self): + return _DummyExec._Con(self._present) + + +def test_can_skip_node_requires_artifact_when_non_ephemeral(tmp_path): + cache = FingerprintCache(tmp_path, profile="dev", engine="duckdb") + cache.entries = {"users.ff": "xxx"} + + # artifact missing → cannot skip + ex = _DummyExec(present=set()) + assert not can_skip_node( + node_name="users.ff", + new_fp="xxx", + cache=cache, + executor=ex, + materialized="table", + ) + + # artifact present → skip ok + ex2 = _DummyExec(present={"users"}) + assert can_skip_node( + node_name="users.ff", + new_fp="xxx", + cache=cache, + executor=ex2, + materialized="table", + ) + + +def test_ephemeral_skip_without_artifact(tmp_path): + cache = FingerprintCache(tmp_path, profile="dev", engine="duckdb") + cache.entries = {"ephem.ff": "yyy"} + ex = _DummyExec(present=set()) + assert can_skip_node( + node_name="ephem.ff", + new_fp="yyy", + cache=cache, + executor=ex, + materialized="ephemeral", + ) diff --git a/tests/unit/test_cache_store.py b/tests/unit/test_cache_store.py new file mode 100644 index 0000000..aede73d --- /dev/null +++ b/tests/unit/test_cache_store.py @@ -0,0 +1,20 @@ +# tests/unit/test_cache_store.py +from __future__ import annotations + +from pathlib import Path + +from flowforge.cache import FingerprintCache + + +def test_cache_persist_roundtrip(tmp_path: Path): + proj = tmp_path + c1 = FingerprintCache(proj, profile="dev", engine="duckdb") + c1.load() # empty start + c1.set("users.ff", "abc") + c1.set("orders.ff", "def") + c1.save() + + c2 = FingerprintCache(proj, profile="dev", engine="duckdb") + c2.load() + assert c2.get("users.ff") == "abc" + assert c2.get("orders.ff") == "def" diff --git a/tests/unit/test_fingerprint.py b/tests/unit/test_fingerprint.py new file mode 100644 index 0000000..8b7b0a2 --- /dev/null +++ b/tests/unit/test_fingerprint.py @@ -0,0 +1,91 @@ +# tests/unit/test_fingerprint.py +from __future__ import annotations + +from pathlib import Path + +from flowforge.core import Node +from flowforge.fingerprint import ( + EnvCtx, + build_env_ctx, + fingerprint_py, + fingerprint_sql, + get_function_source, + inspect, + normalized_sources_blob, +) + + +def test_sources_normalization_stable(): + a = {"crm": {"users": {"identifier": "seed_users"}, "orders": {"identifier": "seed_orders"}}} + b = {"crm": {"orders": {"identifier": "seed_orders"}, "users": {"identifier": "seed_users"}}} + assert normalized_sources_blob(a) == normalized_sources_blob(b) + + +def test_env_ctx_respects_selected_env_keys(monkeypatch): + monkeypatch.setenv("FF_ENGINE", "duckdb") + monkeypatch.setenv("SECRET_TOKEN", "shh") + ctx1 = build_env_ctx(engine="duckdb", profile_name="dev", relevant_env_keys=["FF_ENGINE"]) + ctx2 = build_env_ctx(engine="duckdb", profile_name="dev", relevant_env_keys=["FF_ENGINE"]) + assert ctx1.to_payload() == ctx2.to_payload() + + # Changing an excluded env var should not alter ctx payload + monkeypatch.setenv("SECRET_TOKEN", "changed") + ctx3 = build_env_ctx(engine="duckdb", profile_name="dev", relevant_env_keys=["FF_ENGINE"]) + assert ctx1.to_payload() == ctx3.to_payload() + + +def test_fingerprint_sql_changes_on_small_sql_edit(): + node = Node(name="users.ff", kind="sql", path=Path(__file__)) + ctx = EnvCtx(engine="duckdb", profile="dev", env_vars={}, sources_json="{}") + fp1 = fingerprint_sql(node=node, rendered_sql="select 1 as x", env_ctx=ctx, dep_fps={}) + fp2 = fingerprint_sql(node=node, rendered_sql="select 2 as x", env_ctx=ctx, dep_fps={}) + assert fp1 != fp2 + + +def test_fingerprint_sql_dep_cascade(): + node = Node(name="mart.ff", kind="sql", path=Path(__file__), deps=["users.ff"]) + ctx = EnvCtx(engine="duckdb", profile="dev", env_vars={}, sources_json="{}") + fp_dep_a = "aaa" + fp_dep_b = "bbb" + fp1 = fingerprint_sql( + node=node, rendered_sql="select * from users", env_ctx=ctx, dep_fps={"users.ff": fp_dep_a} + ) + fp2 = fingerprint_sql( + node=node, rendered_sql="select * from users", env_ctx=ctx, dep_fps={"users.ff": fp_dep_b} + ) + assert fp1 != fp2 + + +def _dummy_func_a(x): + return x + 1 + + +def _dummy_func_b(x): + return x + 2 + + +def test_get_function_source_is_stable_and_different_per_change(): + src_a = get_function_source(_dummy_func_a) + src_b = get_function_source(_dummy_func_b) + assert isinstance(src_a, str) and isinstance(src_b, str) + assert src_a != src_b + + +def test_fingerprint_py_changes_with_source_and_deps(): + node = Node(name="py_model", kind="python", path=Path(__file__), deps=["users.ff"]) + ctx = EnvCtx(engine="duckdb", profile="dev", env_vars={}, sources_json="{}") + src = get_function_source(_dummy_func_a) + fp1 = fingerprint_py(node=node, func_src=src, env_ctx=ctx, dep_fps={"users.ff": "x"}) + fp2 = fingerprint_py(node=node, func_src=src, env_ctx=ctx, dep_fps={"users.ff": "y"}) + assert fp1 != fp2 + + +def test_get_function_source_fallback(monkeypatch): + # Force inspect.getsource to fail to exercise fallback path + def boom(_): + raise OSError("no source") + + monkeypatch.setattr(inspect, "getsource", boom, raising=True) + + src = get_function_source(_dummy_func_a) + assert isinstance(src, str) and len(src) > 0 diff --git a/uv.lock b/uv.lock index 96543e3..577e650 100644 --- a/uv.lock +++ b/uv.lock @@ -618,7 +618,7 @@ wheels = [ [[package]] name = "flowforge" -version = "0.1.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "bigframes" }, From 1869ab1ee8040aa7b9552a3cdf8e6df9ec4f18a8 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Sat, 18 Oct 2025 17:44:52 +0200 Subject: [PATCH 02/16] Implemented ticket T3 --- src/flowforge/cache.py | 37 +--- src/flowforge/cli.py | 11 + src/flowforge/meta.py | 272 ++++++++++++++++++++++++ tests/integration/test_meta_postgres.py | 34 +++ tests/unit/test_meta_bigquery_fake.py | 71 +++++++ tests/unit/test_meta_duckdb.py | 40 ++++ 6 files changed, 431 insertions(+), 34 deletions(-) create mode 100644 src/flowforge/meta.py create mode 100644 tests/integration/test_meta_postgres.py create mode 100644 tests/unit/test_meta_bigquery_fake.py create mode 100644 tests/unit/test_meta_duckdb.py diff --git a/src/flowforge/cache.py b/src/flowforge/cache.py index 821ca23..76c1caf 100644 --- a/src/flowforge/cache.py +++ b/src/flowforge/cache.py @@ -9,9 +9,8 @@ from pathlib import Path from typing import Any -from sqlalchemy import text - from .core import relation_for +from .meta import relation_exists as _relation_exists_engine @dataclass @@ -93,39 +92,9 @@ def update_many(self, fps: Mapping[str, str]) -> None: def relation_exists(executor: Any, relation: str) -> bool: """ - best-effort existence check of a materialized relation on the current engine. - Returns True if the relation appears to exist, False if definitely missing. - For unknown engines, returns True (do not block execution). + Compatibility wrapper that delegates to the engine-aware implementation. """ - try: - # DuckDB - con = getattr(executor, "con", None) - if con is not None and hasattr(con, "execute"): - rows = con.execute( - "select 1 from information_schema.tables " - "where table_schema in ('main','temp') and table_name = ?", - [relation], - ).fetchall() - return bool(rows) - - # Postgres (via SQLAlchemy engine) - engine = getattr(executor, "engine", None) - if engine is not None and hasattr(engine, "begin"): - with engine.begin() as conn: - rows = conn.execute( - text( - "select 1 from information_schema.tables " - "where table_schema = current_schema() and table_name = :t" - ), - {"t": relation}, - ).fetchall() - return bool(rows) - - # Other engines: assume relation is present if cache says so - return True - except Exception: - # Be permissive on errors: don't block execution with a false negative - return True + return _relation_exists_engine(executor, relation) def can_skip_node( diff --git a/src/flowforge/cli.py b/src/flowforge/cli.py index c478b52..29face2 100644 --- a/src/flowforge/cli.py +++ b/src/flowforge/cli.py @@ -9,6 +9,7 @@ import time import traceback from collections.abc import Callable, Iterable +from contextlib import suppress from dataclasses import dataclass, field from pathlib import Path from typing import Annotated, Any, NoReturn, cast @@ -42,6 +43,7 @@ fingerprint_sql, get_function_source, ) +from .meta import ensure_meta_table, upsert_meta from .run_executor import ScheduleResult, schedule from .seeding import seed_project from .settings import EngineType, EnvSettings, Profile, resolve_profile @@ -524,6 +526,9 @@ def __post_init__(self) -> None: if LOG.isEnabledFor(logging.INFO): typer.echo(f"Profile: {self.env_name} | Engine: {self.ctx.profile.engine}") self.shared = self.ctx.make_executor() + # Ensure meta table exists once (best-effort; do not fail the run) + with suppress(Exception): + ensure_meta_table(self.shared[0]) relevant_env = [k for k in os.environ if k.startswith("FF_")] self.env_ctx = build_env_ctx( engine=self.ctx.profile.engine, @@ -601,6 +606,9 @@ def run_node(self, name: str) -> None: self.computed_fps[name] = cand_fp if LOG.isEnabledFor(logging.INFO): typer.echo(f"↻ Skipped {name} (cache hit)") + # Best-effort: write/update meta when we know fp and relation exists + with suppress(Exception): + upsert_meta(ex, name, relation_for(name), cand_fp, self.ctx.profile.engine) return if LOG.isEnabledFor(logging.INFO): typer.echo(f"→ Running {name} ({node.kind})") @@ -608,6 +616,9 @@ def run_node(self, name: str) -> None: if cand_fp is not None: with self.fps_lock: self.computed_fps[name] = cand_fp + # Best-effort: write/update meta on successful execution + with suppress(Exception): + upsert_meta(ex, name, relation_for(name), cand_fp, self.ctx.profile.engine) @staticmethod def before(_name: str) -> None: diff --git a/src/flowforge/meta.py b/src/flowforge/meta.py new file mode 100644 index 0000000..91be58a --- /dev/null +++ b/src/flowforge/meta.py @@ -0,0 +1,272 @@ +# src/flowforge/meta.py +""" +Engine-aware metadata store and relation-existence helpers. + +This module persists a per-engine `_ff_meta` table with the following columns: + - node_name (PK where supported) + - relation + - fp + - engine + - built_at (server-side timestamp) + +APIs: + ensure_meta_table(executor) + upsert_meta(executor, node_name, relation, fp, engine) + get_meta(executor, node_name) -> tuple[str, str, object, str] | None + relation_exists(executor, relation) -> bool + +Supported engines: + - DuckDB (executor.con) + - Postgres (executor.engine, optional .schema) + - BigQuery (executor.client, .dataset, optional .project) +""" + +from __future__ import annotations + +from typing import Any + +from sqlalchemy import text + +# --------------------------- Engine detection --------------------------- + + +def _is_duckdb(ex: Any) -> bool: + return hasattr(ex, "con") and hasattr(ex.con, "execute") + + +def _is_postgres(ex: Any) -> bool: + return hasattr(ex, "engine") and hasattr(ex.engine, "begin") + + +def _is_bigquery(ex: Any) -> bool: + return hasattr(ex, "client") and hasattr(ex, "dataset") + + +# --------------------------- Qualifier helpers --------------------------- + + +def _duck_name(name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + +def _pg_qual_meta(ex: Any) -> str: + schema = getattr(ex, "schema", None) + if schema: + return f'"{schema}"."__ff_meta"' if schema.startswith("__") else f'"{schema}"."_ff_meta"' + return '"_ff_meta"' + + +def _bq_qual_meta(ex: Any) -> str: + dataset = getattr(ex, "dataset", None) + project = getattr(ex, "project", None) + if not dataset: + # best effort fallback (caller will fail later anyway) + return "`_ff_meta`" + if project: + return f"`{project}.{dataset}._ff_meta`" + return f"`{dataset}._ff_meta`" + + +# --------------------------- Public API --------------------------- + + +def ensure_meta_table(executor: Any) -> None: + """ + Create the _ff_meta table if it does not exist for the active engine. + """ + if _is_duckdb(executor): + sql = ( + 'create table if not exists "_ff_meta" (' + " node_name text primary key," + " relation text," + " fp text," + " engine text," + " built_at timestamp default current_timestamp" + ")" + ) + executor.con.execute(sql) + return + + if _is_postgres(executor): + qual = _pg_qual_meta(executor) + ddl = ( + f"create table if not exists {qual} (" + " node_name text primary key," + " relation text," + " fp text," + " engine text," + " built_at timestamptz default now()" + ")" + ) + with executor.engine.begin() as conn: + conn.execute(text(ddl)) + return + + if _is_bigquery(executor): + # BigQuery supports IF NOT EXISTS in standard SQL DDL + qual = _bq_qual_meta(executor) + ddl = ( + f"create table if not exists {qual} (" + " node_name string," + " relation string," + " fp string," + " engine string," + " built_at timestamp" + ")" + ) + executor.client.query(ddl).result() + return + + # Unknown engine: no-op + + +def upsert_meta(executor: Any, node_name: str, relation: str, fp: str, engine: str) -> None: + """ + Insert or update `_ff_meta` for a given node. + """ + ensure_meta_table(executor) + + if _is_duckdb(executor): + # DuckDB: emulate upsert via delete + insert inside the same connection. + executor.con.execute('delete from "_ff_meta" where node_name = ?', [node_name]) + executor.con.execute( + 'insert into "_ff_meta"(node_name, relation, fp, engine, built_at) ' + "values (?, ?, ?, ?, current_timestamp)", + [node_name, relation, fp, engine], + ) + return + + if _is_postgres(executor): + qual = _pg_qual_meta(executor) + sql = ( + f"insert into {qual}(node_name, relation, fp, engine, built_at) " + "values (:n, :r, :f, :e, now()) " + "on conflict (node_name) do update set " + " relation = excluded.relation, " + " fp = excluded.fp, " + " engine = excluded.engine, " + " built_at = now()" + ) + with executor.engine.begin() as conn: + conn.execute(text(sql), {"n": node_name, "r": relation, "f": fp, "e": engine}) + return + + if _is_bigquery(executor): + qual = _bq_qual_meta(executor) + + # Use MERGE to emulate upsert + # Parameterization with BigQuery QueryJobConfig is optional; build a safe literal instead. + def _q(s: str) -> str: + return s.replace("\\", "\\\\").replace("`", "\\`").replace("'", "\\'") + + sql = f"""merge {qual} T + using ( + select '{_q(node_name)}' as node_name, + '{_q(relation)}' as relation, + '{_q(fp)}' as fp, + '{_q(engine)}' as engine + ) S + on T.node_name = S.node_name + when matched then update set + relation = S.relation, + fp = S.fp, + engine = S.engine, + built_at = current_timestamp() + when not matched then insert (node_name, relation, fp, engine, built_at) + values (S.node_name, S.relation, S.fp, S.engine, current_timestamp()) + """ + executor.client.query(sql).result() + return + + # Unknown engine: no-op + + +def get_meta(executor: Any, node_name: str) -> tuple[str, str, Any, str] | None: + """ + Return (fp, relation, built_at, engine) for the node, or None if not found. + """ + if _is_duckdb(executor): + row = executor.con.execute( + 'select fp, relation, built_at, engine from "_ff_meta" where node_name = ? limit 1', + [node_name], + ).fetchone() + return (row[0], row[1], row[2], row[3]) if row else None + + if _is_postgres(executor): + qual = _pg_qual_meta(executor) + with executor.engine.begin() as conn: + row = conn.execute( + text( + f"select fp, relation, built_at, engine from {qual} " + "where node_name = :n limit 1" + ), + {"n": node_name}, + ).fetchone() + return (row[0], row[1], row[2], row[3]) if row else None + + if _is_bigquery(executor): + qual = _bq_qual_meta(executor) + # Parameterized query would need google.cloud.bigquery; keep it dependency-light. + node = node_name.replace("\\", "\\\\").replace("`", "\\`").replace("'", "\\'") + sql = ( + f"select fp, relation, built_at, engine from {qual} where node_name = '{node}' limit 1" + ) + rows = list(executor.client.query(sql).result()) + if not rows: + return None + r = rows[0] + # Access by field name if available, else positional + try: + return (r["fp"], r["relation"], r["built_at"], r["engine"]) + except Exception: + return (r[0], r[1], r[2], r[3]) + + return None + + +def relation_exists(executor: Any, relation: str) -> bool: + """ + Check whether a materialized relation exists on the active engine. + """ + if _is_duckdb(executor): + try: + rows = executor.con.execute( + "select 1 from information_schema.tables " + + "where table_schema in ('main','temp') and table_name = ?", + [relation], + ).fetchall() + return bool(rows) + except Exception: + return True # be permissive on unexpected errors + + if _is_postgres(executor): + try: + with executor.engine.begin() as conn: + rows = conn.execute( + text( + "select 1 from information_schema.tables " + + "where table_schema = current_schema() and table_name = :t" + ), + {"t": relation}, + ).fetchall() + return bool(rows) + except Exception: + return True + + if _is_bigquery(executor): + try: + dataset = getattr(executor, "dataset", None) + project = getattr(executor, "project", None) + if not dataset: + return True + qual = f"`{project}.{dataset}`" if project else f"`{dataset}`" + rel = relation.replace("`", "\\`").replace("'", "\\'") + sql = ( + f"select 1 from {qual}.INFORMATION_SCHEMA.TABLES where table_name = '{rel}' limit 1" + ) + rows = list(executor.client.query(sql).result()) + return bool(rows) + except Exception: + return True + + return True diff --git a/tests/integration/test_meta_postgres.py b/tests/integration/test_meta_postgres.py new file mode 100644 index 0000000..1838626 --- /dev/null +++ b/tests/integration/test_meta_postgres.py @@ -0,0 +1,34 @@ +# tests/integration/test_meta_postgres.py +from __future__ import annotations + +import os +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +from sqlalchemy import create_engine, text + +from flowforge.meta import ensure_meta_table, get_meta, relation_exists, upsert_meta + +pytestmark = pytest.mark.postgres # mark to opt-in in CI + + +def test_postgres_meta_roundtrip(pg_env): + engine = create_engine(pg_env["FF_PG_DSN"]) + ex = SimpleNamespace(engine=engine, schema=os.getenv("FF_PG_SCHEMA", "public")) + + ensure_meta_table(ex) + # Use a unique node name per test run to avoid leftover rows from previous runs + node = f"users.ff::{uuid4().hex}" + assert get_meta(ex, node) is None + + upsert_meta(ex, node, "users", "abc", "postgres") + row = get_meta(ex, node) + assert row is not None + fp, rel, _, eng = row + assert (fp, rel, eng) == ("abc", "users", "postgres") + + # relation exists + with engine.begin() as conn: + conn.execute(text('create table if not exists "users" (id int)')) + assert relation_exists(ex, "users") is True diff --git a/tests/unit/test_meta_bigquery_fake.py b/tests/unit/test_meta_bigquery_fake.py new file mode 100644 index 0000000..87a59fb --- /dev/null +++ b/tests/unit/test_meta_bigquery_fake.py @@ -0,0 +1,71 @@ +# tests/unit/test_meta_bigquery_fake.py +from __future__ import annotations + +import re +from types import SimpleNamespace + +from flowforge.meta import ensure_meta_table, get_meta, relation_exists, upsert_meta + + +class FakeBQClient: + """Very small in-memory fake for BigQuery client.query().""" + + def __init__(self): + self.tables = {} # qual -> dict[node_name -> dict] + self.relations = set() + + class _Res: + def __init__(self, rows): + self._rows = rows + + def result(self): + return self._rows + + def query(self, sql: str): + sql_low = sql.lower() + sql_stripped_low = sql_low.lstrip() + # Naive parser for our limited SQL shapes + if "create table if not exists" in sql_low and "_ff_meta" in sql_low: + # no-op; table is "created" on demand + return FakeBQClient._Res([]) + if sql_stripped_low.startswith("merge ") and "_ff_meta" in sql_low: + # Extract '...literal...' AS with a small regex + def _grab(field: str) -> str: + m = re.search(rf"'([^']*)'\s+as\s+{field}\b", sql, flags=re.IGNORECASE) + return m.group(1) if m else "" + + node = _grab("node_name") + rel = _grab("relation") + fp = _grab("fp") + eng = _grab("engine") + qual = "meta" # single bucket + self.tables.setdefault(qual, {}) + self.tables[qual][node] = {"fp": fp, "relation": rel, "engine": eng, "built_at": "now"} + return FakeBQClient._Res([]) + if "information_schema.tables" in sql_low: + # existence check: table_name = 'rel' + rel = sql.split("table_name = '", 1)[1].split("'", 1)[0] + return FakeBQClient._Res([(1,)] if rel in self.relations else []) + if "select fp, relation, built_at, engine from" in sql_low: + node = sql.split("where node_name = '", 1)[1].split("'", 1)[0] + qual = "meta" + row = self.tables.get(qual, {}).get(node) + return FakeBQClient._Res( + [] if not row else [(row["fp"], row["relation"], row["built_at"], row["engine"])] + ) + return FakeBQClient._Res([]) + + +def test_bigquery_meta_with_fake(): + client = FakeBQClient() + ex = SimpleNamespace(client=client, dataset="dset", project="proj") + + ensure_meta_table(ex) + upsert_meta(ex, "users.ff", "users", "x1", "bigquery") + row = get_meta(ex, "users.ff") + assert row is not None and row[0] == "x1" and row[1] == "users" + + # existence + client.relations.add("users") + assert relation_exists(ex, "users") is True + assert relation_exists(ex, "unknown") is False diff --git a/tests/unit/test_meta_duckdb.py b/tests/unit/test_meta_duckdb.py new file mode 100644 index 0000000..d7799a1 --- /dev/null +++ b/tests/unit/test_meta_duckdb.py @@ -0,0 +1,40 @@ +# tests/unit/test_meta_duckdb.py +from __future__ import annotations + +from pathlib import Path + +from flowforge.executors.duckdb_exec import DuckExecutor +from flowforge.meta import ensure_meta_table, get_meta, relation_exists, upsert_meta + + +def test_duckdb_meta_roundtrip(tmp_path: Path): + ex = DuckExecutor(db_path=str(tmp_path / "t.duckdb")) + ensure_meta_table(ex) + + # Initially empty + assert get_meta(ex, "users.ff") is None + + # Upsert and read back + upsert_meta(ex, "users.ff", "users", "abc123", "duckdb") + row = get_meta(ex, "users.ff") + assert row is not None + fp, rel, built_at, eng = row + assert fp == "abc123" + assert rel == "users" + assert eng == "duckdb" + assert built_at is not None + + # Update + upsert_meta(ex, "users.ff", "users", "def456", "duckdb") + row2 = get_meta(ex, "users.ff") + assert row2 is not None + fp2, rel2, _, eng2 = row2 + assert fp2 == "def456" + assert rel2 == "users" + assert eng2 == "duckdb" + + # Relation existence helper + # Create a physical table and verify existence + ex.con.execute('create table "users" (id int)') + assert relation_exists(ex, "users") is True + assert relation_exists(ex, "does_not_exist") is False From 96e1fb445236dae5f3cb05dd88b85332bc00ec5d Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Sun, 19 Oct 2025 12:34:49 +0200 Subject: [PATCH 03/16] Ticket FF-304 --- .../.flowforge/cache/dev-duckdb.json | 16 +- .../site/dag/mart_orders_enriched.html | 26 -- .../simple_duckdb/site/dag/mart_users.ff.html | 18 -- .../simple_duckdb/site/dag/orders.ff.html | 16 -- examples/simple_duckdb/site/dag/users.ff.html | 16 -- .../site/dag/users_enriched.html | 18 -- .../simple_duckdb/site/dag/v_users.ff.html | 16 -- .../site/dag/v_users_enriched.ff.html | 16 -- pyproject.toml | 2 +- src/flowforge/cli.py | 53 +++- tests/unit/test_cache_policy_cli.py | 266 ++++++++++++++++++ 11 files changed, 325 insertions(+), 138 deletions(-) create mode 100644 tests/unit/test_cache_policy_cli.py diff --git a/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json b/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json index 821ffb1..65b8780 100644 --- a/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json +++ b/examples/simple_duckdb/.flowforge/cache/dev-duckdb.json @@ -1,14 +1,14 @@ { "engine": "duckdb", "entries": { - "ephemeral_ids.ff": "61534518fe8ceb722140f1c6b6429362222672c27fd5aa4136cfbb03cd38654e", - "mart_orders_enriched": "da281dd1d5b462bb3e2dc632fd203d0528a3ab9527feedb586527551705e620d", - "mart_users.ff": "02f49fe6c1d761de1795f8e707507d6555f6157765c0424c4aebc78108e63c54", - "orders.ff": "cc9e389d5252c14d658314722787c6196cc1fa31478f560a53f0e60702956ede", - "users.ff": "4e0bfd811f1b17f20a3eb2605746bacf2e76e5bc2e7227f73e9f3f9b36521bb5", - "users_enriched": "3767af19bcb0b93e7fef32a668de2b7d9cb7ba9523da0b2c8c12986a9a51d1cb", - "v_users.ff": "032761584e627b5f7e2d2dd77eb885cfc3fc8e7e8d7e35f6f6fda45a2e4e8e73", - "v_users_enriched.ff": "a15c32702a0efeafc32d1a9f2a6106a076f8b2f4ce82c5eb99010e7221c59490" + "ephemeral_ids.ff": "39c4ed16881a27d114319c7db72cf395c1fd94ae32db95748cbd3f870819f53f", + "mart_orders_enriched": "82aa28b9d397c916e312a89b28f8884b53dc9c8db05d2dc37ed9e3fdbc7456ab", + "mart_users.ff": "f259da7d684eac53c6c567a1b584eaf04de4eb0b67acf9e2bed9fa3dfb70cb76", + "orders.ff": "33aed2c0c42b5876d9650044a1b1613c78e6f88a8aac673d9f42ebf415f7cbdd", + "users.ff": "6ada54acb464864200128da32015b389ca7c790173aee3c48621deb3ec752e12", + "users_enriched": "75d7bee1fe3f5502be45fd016d230c5e7ca77fa5d751717db7319fbb3f66d8de", + "v_users.ff": "87807a05147dfa3eaa88216f67f313f2d6efb5e74539568c04049158bee04c15", + "v_users_enriched.ff": "1f57803966346ded2224264c27fe1db71edc7fef3ac7d02612927009ac416afa" }, "profile": "dev", "version": 1 diff --git a/examples/simple_duckdb/site/dag/mart_orders_enriched.html b/examples/simple_duckdb/site/dag/mart_orders_enriched.html index 391ece5..bd49f5d 100644 --- a/examples/simple_duckdb/site/dag/mart_orders_enriched.html +++ b/examples/simple_duckdb/site/dag/mart_orders_enriched.html @@ -93,32 +93,6 @@

Metadata

-
-

Columns

- - - - - - - - - - - - - - - - - - - - - -
NameTypeNullable
order_idBIGINTyes
user_idBIGINTyes
amountDOUBLEyes
idBIGINTyes
emailVARCHARyes
signup_tsTIMESTAMPyes
is_gmailBOOLEANyes
valid_amtBOOLEANyes
-
-