Merged
6 changes: 6 additions & 0 deletions .flowforge/cache/dev-duckdb.json
@@ -0,0 +1,6 @@
{
  "engine": "duckdb",
  "entries": {},
  "profile": "dev",
  "version": 1
}
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -1,5 +1,8 @@
name: CI

env:
  ACT: ""

on:
  push:
    branches: [ main ]
@@ -63,7 +66,7 @@ jobs:
      - name: Unit tests (fast)
        env:
          PYTHONWARNINGS: default
        run: uv run pytest -q -m "not slow and not postgres" --maxfail=1
        run: uv run pytest -q tests -m "not slow and not postgres" --maxfail=1

# ---------- smoke: examples/simple_duckdb with view + ephemeral ----------
smoke-duckdb:
2 changes: 1 addition & 1 deletion .gitignore
@@ -38,4 +38,4 @@ dist/

# Docs Output
examples/**/docs/
FlowForge.md
tickets/**
3 changes: 3 additions & 0 deletions Makefile.dev
@@ -24,6 +24,9 @@ test-pg-batch:
unittest:
	FLOWFORGE_SQL_DEBUG=1 $(UV) run pytest -q tests

cover:
	uv run pytest --cov=src/flowforge --cov-report=term-missing --cov-report=xml --cov-report=html

utest:
	flowforge utest "$(FF_PROJECT)" --env "$(FF_ENV)"

28 changes: 27 additions & 1 deletion Makefile.pipeline
@@ -10,7 +10,14 @@ seed:

# Run/DAG/Test reuse the same duckdb path (FF_ENV can switch engine)
run:
	$(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)"
	$(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)" --jobs=1

run_parallel:
	# Two independent staging nodes ("users", "orders") run in parallel
	FF_ENGINE=duckdb FF_DUCKDB_PATH="$(DB)" flowforge run "$(PROJECT)" --env dev --jobs 4

run-parallel:
	$(FLOWFORGE) run "$(FF_PROJECT)" --env "$(FF_ENV)" --jobs=4 --keep-going

dag:
	$(FLOWFORGE) dag "$(FF_PROJECT)" --env "$(FF_ENV)" --html
@@ -32,3 +39,22 @@ demo: seed run dag demo-open test

clean:
	rm -rf .local "$(FF_PROJECT)/docs" dist build *.egg-info

# --- Cache demos (v0.3) ---

cache_rw_first:
	# first run writes cache and meta
	FF_ENGINE=duckdb FF_DUCKDB_PATH="$(DB)" flowforge run "$(PROJECT)" --env dev --cache=rw

cache_rw_second:
	# second run: should be a no-op (skips) if nothing changed
	FF_ENGINE=duckdb FF_DUCKDB_PATH="$(DB)" flowforge run "$(PROJECT)" --env dev --cache=rw

cache_invalidate_env:
	# changing an FF_* env var invalidates fingerprints
	FF_ENGINE=duckdb FF_DUCKDB_PATH="$(DB)" FF_DEMO_TOGGLE=1 flowforge run "$(PROJECT)" --env dev --cache=rw

rebuild_users:
	# force rebuild of a single model regardless of cache
	FF_ENGINE=duckdb FF_DUCKDB_PATH="$(DB)" flowforge run "$(PROJECT)" --env dev --cache=rw --rebuild users.ff

92 changes: 91 additions & 1 deletion README.md
@@ -1,4 +1,4 @@
# FlowForge (PoC 0.1)
# FlowForge (PoC 0.3)

[![CI](https://github.com/<org>/<repo>/actions/workflows/ci.yml/badge.svg)](https://github.com/<org>/<repo>/actions/workflows/ci.yml)
[![PyPI version](https://img.shields.io/pypi/v/flowforge.svg)](https://pypi.org/project/flowforge/)
@@ -94,6 +94,96 @@ flowforge test examples/simple_duckdb --env dev --select batch

---

> For a deep dive into the v0.3 features, see **[Parallelism & Cache](docs/Cache_and_Parallelism.md)**.

## Parallelism & Cache (v0.3)

FlowForge 0.3 adds a level-wise parallel scheduler and an opt-in build cache.

### Parallel execution
- The DAG is split into **levels**: a level contains all nodes whose longest path from a source has the same length.
- Within a level, up to `--jobs` nodes run **in parallel**. Dependencies are never violated.
- `--keep-going`: tasks already started in a level run to completion, but **subsequent levels won’t start** if any task in the current level fails.

**Examples**
```bash
# run with 4 workers per level
flowforge run examples/simple_duckdb --env dev --jobs 4

# keep tasks in the current level running even if one fails
flowforge run examples/simple_duckdb --env dev --jobs 4 --keep-going
```
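
The level-wise scheduling described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not FlowForge's actual scheduler: the names `dag_levels`, `run_levels`, `deps`, and `build` are hypothetical, a thread pool stands in for whatever worker model the tool uses, and the failure handling mirrors the `--keep-going` description (tasks in the current level finish, later levels do not start).

```python
from concurrent.futures import ThreadPoolExecutor


def dag_levels(deps):
    """Group nodes by their longest distance from a source node.

    `deps` (hypothetical shape) maps each node to the nodes it depends on.
    Assumes the graph is acyclic; there is no cycle detection here.
    """
    depth = {}

    def visit(node):
        if node not in depth:
            parents = deps.get(node, [])
            # Sources have no parents and land on level 0.
            depth[node] = 1 + max((visit(p) for p in parents), default=-1)
        return depth[node]

    for node in deps:
        visit(node)
    levels = [[] for _ in range(max(depth.values(), default=-1) + 1)]
    for node in sorted(depth):
        levels[depth[node]].append(node)
    return levels


def run_levels(deps, build, jobs=1):
    """Call `build(node)` level by level with up to `jobs` workers per level."""
    done, failed = [], []
    for level in dag_levels(deps):
        with ThreadPoolExecutor(max_workers=jobs) as pool:
            futures = {node: pool.submit(build, node) for node in level}
        # Leaving the `with` block waits for every task started in this level.
        for node, future in futures.items():
            (failed if future.exception() else done).append(node)
        if failed:
            break  # subsequent levels never start
    return done, failed
```

Because a node's level is one more than the deepest level among its dependencies, every dependency is finished before the node's own level begins, which is why running a whole level concurrently cannot violate ordering.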

### Cache modes
The cache decides whether a node can be **skipped** when nothing relevant changed.

```
--cache=off # always build
--cache=rw # default: skip on match; write cache after build
--cache=ro # skip on match; build on miss, but don't write cache
--cache=wo # always build and write cache
--rebuild <glob> # ignore cache for selected nodes
--no-cache # alias for --cache=off
```
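
As a reading aid, the four cache modes can be expressed as a small decision function. This is a sketch based only on the mode descriptions in this README; `plan` and its parameters are hypothetical names, and the real implementation may be structured differently.

```python
def plan(mode, fingerprint_matches, relation_exists):
    """Return (build, write_cache) for one node under a given --cache mode.

    Hypothetical helper: it encodes the README's mode table, nothing more.
    """
    if mode not in {"off", "rw", "ro", "wo"}:
        raise ValueError(f"unknown cache mode: {mode}")
    # Only rw and ro ever skip, and only when the cached fingerprint matches
    # and the physical relation still exists.
    can_skip = mode in {"rw", "ro"} and fingerprint_matches and relation_exists
    build = not can_skip
    # Only rw and wo write the cache, and only after an actual build.
    write_cache = build and mode in {"rw", "wo"}
    return build, write_cache
```

For example, `plan("ro", False, True)` yields a build without a cache write, while `plan("wo", True, True)` builds and writes even though the fingerprint matched.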

**When is a node skipped?**
FlowForge computes a **fingerprint** from:
- SQL/Python source (rendered SQL or function source)
- environment context (engine, profile name, selected `FF_*` env vars, normalized `sources.yml`)
- **dependency fingerprints** (change upstream ⇒ downstream fingerprint changes)

The node is skipped if the fingerprint matches the on-disk cache **and** the physical relation exists.
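
One way to picture such a fingerprint is a hash over a canonical serialization of those inputs. The sketch below is an assumption-laden illustration, not FlowForge's actual algorithm: the function name, the choice of SHA-256 over JSON, and the inclusion of every `FF_*` variable (the text above says only *selected* ones are used) are all hypothetical. `sources_yml` is assumed to be the already-parsed, JSON-serializable content of `sources.yml`.

```python
import hashlib
import json
import os


def fingerprint(source, engine, profile, sources_yml, dep_fingerprints, env=None):
    """Hash everything that should invalidate a node when it changes."""
    env = os.environ if env is None else env
    payload = {
        "source": source,  # rendered SQL or Python function source
        "engine": engine,
        "profile": profile,
        # Assumption: all FF_* variables count; unrelated variables are ignored.
        "env": {k: v for k, v in sorted(env.items()) if k.startswith("FF_")},
        "sources": sources_yml,
        # Upstream fingerprints make a change propagate downstream.
        "deps": dict(sorted(dep_fingerprints.items())),
    }
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()
```

Feeding each node the fingerprints of its dependencies is what makes an upstream edit ripple through: the downstream payload changes even when its own SQL does not.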

**Examples**
```bash
# first run (build + cache write)
flowforge run . --env dev --cache=rw

# second run (no-op if nothing changed)
flowforge run . --env dev --cache=rw

# force rebuild of a specific model
flowforge run . --env dev --cache=rw --rebuild marts_daily.ff

# diagnose a surprising skip: change an FF_* env var to invalidate fingerprints
FF_DEMO_TOGGLE=1 flowforge run . --env dev --cache=rw
```

**Troubleshooting**
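- *“Why did it skip?”* → A node is skipped only when its fingerprint is unchanged. Check the inputs you expected to differ: SQL/Python code, `sources.yml`, `FF_*` env vars, profile/engine. A change to any of them alters the fingerprint and forces a build.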
- *“Relation missing but cache says skip”* → FlowForge also checks relation existence; if it was dropped externally, it will **rebuild**.
- *“Parallel tasks interleave logs”* → Logs are serialized via an internal queue to keep lines readable; use `-v`/`-vv` for more detail.

---

## Selective runs

Use patterns to run only a subgraph.

- `--select <pattern>`: builds only targets that match **and their dependencies**.
- `--exclude <pattern>`: excludes matching targets from the build (deps remain if still required).

Examples:

    flowforge run . --select marts_daily.ff
    flowforge run . --exclude 'mart_*'
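
The selection rules above could be modeled roughly as follows. This is a hedged sketch: `resolve_selection` and the `deps` mapping are hypothetical, shell-style matching via `fnmatch` is an assumption about how patterns are interpreted, and the sketch assumes that an excluded node is still built when a remaining target depends on it.

```python
from fnmatch import fnmatchcase


def resolve_selection(deps, select=None, exclude=None):
    """Return the set of nodes to build for --select/--exclude patterns.

    `deps` (hypothetical shape) maps each node to the nodes it depends on.
    """
    # Targets: everything matching --select (or all nodes if it is omitted)...
    targets = {n for n in deps if select is None or fnmatchcase(n, select)}
    # ...minus anything matching --exclude.
    if exclude is not None:
        targets = {n for n in targets if not fnmatchcase(n, exclude)}
    # Pull in transitive dependencies of the remaining targets.
    selected, stack = set(), list(targets)
    while stack:
        node = stack.pop()
        if node not in selected:
            selected.add(node)
            stack.extend(deps.get(node, []))
    return selected
```

With a graph where `marts_daily.ff` depends on `users.ff` and `orders.ff`, selecting `marts_daily.ff` returns all three nodes, matching the "targets and their dependencies" rule.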

---

## Rebuild controls

- `--rebuild` → rebuild **all selected** nodes (ignore cache).
- `--rebuild-only NAME …` → rebuild only the specified nodes (ignore cache).

These flags compose with `--select/--exclude`.

Examples:

    # Rebuild everything that matches --select
    flowforge run . --select marts_daily.ff --rebuild

    # Rebuild only a specific node
    flowforge run . --rebuild-only marts_daily.ff

---

## Documentation

- **Documentation hub:** choose your path (operators vs contributors) — see [`docs/index.md`](docs/index.md).