Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions _scripts/concat_docs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#!/usr/bin/env python3
# concat_docs.py
"""
Fügt alle Markdown-Dateien aus dem docs-Verzeichnis zu einer einzelnen Datei zusammen.
- Respektiert die Reihenfolge in mkdocs.yml (nav).
- Ignoriert doppelte Einträge / Anker (#...).
- Hängt übrige .md-Dateien (nicht in nav) am Ende an.
- Optional: Headings demoten (um mehrfaches H1 zu vermeiden).
Concatenates all Markdown files from the docs directory into a single file.
- Respects the order in mkdocs.yml (nav).
- Ignores duplicate entries / anchors (#...).
- Appends remaining .md files (not in nav) at the end.
- Optional: Demote headings (to avoid multiple H1s).

Beispiel:
Example:
python concat_docs.py -o Combined.md
python concat_docs.py -o Combined.md --demote --exclude "reference/**" --exclude "site/**"
"""
Expand All @@ -30,18 +30,18 @@


def load_nav_order(project_root: Path) -> list[Path]:
"""Liest mkdocs.yml und extrahiert eine geordnete Liste der Markdown-Pfade (ohne Anker)."""
"""Read mkdocs.yml and extract an ordered list of Markdown paths (without anchors)."""
yml_path = project_root / MKDOCS_YML
ordered: list[Path] = []
if yaml is None or not yml_path.exists():
return ordered # keine Order-Info -> leere Liste
return ordered # no ordering info -> empty list
data = yaml.safe_load(yml_path.read_text(encoding="utf-8"))
nav = data.get("nav") if isinstance(data, dict) else None
if not isinstance(nav, list):
return ordered

def normalize_nav_item(item) -> list[str]:
# item kann dict ({"Title": "path.md" | ["subitems"]}) oder string sein
# Item can be dict ({"Title": "path.md" | ["subitems"]}) or string
out: list[str] = []
if isinstance(item, str):
out.append(item)
Expand All @@ -60,12 +60,12 @@ def normalize_nav_item(item) -> list[str]:

seen = set()
for p in paths:
# Nur Dateien unter docs berücksichtigen; Anker entfernen
# Only consider files under docs; strip anchors
p_no_anchor = p.split("#", 1)[0]
if not p_no_anchor.lower().endswith(".md"):
continue
# mkdocs erlaubt relative Pfade; wir interpretieren sie relativ zu docs/
# Falls der Pfad bereits "docs/..." enthält, normalisieren wir trotzdem
# mkdocs allows relative paths; interpret them relative to docs/
# If the path already contains "docs/...", normalize it anyway
if p_no_anchor.startswith(DOCS_DIR_DEFAULT + "/"):
rel = Path(p_no_anchor).relative_to(DOCS_DIR_DEFAULT)
else:
Expand Down Expand Up @@ -94,8 +94,8 @@ def apply_excludes(paths: list[Path], patterns: list[str]) -> list[Path]:

def demote_headings(text: str, levels: int = 1) -> str:
"""
Erhöht die Anzahl der '#' um 'levels' für alle ATX-Headings (Markdown #).
Lässt Codeblöcke unberührt.
Increase the number of '#' by 'levels' for all ATX headings (Markdown #).
Leave code fences untouched.
"""
if levels <= 0:
return text
Expand Down Expand Up @@ -160,12 +160,12 @@ def main():
print(f"Fehler: docs-Verzeichnis nicht gefunden: {docs_dir}", file=sys.stderr)
sys.exit(1)

# 1) Reihenfolge aus mkdocs.yml (falls nicht deaktiviert / vorhanden)
# 1) Order from mkdocs.yml (if not disabled / available)
nav_order = load_nav_order(project_root) if not args.no_nav else []
all_md = collect_md_files(docs_dir)
all_md = apply_excludes(all_md, args.exclude)

# 2) Liste zusammenstellen: zuerst nav, dann Rest (ohne Duplikate)
# 2) Build list: nav entries first, then the rest (without duplicates)
ordered: list[Path] = []
seen = set()
for rel in nav_order:
Expand Down
7 changes: 5 additions & 2 deletions docs/Config_and_Macros.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Use the `@model` decorator from `fastflowtransform.core` to register a callable.
- `name` (optional) → overrides the logical name (defaults to stem).
- `deps` → list of dependency nodes (file stems or logical names).
- `requires` → column contract per dependency (validated via `validation.validate_required_columns`).
- `materialized` (optional) → `'table' | 'view' | 'ephemeral'`; mirrors `config(materialized=...)` for SQL.
- `tags` (optional) → convenience for attaching selection labels without writing `meta={"tags": ...}`.

Dependencies determine the call signature:

Expand All @@ -78,7 +80,8 @@ import pandas as pd
@model(
name="users_enriched",
deps=["users.ff"],
requires={"users": {"id", "email"}}
requires={"users": {"id", "email"}},
materialized="view",
)
def enrich(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
Expand Down Expand Up @@ -172,7 +175,7 @@ override those defaults, add per-engine overrides, or point at files:

## 2. `config()` options

Call `config()` at the top of SQL models (and optionally within Python models via decorator kwargs in future versions).
Call `config()` at the top of SQL models. Python models get the same options via the `@model(..., materialized=..., tags=...)` decorator kwargs.

```sql
{{ config(
Expand Down
49 changes: 49 additions & 0 deletions docs/examples/Basic_Demo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Basic Demo Project

The `examples/basic_demo` project shows the smallest end-to-end FastFlowTransform pipeline. It combines one seed, a staging model, and a final mart while staying portable across DuckDB, Postgres, and Databricks Spark.

## Why it exists
- **Start small** – demonstrate the minimum folder structure (`seeds/`, `models/`, `profiles.yml`) needed to run `fft`.
- **Engine parity** – prove that a single project can target multiple engines by swapping profiles.
- **Understand outputs** – show where documentation and manifests land after a run.

Use it as a sandbox before adding your own sources, macros, or Python models.

## Project layout

| Path | Purpose |
|------|---------|
| `seeds/seed_users.csv` | Sample CRM-style user data. `fft seed` materializes it as `crm.users`. |
| `models/staging/users_clean.ff.sql` | Normalizes emails, casts types, and tags the model for all engines. |
| `models/marts/mart_users_by_domain.ff.sql` | Aggregates users per email domain and records the first/last signup dates. |
| `models/engines/*/mart_latest_signup.ff.py` | Engine-specific Python models (pandas for DuckDB/Postgres, PySpark for Databricks) selecting the most recent signup per domain from the staging view. |
| `profiles.yml` | Declares `dev_duckdb`, `dev_postgres`, and `dev_databricks` profiles driven by environment variables. |
| `.env.dev_*` | Template environment files you can `source` per engine. |
| `Makefile` | One command (`make demo ENGINE=…`) to seed, run, document, test, and preview results. |

## Running the demo

1. `cd examples/basic_demo`
2. Choose an engine and export its environment variables:
```bash
set -a; source .env.dev_duckdb; set +a
# swap to .env.dev_postgres or .env.dev_databricks for other engines
```
3. Execute the full flow:
```bash
make demo ENGINE=duckdb
```
The Makefile runs `fft seed`, `fft run`, `fft dag`, `fft test`, and `fft show basic_demo.mart_users_by_domain`. To preview the Python mart, run `make show ENGINE=duckdb SHOW_MODEL=mart_latest_signup` (or swap `ENGINE` as needed).
4. Inspect artifacts:
- `.fastflowtransform/target/manifest.json` and `run_results.json`
- `site/dag/index.html` for the rendered model graph
- CLI output from `fft show` displaying the aggregated mart

The demo also enables baseline data quality checks in `project.yml`. Running `fft test` (or `make test`) verifies that primary keys remain unique/not-null across `seed_users`, `users_clean`, `mart_users_by_domain`, and the Python mart, while ensuring aggregate metrics such as `user_count` never drop below zero and each domain appears only once in `mart_latest_signup`.

## Next steps

- Add more CSVs under `seeds/` and declare them in `sources.yml`.
- Create additional staging models so marts can reuse normalized data.
- Introduce Python models or macros mirroring how the API demo scales up.
- Update `.env.dev_*` with real credentials once you connect to shared databases.
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Welcome! This page is your starting point for FastFlowTransform docs. Pick the t
- [Sources Declaration](./Sources.md)
- [Project Configuration](./Project_Config.md)
- [State Selection (changed & results)](./State_Selection.md)
- [Basic Demo Overview](./examples/Basic_Demo.md)
- [Cross-Table Reconciliations](./Technical_Overview.md#cross-table-reconciliations)
- [Auto-Docs & Lineage](./Technical_Overview.md#auto-docs-lineage)
- [Developer Guide](./Technical_Overview.md#part-ii-architecture-internals)
Expand All @@ -39,7 +40,7 @@ Welcome! This page is your starting point for FastFlowTransform docs. Pick the t
- **Understand the project layout & CLI workflow:** see *Project Layout*, *Makefile Targets*, and *CLI Flows* in the [Technical Overview](Technical_Overview.md#project-layout).
- **Configure runtimes & profiles:** review executor profiles, environment overrides, and logging options in the [Technical Overview](Technical_Overview.md#profiles-environment-overrides).
- **Model data quality & troubleshoot runs:** the [Technical Overview](Technical_Overview.md#model-unit-tests-fft-utest) covers unit tests, troubleshooting tips, and exit codes.
- **Explore runnable demos:** browse the `examples/` directory in the repo; each subproject comes with its own README.
- **Explore runnable demos:** start with the [Basic Demo Overview](examples/Basic_Demo.md) or browse the `examples/` directory; each subproject ships with its own README.

### 2. Extend FastFlowTransform (Developers & Contributors)

Expand Down
48 changes: 32 additions & 16 deletions examples/_scripts/cleanup_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
if SRC_DIR.exists() and str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))

from fastflowtransform.logging import LOG_PREFIX
from fastflowtransform.settings import EnvSettings, resolve_profile


def _log(msg: str) -> None:
print(msg)
if LOG_PREFIX:
print(f"{LOG_PREFIX} {msg}")
else:
print(msg)


def _coerce_path(value: str | None, project: Path) -> Path | None:
Expand Down Expand Up @@ -261,32 +265,44 @@ def main(argv: list[str] | None = None) -> int:
or ("dev_" + args.engine if args.engine in {"duckdb", "postgres"} else "dev")
)

os.environ["FFT_ACTIVE_ENV"] = env_name
_load_dotenv_layered(project, env_name)

profile = None
try:
os.environ["FFT_ACTIVE_ENV"] = env_name
_load_dotenv_layered(project, env_name)
profile = _load_profile(project, env_name, args.engine)
except Exception as exc: # pragma: no cover - best-effort logging
_log(
f"Warning: failed to resolve profile '{env_name}' for engine '{args.engine}': {exc}. "
"Continuing with environment variables only."
)

warehouse_path: Path | None = None
warehouse_path: Path | None = None
try:
if args.engine == "duckdb":
profile_duckdb = getattr(getattr(profile, "duckdb", None), "path", None)
profile_duckdb = (
getattr(getattr(profile, "duckdb", None), "path", None) if profile else None
)
db_path = args.duckdb_path or os.getenv("FF_DUCKDB_PATH") or profile_duckdb
cleanup_duckdb(project=project, db_path=db_path, dry_run=args.dry_run)
elif args.engine == "postgres":
profile_pg = getattr(profile, "postgres", None)
profile_dsn = getattr(profile_pg, "dsn", None)
profile_schema = getattr(profile_pg, "db_schema", None)
profile_pg = getattr(profile, "postgres", None) if profile else None
profile_dsn = getattr(profile_pg, "dsn", None) if profile_pg else None
profile_schema = getattr(profile_pg, "db_schema", None) if profile_pg else None
dsn = args.postgres_dsn or os.getenv("FF_PG_DSN") or profile_dsn
schema = args.postgres_schema or os.getenv("FF_PG_SCHEMA") or profile_schema
cleanup_postgres(dsn=dsn, schema=schema, dry_run=args.dry_run)
elif args.engine == "databricks_spark":
profile_db = getattr(profile, "databricks_spark", None)
profile_master = getattr(profile_db, "master", None)
profile_app = getattr(profile_db, "app_name", None)
profile_warehouse = getattr(profile_db, "warehouse_dir", None)
profile_database = getattr(profile_db, "database", None)
profile_catalog = getattr(profile_db, "catalog", None)
profile_use_hive = getattr(profile_db, "use_hive_metastore", False)
profile_extra_conf = getattr(profile_db, "extra_conf", None)
profile_db = getattr(profile, "databricks_spark", None) if profile else None
profile_master = getattr(profile_db, "master", None) if profile_db else None
profile_app = getattr(profile_db, "app_name", None) if profile_db else None
profile_warehouse = getattr(profile_db, "warehouse_dir", None) if profile_db else None
profile_database = getattr(profile_db, "database", None) if profile_db else None
profile_catalog = getattr(profile_db, "catalog", None) if profile_db else None
profile_use_hive = (
getattr(profile_db, "use_hive_metastore", False) if profile_db else False
)
profile_extra_conf = getattr(profile_db, "extra_conf", None) if profile_db else None
warehouse_path = cleanup_databricks(
project=project,
master=args.spark_master or profile_master,
Expand Down
97 changes: 97 additions & 0 deletions examples/basic_demo/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
.PHONY: seed run test dag show artifacts clean demo help

# --- Configuration -----------------------------------------------------------

DB ?= .local/basic_demo.duckdb
PROJECT ?= .
UV ?= uv

# Engine selector (duckdb|postgres|databricks_spark)
ENGINE ?= duckdb

# Resolve profile and tags per engine
ifeq ($(ENGINE),duckdb)
PROFILE_ENV = dev_duckdb
ENGINE_TAG = engine:duckdb
endif
ifeq ($(ENGINE),postgres)
PROFILE_ENV = dev_postgres
ENGINE_TAG = engine:postgres
endif
ifeq ($(ENGINE),databricks_spark)
PROFILE_ENV = dev_databricks
ENGINE_TAG = engine:databricks_spark
endif

BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV)
RUN_ENV = $(BASE_ENV)

SELECT_FLAGS = --select tag:example:basic_demo --select tag:$(ENGINE_TAG)

SHOW_MODEL ?= mart_users_by_domain

CLEAN_SCRIPT = ../_scripts/cleanup_env.py

ifeq ($(ENGINE),duckdb)
CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine duckdb --env "$(PROFILE_ENV)" --project "$(PROJECT)" --duckdb-path "$(DB)"
else ifeq ($(ENGINE),postgres)
CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine postgres --env "$(PROFILE_ENV)" --project "$(PROJECT)"
else ifeq ($(ENGINE),databricks_spark)
CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine databricks_spark --env "$(PROFILE_ENV)" --project "$(PROJECT)"
else
$(error Unsupported ENGINE=$(ENGINE) - pick duckdb|postgres|databricks_spark)
endif

# --- Targets ----------------------------------------------------------------

help:
@echo "FastFlowTransform Basic Demo"
@echo "Targets:"
@echo " make seed ENGINE=$(ENGINE)"
@echo " make run ENGINE=$(ENGINE)"
@echo " make dag ENGINE=$(ENGINE)"
@echo " make test ENGINE=$(ENGINE)"
@echo " make show ENGINE=$(ENGINE) SHOW_MODEL=$(SHOW_MODEL)"
@echo " make demo ENGINE=$(ENGINE)"
@echo " make clean ENGINE=$(ENGINE)"
@echo
@echo "Variables: DB=$(DB) PROJECT=$(PROJECT) UV=$(UV)"

seed:
env $(BASE_ENV) $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV)

run:
env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS)

test:
env $(BASE_ENV) $(UV) run fft test "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS)

dag:
env $(RUN_ENV) $(UV) run fft dag "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) --html

show:
@if [ -f "$(PROJECT)/site/dag/index.html" ]; then \
$(OPENER) "$(PROJECT)/site/dag/index.html" 2>/dev/null || echo "Open manually at: $(PROJECT)/site/dag/index.html"; \
else \
echo "No HTML found: $(PROJECT)/site/dag/index.html"; \
fi

artifacts:
@echo
@echo "== 📦 Artifacts =="
@echo " $(PROJECT)/.fastflowtransform/target/{manifest.json,run_results.json,catalog.json}"
@echo " DAG HTML: $(PROJECT)/site/dag/index.html"

clean:
$(CLEAN_CMD)

demo: clean
@echo "== 🚀 Basic Demo ($(ENGINE)) =="
@echo "Profile=$(PROFILE_ENV) PROJECT=$(PROJECT)"
+$(MAKE) seed ENGINE=$(ENGINE)
+$(MAKE) run ENGINE=$(ENGINE)
+$(MAKE) dag ENGINE=$(ENGINE)
+$(MAKE) test ENGINE=$(ENGINE)
+$(MAKE) show ENGINE=$(ENGINE)
+$(MAKE) artifacts
@echo "✅ Demo complete."
Loading