diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2508222..e646bb5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,6 +122,10 @@ jobs: extra: "" example: cache_demo env_file: examples/cache_demo/.env.dev_duckdb + - engine: duckdb + extra: "" + example: ci_demo + env_file: examples/ci_demo/.env.dev_duckdb - engine: duckdb extra: "" example: dq_demo @@ -147,6 +151,10 @@ jobs: extra: "postgres" example: basic_demo env_file: examples/basic_demo/.env.dev_postgres + - engine: postgres + extra: "postgres" + example: ci_demo + env_file: examples/ci_demo/.env.dev_postgres - engine: postgres extra: "postgres" example: cache_demo @@ -183,6 +191,11 @@ jobs: example: cache_demo java: true env_file: examples/cache_demo/.env.dev_databricks + - engine: databricks_spark + extra: "spark" + example: ci_demo + java: true + env_file: examples/ci_demo/.env.dev_databricks - engine: databricks_spark extra: "spark" example: dq_demo diff --git a/Makefile.built b/Makefile.built index ae209a8..c78b490 100644 --- a/Makefile.built +++ b/Makefile.built @@ -30,6 +30,3 @@ upload-test: build check upload: build check uvx twine upload dist/* - -clean: - rm -rf build dist *.egg-info diff --git a/Makefile.pipeline b/Makefile.pipeline index a628cfe..36351cd 100644 --- a/Makefile.pipeline +++ b/Makefile.pipeline @@ -40,9 +40,6 @@ test: demo: seed run dag demo-open test @echo "\n✓ Demo done." -clean: - rm -rf .local "$(FF_PROJECT)/docs" dist build *.egg-info - # --- Cache demos --- cache_rw_first: diff --git a/docs/CI_Check.md b/docs/CI_Check.md new file mode 100644 index 0000000..6d6c1dd --- /dev/null +++ b/docs/CI_Check.md @@ -0,0 +1,328 @@ +# CI Checks & Change-Aware Runs + +This page documents the new **CI integration** primitives in FastFlowTransform: + +* `fft ci-check` – **DB-free CI checks** (parsing, DAG, basic lint). +* `fft run --changed-since` – **change-aware runs** for “only what matters” PR jobs. 
+ +These are designed to plug straight into GitHub Actions / GitLab / any CI system. + +--- + +## 1. `fft ci-check`: DB-free CI validation + +`fft ci-check` is a **lightweight, database-free** command that validates your project is structurally sound and ready to run. + +### What it does + +Given a project directory, `fft ci-check`: + +1. Loads `project.yml`, `sources.yml`, and `models/`. +2. Parses all SQL & Python models (`*.ff.sql`, `*.ff.py`). +3. Builds the DAG and checks for: + + * **Missing dependencies** (`ref('...')` pointing to non-existent models). + * **Cycles** in the model graph. +4. Runs basic **lint/quality** checks (extensible via `ci.core`). +5. Prints a concise summary and exits with a CI-friendly exit code. + +No database connection is created – it never calls `ctx.make_executor()`. + +### Basic usage + +```bash +# From the project root +fft ci-check . + +# Explicit env/engine (for engine-specific model filtering) +fft ci-check . --env dev_duckdb --engine duckdb +``` + +Typical CI snippet (GitHub Actions): + +```yaml +- name: FFT ci-check + run: | + uv run fft ci-check . --env dev_duckdb --engine duckdb +``` + +### Exit codes + +* `0` – all checks passed, **no errors**. +* `1` – at least one **error**-level issue (e.g. missing model, cycle). +* `>1` – reserved for future use (e.g. internal failures). + +Warnings (style nits, non-fatal issues) do **not** change the exit code, but are printed in the summary. + +### Output format + +The CLI prints a human-readable summary, e.g.: + +```text +CI Summary +────────── +✓ models parsed: 12 +✓ graph built: ok +✖ issues (2) + • [MISSING_DEP] Error: orders.ff → depends on missing model 'dim_users' + • [CYCLE] Error: Cycle detected among nodes: a.ff, b.ff + +Totals +────── +✓ ok: 0 +! 
warn: 0 +✖ error: 2 +``` + +The underlying objects are: + +* `CiIssue` – single issue: + + * `code`: short ID (`MISSING_DEP`, `CYCLE`, `STYLE`, …) + * `level`: `"error"` or `"warn"` + * `message`: human-readable description + * `obj_name`: model name, where applicable + * `file`, `line`, `column`: optional source location +* `CiSummary` – overall result: + + * `issues: list[CiIssue]` + * `selected_nodes: list[str]` – models considered in this run + * `all_nodes: list[str]` – all models in the project + +> These live under `fastflowtransform/ci/core.py` if you want to extend them. + +--- + +## 2. SARIF output for code scanning + +`fft ci-check` can optionally emit a **SARIF** file that GitHub / Azure DevOps / other tools can ingest as a code-scanning result. + +### Writing SARIF + +Use `fastflowtransform.ci.sarif.write_sarif` in a small wrapper script: + +```python +from pathlib import Path + +from fastflowtransform.ci.core import run_ci_check +from fastflowtransform.ci.sarif import write_sarif +from fastflowtransform.cli.bootstrap import _prepare_context + +def main() -> None: + _prepare_context(project_arg=".", env_name="dev_duckdb", engine="duckdb", vars_opt=None) + summary = run_ci_check(select=None) + out = Path("artifacts/fft-ci.sarif.json") + write_sarif( + summary, + out, + tool_name="FastFlowTransform CI", + tool_version=None, # or your package version + ) + +if __name__ == "__main__": + main() +``` + +Then in CI you can upload `artifacts/fft-ci.sarif.json` to your code-scanning provider. 
+ +### SARIF mapping + +Each `CiIssue` becomes a SARIF `result`: + +* `code` → `ruleId` +* `level`: + + * `"error"` → SARIF `"error"` + * anything else → SARIF `"warning"` +* `message` → `message.text` +* `file`, `line`, `column` → `locations[0].physicalLocation.*` + +Minimal example of a single result: + +```json +{ + "ruleId": "MISSING_DEP", + "level": "error", + "message": { "text": "Model has missing dependency 'dim_users'" }, + "locations": [ + { + "physicalLocation": { + "artifactLocation": { "uri": "models/orders.ff.sql" }, + "region": { + "startLine": 12, + "startColumn": 5 + } + } + } + ] +} +``` + +--- + +## 3. Change-aware runs with `--changed-since` + +You can now ask `fft run` to only process models affected by **Git changes**, via: + +```bash +fft run . --env dev_duckdb --changed-since origin/main +``` + +### How it works + +1. `get_changed_models(project_dir, git_ref)` + Looks at `git diff --name-only {git_ref}..HEAD` and filters paths + to known model files (`*.ff.sql`, `*.ff.py`) under your project. + +2. `compute_affected_models(changed, REGISTRY.nodes)` + Computes the closure of **upstream and downstream** nodes from those + changed models in the DAG (so that dependencies and dependents are included). + +3. `_apply_changed_since_filter(...)` in `cli/run.py` merges this with + your existing `--select` / `--exclude` selection. + +### Selection semantics + +Let: + +* `wanted` = models selected by existing `--select` / `--exclude` logic. +* `affected` = models impacted by `--changed-since`. + +Then: + +* **No `--changed-since`:** + + ```text + final selection = wanted + ``` + +* **`--changed-since` but NO `--select` / `--exclude`:** + + ```text + final selection = affected + ``` + + > The set of changed+affected models becomes the universe; your original `wanted` is ignored. 
+ +* **`--changed-since` AND `--select` and/or `--exclude`:** + + ```text + final selection = wanted ∩ affected + ``` + + > This lets you combine tag/name selectors with git awareness, e.g. “only DQ models that were affected”. + +If `affected` ends up empty (e.g. no relevant files changed), `fft run` exits early with a friendly “Nothing to run” message and `exit 0`. + +### Examples + +**1. CI: only run changed models (plus deps) on PRs** + +```bash +# In CI, from the project root +fft run . \ + --env dev_duckdb \ + --engine duckdb \ + --changed-since origin/main +``` + +This will: + +* Inspect the diff vs. `origin/main`. +* Determine all affected models (changed + upstream/downstream). +* Run only those. + +**2. Combine with tags** + +```bash +# Only run affected models tagged 'finance' +fft run . \ + --env dev_duckdb \ + --engine duckdb \ + --select tag:finance \ + --changed-since origin/main +``` + +Here, the final selection is: + +```text +final = {models with tag:finance} ∩ {affected_by_git_changes} +``` + +**3. Combine with state-based selectors** + +`--changed-since` plays nicely with `state:modified` selectors: + +```bash +fft run . \ + --env dev_duckdb \ + --select "state:modified,tag:dq_demo" \ + --changed-since origin/main +``` + +This narrows the run to: + +* Models modified according to fingerprint cache (`state:modified`), +* Tagged with `tag:dq_demo`, +* **and** affected by Git changes since `origin/main`. + +--- + +## 4. Practical CI patterns + +### A. Pure structural check (no DB) + +Good for **fast feedback** on every PR: + +```yaml +- name: FFT structural check + run: | + uv run fft ci-check . --env dev_duckdb --engine duckdb +``` + +### B. Change-aware run + tests on a dev DB + +```yaml +- name: Seed demo data + run: | + uv run fft seed . --env dev_duckdb + +- name: Run affected models only + run: | + uv run fft run . 
--env dev_duckdb --engine duckdb --changed-since origin/main + +- name: Run DQ tests on affected marts + run: | + uv run fft test . --env dev_duckdb --select tag:dq_demo +``` + +### C. SARIF publishing (GitHub Actions example) + +```yaml +- name: FFT ci-check + SARIF + run: | + uv run python scripts/fft_ci_sarif.py + +- name: Upload SARIF to GitHub + uses: github/codeql-action/upload-sarif@v2 + with: + sarif_file: artifacts/fft-ci.sarif.json +``` + +Where `scripts/fft_ci_sarif.py` is the small wrapper shown earlier. + +--- + +## 5. Summary + +The new CI features are meant to be: + +* **Safe & fast** – `fft ci-check` runs without a DB. +* **Integration-friendly** – clear exit codes, SARIF for code scanning. +* **Efficient** – `fft run --changed-since` targets only what changed (plus dependencies). + +From here you can: + +* Add `fft ci-check` to your **PR pipelines**. +* Use `--changed-since` in **run jobs** to avoid reprocessing the world on every commit. +* Extend `fastflowtransform.ci.core` with project-specific checks if needed. diff --git a/docs/index.md b/docs/index.md index 9473cbf..01a5098 100644 --- a/docs/index.md +++ b/docs/index.md @@ -34,6 +34,7 @@ Use this page as the front door into the docs: start with the orientation sectio - **Profiles & environments:** [Profiles & Environments](Profiles.md) covers executor profiles, environment overrides, credential handling, and engine-specific flags. - **Runtimes & observability flags:** [Logging & Verbosity](Logging.md) explains log levels, JSON logs, progress indicators, and metrics toggles during `fft run`. - **Local runtimes & engines:** [Local Engine Setup](examples/Local_Engine_Setup.md) walks through DuckDB, Postgres, Spark/Delta, BigQuery, and Snowflake Snowpark bootstrapping for the demos. +- **CI-friendly workflows:** [CI Checks & Change-Aware Runs](CI_Check.md) introduces `fft ci-check` and `fft run --changed-since` for structural validation and diff-aware pipelines. 
--- diff --git a/examples/ci_demo/.env.dev_bigquery_bigframes b/examples/ci_demo/.env.dev_bigquery_bigframes new file mode 100644 index 0000000..2110839 --- /dev/null +++ b/examples/ci_demo/.env.dev_bigquery_bigframes @@ -0,0 +1,7 @@ +# BigQuery profile for the ci demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=ci_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/ci_demo/.env.dev_bigquery_pandas b/examples/ci_demo/.env.dev_bigquery_pandas new file mode 100644 index 0000000..2110839 --- /dev/null +++ b/examples/ci_demo/.env.dev_bigquery_pandas @@ -0,0 +1,7 @@ +# BigQuery profile for the ci demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=ci_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/ci_demo/.env.dev_databricks b/examples/ci_demo/.env.dev_databricks new file mode 100644 index 0000000..a883c0b --- /dev/null +++ b/examples/ci_demo/.env.dev_databricks @@ -0,0 +1,14 @@ +# Databricks Spark (local) profile defaults for ci_demo +FF_SPARK_MASTER=local[*] +FF_SPARK_APP_NAME=ci_demo + +# Managed table configuration (Hive-compatible Spark session) +FF_DBR_ENABLE_HIVE=1 +FF_DBR_DATABASE=ci_demo +# Uncomment to switch to Delta Lake (requires delta-spark dependency) +# FF_DBR_TABLE_FORMAT=delta + +# Prefer an existing JAVA_HOME (e.g., in CI); fall back to the macOS brew path for local use. 
+if [ -z "${JAVA_HOME:-}" ] && [ -d "/opt/homebrew/opt/openjdk@17" ]; then + JAVA_HOME=/opt/homebrew/opt/openjdk@17 +fi diff --git a/examples/ci_demo/.env.dev_duckdb b/examples/ci_demo/.env.dev_duckdb new file mode 100644 index 0000000..24c62b5 --- /dev/null +++ b/examples/ci_demo/.env.dev_duckdb @@ -0,0 +1,3 @@ +# DuckDB profile for ci_demo +FF_DUCKDB_PATH=.local/ci_demo.duckdb +FF_DUCKDB_SCHEMA=ci_demo diff --git a/examples/ci_demo/.env.dev_postgres b/examples/ci_demo/.env.dev_postgres new file mode 100644 index 0000000..1c5b896 --- /dev/null +++ b/examples/ci_demo/.env.dev_postgres @@ -0,0 +1,3 @@ +# Postgres profile for ci_demo (store real secrets outside VCS!) +FF_PG_DSN=postgresql+psycopg://postgres:postgres@localhost:5432 +FF_PG_SCHEMA=ci_demo diff --git a/examples/ci_demo/.env.dev_snowflake b/examples/ci_demo/.env.dev_snowflake new file mode 100644 index 0000000..0d055a5 --- /dev/null +++ b/examples/ci_demo/.env.dev_snowflake @@ -0,0 +1,18 @@ +# Snowflake Snowpark profile for the CI demo + +# Your Snowflake account identifier, e.g. xy12345.eu-central-1 +FF_SF_ACCOUNT=your_account_id + +# Username & password (or extend to keypair auth) +FF_SF_USER=your_username +FF_SF_PASSWORD=your_password + +# Compute warehouse +FF_SF_WAREHOUSE=COMPUTE_WH + +# Database & schema for the demo +FF_SF_DATABASE=EXAMPLE_DEMO +FF_SF_SCHEMA=CI_DEMO + +# Optional role +FF_SF_ROLE=ANALYST diff --git a/examples/ci_demo/Makefile b/examples/ci_demo/Makefile new file mode 100644 index 0000000..a3c568f --- /dev/null +++ b/examples/ci_demo/Makefile @@ -0,0 +1,132 @@ +.PHONY: demo ci-check run-all run-changed run-changed-demo sarif artifacts clean + +# --- Config ------------------------------------------------------------------- + +DB ?= .local/ci_demo.duckdb +PROJECT ?= . +UV ?= uv + +# For the CI demo we keep it simple and default to DuckDB. 
+ENGINE ?= duckdb + +# BigQuery frame type selector (pandas | bigframes) +BQ_FRAME ?= bigframes + +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Darwin) + OPENER := open +else + OPENER := xdg-open +endif + +ifeq ($(ENGINE),duckdb) + PROFILE_ENV = dev_duckdb + ENGINE_TAG = engine:duckdb +endif +ifeq ($(ENGINE),postgres) + PROFILE_ENV = dev_postgres + ENGINE_TAG = engine:postgres +endif +ifeq ($(ENGINE),databricks_spark) + ENGINE_TAG = engine:databricks_spark + PROFILE_ENV = dev_databricks +endif +ifeq ($(ENGINE),bigquery) + ENGINE_TAG = engine:bigquery + ifeq ($(BQ_FRAME),pandas) + PROFILE_ENV = dev_bigquery_pandas + else + PROFILE_ENV = dev_bigquery_bigframes + endif +endif +ifeq ($(ENGINE),snowflake_snowpark) + PROFILE_ENV = dev_snowflake + ENGINE_TAG = engine:snowflake_snowpark +endif + +BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) FF_ENGINE=$(ENGINE) +ifeq ($(ENGINE),bigquery) + BASE_ENV := $(BASE_ENV) FF_ENGINE_VARIANT=$(BQ_FRAME) +endif + +RUN_ENV = $(BASE_ENV) + +SELECT_FLAGS = --select tag:example:ci_demo --select tag:$(ENGINE_TAG) + +CLEAN_SCRIPT = ../_scripts/cleanup_env.py + +ifeq ($(ENGINE),duckdb) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine duckdb --env "$(PROFILE_ENV)" --project "$(PROJECT)" --duckdb-path "$(DB)" +else ifeq ($(ENGINE),postgres) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine postgres --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),databricks_spark) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine databricks_spark --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),bigquery) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine bigquery --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),snowflake_snowpark) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine snowflake_snowpark --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else + CLEAN_CMD = $(error Unsupported ENGINE=$(ENGINE) 
for cleanup) +endif + +# Git ref for change-aware runs (can override on the command line) +# make ENGINE=duckdb CHANGED_SINCE=origin/main run-changed +CHANGED_SINCE ?= HEAD~1 + +# --- Core targets ------------------------------------------------------------- + +# Static CI check: parses project, builds DAG, validates deps/cycles, +# and shows which nodes would be selected (no DB required). +ci-check: + env $(BASE_ENV) $(UV) run fft ci-check "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) + +# Full run of all CI demo models (normal fft run). +run-all: + env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) + +# Change-aware run: only models affected by Git changes since CHANGED_SINCE. +run-changed: + env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" \ + --env $(PROFILE_ENV) \ + $(SELECT_FLAGS) \ + --changed-since "$(CHANGED_SINCE)" + +run-changed-demo: + @echo "== Running only 'changed' models (demo override) ==" + env $(RUN_ENV) \ + FF_CI_CHANGED_MODELS="fct_events.ff,py_enrich.ff" \ + $(UV) run fft run "$(PROJECT)" \ + --env $(PROFILE_ENV) \ + $(SELECT_FLAGS) \ + --changed-since "$(CHANGED_SINCE)" + +# Emit a SARIF file from the CI check summary, suitable for GitHub Code Scanning. +# Assumes you have examples/ci_demo/scripts/ci_to_sarif.py wired up to run_ci_check + write_sarif. +sarif: + env $(RUN_ENV) $(UV) run python scripts/ci_to_sarif.py \ + "$(PROJECT)" \ + --engine "$(ENGINE)" \ + --env "$(PROFILE_ENV)" \ + $(SELECT_FLAGS) \ + --out ".fastflowtransform/ci/ci_demo.sarif" + +artifacts: + @echo + @echo "== 📦 CI Demo Artifacts ==" + @echo " CI summary: .fastflowtransform/ci/ci_demo.sarif (SARIF)" + @echo " Run manifest: .fastflowtransform/target/manifest.json" + @echo " Run results: .fastflowtransform/target/run_results.json" + +clean: + $(CLEAN_CMD) + +# One-shot demo: static check, then a change-aware run, then SARIF output. 
+demo: clean + @echo "== 🧪 ci_demo (CI & change-aware runs) ==" + @echo "Profile=$(PROFILE_ENV) ENGINE=$(ENGINE) PROJECT=$(PROJECT)" + +$(MAKE) ci-check + +$(MAKE) run-changed + +$(MAKE) run-changed-demo + +$(MAKE) sarif + +$(MAKE) artifacts + @echo "✅ CI demo done." diff --git a/examples/ci_demo/README.md b/examples/ci_demo/README.md new file mode 100644 index 0000000..1e85a57 --- /dev/null +++ b/examples/ci_demo/README.md @@ -0,0 +1,128 @@ +# CI Demo Project + +The **CI Demo** is a minimal FastFlowTransform project designed to showcase: + +- `fft ci-check` for **DB-free PR checks** +- `fft run --changed-since ` for **change-aware runs** + +It uses three tiny models: + +- `base_events.ff.sql` → synthetic one-row events table +- `fct_events.ff.sql` → aggregates events +- `py_enrich.ff.py` → trivial Python post-processing + +No seeds, no DQ tests – just the essentials for CI. + +--- + +## 1. Quick start + +From the repo root: + +```bash +cd examples/ci_demo +```` + +### 1.1 Structural CI check (no DB required) + +```bash +fft ci-check . --env dev_duckdb --engine duckdb +``` + +What this does: + +* Loads `project.yml` and models +* Parses SQL and Python models +* Builds the dependency graph +* Detects: + + * missing refs + * cycles + * basic config/schema issues + +It **does not** connect to a database. + +Exit codes are CI-friendly: + +* `0` = all good +* `1` = parse/graph/config problems (suitable for PR blocking) +* `2` (optional) = only warnings, depending on your CI policy + +--- + +## 2. Change-aware runs (`--changed-since`) + +To run **only models affected by code changes** (since `origin/main`): + +```bash +fft run . \ + --env dev_duckdb \ + --engine duckdb \ + --changed-since origin/main +``` + +Semantics: + +* FFT inspects git diffs to find changed model files (SQL/Python). +* Computes the affected subgraph (changed + upstream/downstream). +* Intersects that with your `--select` / `--exclude` patterns, if provided. 
+ +Examples: + +```bash +# Only run affected models that also match tag:example:ci_demo +fft run . \ + --env dev_duckdb \ + --engine duckdb \ + --changed-since origin/main \ + --select tag:example:ci_demo + +# Combine with cache: +fft run . \ + --env dev_duckdb \ + --engine duckdb \ + --changed-since origin/main \ + --cache RW +``` + +--- + +## 3. CI integration sketch + +### GitHub Actions (example) + +```yaml +name: fft-ci + +on: + pull_request: + paths: + - "examples/ci_demo/**" + - "src/fastflowtransform/**" + +jobs: + fft-ci: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 # required for --changed-since + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install FFT + run: | + pip install -e . + + - name: Structural CI check (no DB) + working-directory: examples/ci_demo + run: | + fft ci-check . --env dev_duckdb --engine duckdb + + - name: Change-aware run + working-directory: examples/ci_demo + run: | + fft run . --env dev_duckdb --engine duckdb --changed-since origin/main diff --git a/examples/ci_demo/models/README.md b/examples/ci_demo/models/README.md new file mode 100644 index 0000000..32818bb --- /dev/null +++ b/examples/ci_demo/models/README.md @@ -0,0 +1,4 @@ +# Models directory + +Place SQL (`*.ff.sql`) and Python (`*.ff.py`) models here. +See docs/Config_and_Macros.md for modeling guidance and config options. 
diff --git a/examples/ci_demo/models/base_events.ff.sql b/examples/ci_demo/models/base_events.ff.sql new file mode 100644 index 0000000..cafc3bc --- /dev/null +++ b/examples/ci_demo/models/base_events.ff.sql @@ -0,0 +1,18 @@ +{{ config( + materialized='table', + tags=[ + 'example:ci_demo', + 'scope:staging', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ], +) }} + +-- Base model: a tiny synthetic events table +select + 1 as event_id, + 'page_view' as event_type, + current_timestamp as event_ts diff --git a/examples/ci_demo/models/fct_events.ff.sql b/examples/ci_demo/models/fct_events.ff.sql new file mode 100644 index 0000000..dcc260e --- /dev/null +++ b/examples/ci_demo/models/fct_events.ff.sql @@ -0,0 +1,21 @@ +{{ config( + materialized='table', + tags=[ + 'example:ci_demo', + 'scope:staging', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ], +) }} + +-- Simple fact table built from base_events +select + event_type, + count(*) as event_count, + min(event_ts) as first_event_ts, + max(event_ts) as last_event_ts +from {{ ref('base_events.ff') }} +group by event_type diff --git a/examples/ci_demo/models/py_enrich.ff.py b/examples/ci_demo/models/py_enrich.ff.py new file mode 100644 index 0000000..ce7be43 --- /dev/null +++ b/examples/ci_demo/models/py_enrich.ff.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any + +from fastflowtransform.decorators import model + + +@model( + name="py_enrich.ff", + kind="python", + deps=["fct_events.ff"], + tags=[ + "example:ci_demo", + "scope:engine", + "engine:duckdb", + "engine:postgres", + "engine:databricks_spark", + "engine:bigquery", + "engine:snowflake_snowpark", + ], +) +def build(fct_events: Any) -> Any: + """ + Minimal Python model: takes the fct_events frame/table and adds a dummy flag. 
+ The executor decides how 'fct_events' is provided (DuckDB.df, pandas, etc.). + """ + # Assume fct_events is something DataFrame-like with .assign + try: + return fct_events.assign(is_enriched=True) # pandas / polars-style + except AttributeError: + # Fallback: if it's not DataFrame-like, just return as-is + return fct_events diff --git a/examples/ci_demo/profiles.yml b/examples/ci_demo/profiles.yml new file mode 100644 index 0000000..c48eb3c --- /dev/null +++ b/examples/ci_demo/profiles.yml @@ -0,0 +1,56 @@ +# profiles.yml for ci_demo +# Connection details come from environment variables (via .env.* files). + +dev_duckdb: + engine: duckdb + duckdb: + path: "{{ env('FF_DUCKDB_PATH', '.local/ci_demo.duckdb') }}" + +dev_postgres: + engine: postgres + postgres: + dsn: "{{ env('FF_PG_DSN') }}" + db_schema: "{{ env('FF_PG_SCHEMA', 'ci_demo') }}" + +dev_databricks: + engine: databricks_spark + databricks_spark: + master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" + app_name: "{{ env('FF_SPARK_APP_NAME', 'ci_demo') }}" + warehouse_dir: "{{ project_dir() }}/.local/spark_warehouse" + extra_conf: + spark.hadoop.javax.jdo.option.ConnectionURL: "jdbc:derby:{{ project_dir() }}/.local/metastore_db;create=true" + spark.hadoop.datanucleus.rdbms.datastoreAdapterClassName: "org.datanucleus.store.rdbms.adapter.DerbyAdapter" + spark.hadoop.datanucleus.schema.autoCreateAll: "true" + spark.hadoop.javax.jdo.option.ConnectionDriverName: "org.apache.derby.jdbc.EmbeddedDriver" + spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + +dev_bigquery_bigframes: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'ci_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: true + # allow_create_dataset: true + +dev_bigquery_pandas: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'ci_demo') }}" + location: "{{ 
env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: false + # allow_create_dataset: true + +dev_snowflake: + engine: snowflake_snowpark + snowflake_snowpark: + account: "{{ env('FF_SF_ACCOUNT') }}" + user: "{{ env('FF_SF_USER') }}" + password: "{{ env('FF_SF_PASSWORD') }}" + warehouse: "{{ env('FF_SF_WAREHOUSE', 'COMPUTE_WH') }}" + database: "{{ env('FF_SF_DATABASE', 'EXAMPLE_DEMO') }}" + schema: "{{ env('FF_SF_SCHEMA', 'CI_DEMO') }}" + role: "{{ env('FF_SF_ROLE', '') }}" + allow_create_schema: true diff --git a/examples/ci_demo/project.yml b/examples/ci_demo/project.yml new file mode 100644 index 0000000..06309ba --- /dev/null +++ b/examples/ci_demo/project.yml @@ -0,0 +1,8 @@ +name: ci_demo +version: "0.1" + +vars: {} + +models: {} + +seeds: {} diff --git a/examples/ci_demo/scripts/ci_to_sarif.py b/examples/ci_demo/scripts/ci_to_sarif.py new file mode 100644 index 0000000..a6012ba --- /dev/null +++ b/examples/ci_demo/scripts/ci_to_sarif.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +from __future__ import annotations + +import argparse +from pathlib import Path + +from fastflowtransform.cli.bootstrap import _prepare_context +from fastflowtransform.ci.core import run_ci_check +from fastflowtransform.ci.sarif import write_sarif +from fastflowtransform.logging import bind_context, clear_context, echo + +try: + # Optional – use package version if available + from fastflowtransform import __version__ as FFT_VERSION +except Exception: # pragma: no cover + FFT_VERSION = None + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Run fft ci-check programmatically and emit a SARIF report.\n\n" + "Typical usage (from examples/ci_demo):\n" + " python scripts/ci_to_sarif.py . 
" + "--env dev_duckdb --engine duckdb --out ci-scan.sarif" + ) + ) + parser.add_argument( + "project", + nargs="?", + default=".", + help="Project directory (default: current directory).", + ) + parser.add_argument( + "--env", + dest="env_name", + default="dev_duckdb", + help="Environment/profile name (default: dev_duckdb).", + ) + parser.add_argument( + "--engine", + dest="engine", + default="duckdb", + help="Engine hint (duckdb|postgres|bigquery|...). Default: duckdb.", + ) + parser.add_argument( + "--select", + nargs="*", + action="extend", + help="Optional selectors (simple substrings for now, e.g. customers, .ff, tag:whatever).", + ) + parser.add_argument( + "--out", + dest="out", + default="ci-scan.sarif", + help="Output SARIF file path (default: ci-scan.sarif).", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + + # Load project & registry (no DB work required for run_ci_check) + ctx = _prepare_context( + project_arg=str(args.project), + env_name=args.env_name, + engine=args.engine, + vars_opt=None, + ) + bind_context(engine=ctx.profile.engine, env=args.env_name) + + echo( + f"[FFT] CI check (project={Path(args.project).resolve()}, " + f"env={args.env_name}, engine={ctx.profile.engine})" + ) + + # Static CI checks (graph, selection preview, etc.) + summary = run_ci_check(select=args.select) + + out_path = Path(args.out) + write_sarif( + summary, + out_path, + tool_name="FastFlowTransform CI", + tool_version=FFT_VERSION, + ) + + echo(f"[FFT] Wrote SARIF report to {out_path}") + clear_context() + + +if __name__ == "__main__": + main() diff --git a/examples/ci_demo/seeds/README.md b/examples/ci_demo/seeds/README.md new file mode 100644 index 0000000..2e553ed --- /dev/null +++ b/examples/ci_demo/seeds/README.md @@ -0,0 +1,4 @@ +# Seeds directory + +Add CSV or Parquet files for reproducible seeds. +Usage examples are covered in docs/Quickstart.md and docs/Config_and_Macros.md#13-seeds-sources-and-dependencies. 
diff --git a/examples/ci_demo/sources.yml b/examples/ci_demo/sources.yml new file mode 100644 index 0000000..83436dc --- /dev/null +++ b/examples/ci_demo/sources.yml @@ -0,0 +1,9 @@ +# Source declarations describe external tables. See docs/Sources.md for details. +version: 2 +# sources: + # Example: + # - name: raw + # schema: staging + # tables: + # - name: users + # identifier: seed_users diff --git a/examples/ci_demo/tests/dq/README.md b/examples/ci_demo/tests/dq/README.md new file mode 100644 index 0000000..1acd01d --- /dev/null +++ b/examples/ci_demo/tests/dq/README.md @@ -0,0 +1,4 @@ +# Data quality tests + +Store custom data-quality tests that run via `fft test` (docs/Data_Quality_Tests.md). +Use this directory for schema-bound tests separate from unit specs. diff --git a/examples/ci_demo/tests/unit/README.md b/examples/ci_demo/tests/unit/README.md new file mode 100644 index 0000000..b3c3c8d --- /dev/null +++ b/examples/ci_demo/tests/unit/README.md @@ -0,0 +1,4 @@ +# Unit tests + +Define YAML unit specs as described in docs/Config_and_Macros.md#73-model-unit-tests-fft-utest. +Invoke them with `fft utest --env `. diff --git a/examples/dq_demo/models/engines/bigquery/bigframes/mart_orders_agg.ff.py b/examples/dq_demo/models/engines/bigquery/bigframes/mart_orders_agg.ff.py deleted file mode 100644 index b52ada0..0000000 --- a/examples/dq_demo/models/engines/bigquery/bigframes/mart_orders_agg.ff.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any - -from fastflowtransform import engine_model - -if TYPE_CHECKING: - import bigframes.pandas as bpd - from bigframes.pandas import DataFrame as BFDataFrame -else: - bpd: Any = None - - class BFDataFrame: # pragma: no cover - placeholder for runtime type hints - ... 
- - -def _get_bigframes() -> Any: - try: - import bigframes.pandas as bpd_mod - except Exception as exc: # pragma: no cover - optional dep guard - raise RuntimeError( - "bigframes is required for this model. Install fastflowtransform[bigquery_bf]." - ) from exc - return bpd_mod - - -@engine_model( - env_match={ - "FF_ENGINE": "bigquery", - "FF_ENGINE_VARIANT": "bigframes", - }, - name="mart_orders_agg", - materialized="table", - tags=[ - "example:dq_demo", - "scope:mart", - "engine:bigquery", - ], - deps=["orders.ff", "customers.ff"], - require={ - "orders.ff": ["order_id", "customer_id", "amount", "order_ts"], - "customers.ff": ["customer_id", "name", "status"], - }, -) -def build(orders: BFDataFrame, customers: BFDataFrame) -> BFDataFrame: - _get_bigframes() - base = orders.merge(customers, on="customer_id", how="inner", suffixes=("", "_cust")) - - grouped = ( - base.groupby(["customer_id", "name", "status"], dropna=False) - .agg( - order_count=("order_id", "count"), - total_amount=("amount", "sum"), - first_order_ts=("order_ts", "min"), - last_order_ts=("order_ts", "max"), - ) - .reset_index() - ) - - grouped = grouped.rename(columns={"name": "customer_name"}) # type: ignore[arg-type] - return grouped[ - [ - "customer_id", - "customer_name", - "status", - "order_count", - "total_amount", - "first_order_ts", - "last_order_ts", - ] - ] diff --git a/examples/dq_demo/models/engines/bigquery/pandas/mart_orders_agg.ff.py b/examples/dq_demo/models/engines/bigquery/pandas/mart_orders_agg.ff.py deleted file mode 100644 index 0da4018..0000000 --- a/examples/dq_demo/models/engines/bigquery/pandas/mart_orders_agg.ff.py +++ /dev/null @@ -1,57 +0,0 @@ -from __future__ import annotations - -import pandas as pd - -from fastflowtransform import engine_model - - -@engine_model( - env_match={ - "FF_ENGINE": "bigquery", - "FF_ENGINE_VARIANT": "pandas", - }, - name="mart_orders_agg", - materialized="table", - tags=[ - "example:dq_demo", - "scope:mart", - "engine:bigquery", - ], - 
deps=["orders.ff", "customers.ff"], - require={ - "orders.ff": ["order_id", "customer_id", "amount", "order_ts"], - "customers.ff": ["customer_id", "name", "status"], - }, -) -def build(orders: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame: - """Aggregate orders per customer for reconciliation checks.""" - base = orders.merge(customers, on="customer_id", how="inner", suffixes=("", "_cust")) - - grouped = ( - base.groupby(["customer_id", "name", "status"], dropna=False) - .agg( - order_count=("order_id", "count"), - total_amount=("amount", "sum"), - first_order_ts=("order_ts", "min"), - last_order_ts=("order_ts", "max"), - ) - .reset_index() - ) - - grouped.rename( - columns={ - "name": "customer_name", - }, - inplace=True, - ) - return grouped[ - [ - "customer_id", - "customer_name", - "status", - "order_count", - "total_amount", - "first_order_ts", - "last_order_ts", - ] - ] diff --git a/mkdocs.yml b/mkdocs.yml index d9fb342..a39db2b 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -38,6 +38,7 @@ nav: - Logging: Logging.md - Unit Tests: Unit_Tests.md - Snapshots: Snapshots.md + - CI Checks & Change-Aware Runs: CI_Check.md - Troubleshooting: Troubleshooting.md - Examples: - Basic Demo: examples/Basic_Demo.md diff --git a/pyproject.toml b/pyproject.toml index 5b41b8f..54337f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.6" +version = "0.6.7" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/ci/__init__.py b/src/fastflowtransform/ci/__init__.py new file mode 100644 index 0000000..a813586 --- /dev/null +++ b/src/fastflowtransform/ci/__init__.py @@ -0,0 +1,10 @@ +# fastflowtransform/ci/__init__.py +from __future__ import annotations + +from fastflowtransform.ci.core import CiIssue, CiSummary, run_ci_check + 
+__all__ = [ + "CiIssue", + "CiSummary", + "run_ci_check", +] diff --git a/src/fastflowtransform/ci/changed_since.py b/src/fastflowtransform/ci/changed_since.py new file mode 100644 index 0000000..03da8dc --- /dev/null +++ b/src/fastflowtransform/ci/changed_since.py @@ -0,0 +1,154 @@ +# fastflowtransform/ci/changed_since.py +from __future__ import annotations + +import os +import subprocess +from collections.abc import Mapping +from pathlib import Path + +from fastflowtransform.core import REGISTRY, Node +from fastflowtransform.logging import get_logger + +logger = get_logger("ci.changed_since") + + +def get_changed_models(project_dir: Path, git_ref: str) -> set[str]: + """ + Return the set of model names (REGISTRY.nodes keys) whose model files + changed since `git_ref`. + + We look at: + - models/**/*.ff.sql + - models/**/*.ff.py + + Anything else is currently ignored. If git is unavailable or the repo is + not initialized, we log a warning and return an empty set. + + For demos/CI you can override detection via the FF_CI_CHANGED_MODELS + environment variable: + + FF_CI_CHANGED_MODELS="models/a.ff.sql,models/b.ff.sql" # paths or names + + In that case we treat the provided comma-separated tokens as changed + *model names* and skip git entirely. + """ + # --- Demo/CI override ----------------------------------------------- + override = os.getenv("FF_CI_CHANGED_MODELS") + if override: + # Allow either bare model names ("a.ff") or file-ish tokens; + # we just strip whitespace and ignore empties. 
+ tokens = [tok.strip() for tok in override.split(",")] + return {t for t in tokens if t} + + # --- Normal git-based detection ------------------------------------- + project_dir = project_dir.resolve() + + try: + proc = subprocess.run( + ["git", "diff", "--name-only", git_ref, "HEAD", "--", "."], + cwd=project_dir, + capture_output=True, + text=True, + check=False, + ) + except Exception as exc: # pragma: no cover (env-specific) + logger.warning("Unable to run 'git diff' for --changed-since=%s: %s", git_ref, exc) + return set() + + if proc.returncode != 0: + stderr = (proc.stderr or "").strip() + if stderr: + logger.warning( + "git diff --name-only %s...HEAD failed (exit %s): %s", + git_ref, + proc.returncode, + stderr, + ) + else: + logger.warning( + "git diff --name-only %s...HEAD failed (exit %s)", + git_ref, + proc.returncode, + ) + return set() + + node_names = set(REGISTRY.nodes.keys()) + changed_models: set[str] = set() + + for line in proc.stdout.splitlines(): + rel = line.strip() + if not rel: + continue + + # Normalize to POSIX-style paths for prefix checks + rel_posix = Path(rel).as_posix() + + # Only treat model files as model changes + if not rel_posix.startswith("models/"): + continue + + if not (rel_posix.endswith(".ff.sql") or rel_posix.endswith(".ff.py")): + continue + + stem = Path(rel_posix).stem # e.g. "customers.ff" + if stem in node_names: + changed_models.add(stem) + + if not changed_models: + logger.info( + "No model files under 'models/' changed since %s (based on git diff).", + git_ref, + ) + + return changed_models + + +def compute_affected_models( + changed: set[str], + nodes: Mapping[str, Node], +) -> set[str]: + """ + Given a set of changed model names, return the transitive closure of all + affected models: + + - all changed models + - all their upstream dependencies + - all their downstream dependents + + Only model dependencies (REGISTRY.nodes edges) are considered; sources are + ignored for this calculation. 
+ """ + if not changed: + return set() + + # Build adjacency maps + upstream: dict[str, set[str]] = {} + downstream: dict[str, set[str]] = {name: set() for name in nodes} + + for name, node in nodes.items(): + deps = {d for d in (node.deps or []) if d in nodes} + upstream[name] = deps + for dep in deps: + downstream.setdefault(dep, set()).add(name) + + affected: set[str] = set(changed) + + # Upstream (ancestors) + stack = list(changed) + while stack: + cur = stack.pop() + for parent in upstream.get(cur, ()): + if parent not in affected: + affected.add(parent) + stack.append(parent) + + # Downstream (descendants) + stack = list(changed) + while stack: + cur = stack.pop() + for child in downstream.get(cur, ()): + if child not in affected: + affected.add(child) + stack.append(child) + + return affected diff --git a/src/fastflowtransform/ci/core.py b/src/fastflowtransform/ci/core.py new file mode 100644 index 0000000..419c77b --- /dev/null +++ b/src/fastflowtransform/ci/core.py @@ -0,0 +1,227 @@ +# fastflowtransform/ci/core.py +from __future__ import annotations + +from collections.abc import Sequence +from dataclasses import dataclass + +from fastflowtransform.cli.selectors import _compile_selector, _parse_select +from fastflowtransform.core import REGISTRY +from fastflowtransform.dag import topo_sort +from fastflowtransform.errors import DependencyNotFoundError, ModelCycleError +from fastflowtransform.logging import get_logger + +logger = get_logger("ci") + + +@dataclass +class CiIssue: + """ + A single CI issue (error or warning) discovered during static checks. + + This is intentionally generic enough to be mapped to SARIF, GitHub + annotations, or plain log lines. + """ + + level: str # "error" | "warning" + code: str # short machine-friendly code, e.g. 
"GRAPH_CYCLE" + message: str # human-readable + file: str | None = None + line: int | None = None + column: int | None = None + obj_name: str | None = None # model/source/test name, if applicable + + +@dataclass +class CiSummary: + """ + Overall result of a CI check run. + """ + + issues: list[CiIssue] + selected_nodes: list[str] + all_nodes: list[str] + + +def _graph_issues() -> list[CiIssue]: + """ + Run the same validation logic as your DAG / registry: + + - Missing dependencies → DependencyNotFoundError + - Cycles → ModelCycleError + + and convert these into CiIssue entries. + + This uses: + * REGISTRY.nodes + * fastflowtransform.dag.topo_sort + """ + issues: list[CiIssue] = [] + + nodes = getattr(REGISTRY, "nodes", {}) or {} + if not nodes: + # Nothing to validate here + return issues + + try: + # topo_sort will: + # - raise DependencyNotFoundError for missing deps + # - raise ModelCycleError for cycles + # - otherwise return a valid ordering + topo_sort(nodes) + return issues + except DependencyNotFoundError as exc: + # topo_sort raises DependencyNotFoundError(missing_map) + payload = getattr(exc, "missing", None) or (exc.args[0] if exc.args else None) + + if isinstance(payload, dict): + for node_name, missing in payload.items(): + if not missing: + continue + msg = f"Missing dependencies for '{node_name}': {', '.join(sorted(missing))}" + issues.append( + CiIssue( + level="error", + code="MISSING_DEP", + message=msg, + obj_name=node_name, + ) + ) + else: + # Fallback: just surface the exception message + issues.append( + CiIssue( + level="error", + code="MISSING_DEP", + message=str(exc), + ) + ) + return issues + except ModelCycleError as exc: + # topo_sort raises ModelCycleError("Cycle detected among nodes: a, b, c") + msg = str(exc) + node_list: list[str] = [] + # Best-effort parse of the node names from the message + parts = msg.split(":", 1) + if len(parts) == 2: + node_list = [p.strip() for p in parts[1].split(",") if p.strip()] + + issues.append( + 
CiIssue( + level="error", + code="GRAPH_CYCLE", + message=msg, + obj_name=",".join(node_list) if node_list else None, + ) + ) + return issues + + +def _resolve_selection( + all_nodes: Sequence[str], + patterns: Sequence[str] | None, +) -> list[str]: + """ + Best-effort selection resolution. + + If the CLI selector engine is available, we reuse it so that CI behaves + like `fft run` (supports tag:, state:, etc.). If that fails, we fall back + to a simple substring-based filter: + + pattern "foo" matches any node name containing "foo". + + If no patterns are provided, all nodes are considered "selected". + """ + if not all_nodes: + return [] + + if not patterns: + return list(all_nodes) + + # --- Prefer the real selector engine used by `fft run` ------------------ + try: + tokens = _parse_select(list(patterns)) + _, pred = _compile_selector(tokens) + + selected: list[str] = [] + for name in all_nodes: + node = REGISTRY.nodes.get(name) + if node is not None and pred(node): + selected.append(name) + + # Deduplicate while preserving order + seen: set[str] = set() + out: list[str] = [] + for n in selected: + if n not in seen: + seen.add(n) + out.append(n) + return out + except Exception as exc: # pragma: no cover - defensive fallback + logger.warning("CI selector resolution via cli.selectors failed: %s", exc) + + # --- Fallback: simple substring match on node names ---------------------- + selected_simple: list[str] = [] + for name in all_nodes: + if any(pat in name for pat in patterns): + selected_simple.append(name) + + seen = set() + out = [] + for n in selected_simple: + if n not in seen: + seen.add(n) + out.append(n) + return out + + +def run_ci_check( + *, + select: Sequence[str] | None = None, +) -> CiSummary: + """ + Run static CI checks against the currently-loaded project. + + Assumptions: + - REGISTRY.load_project(...) has already been called (via _prepare_context). + - No database connections are required (purely static). 
+ + Returns: + CiSummary with issues and selection info. + + Exit codes are handled by the CLI wrapper, based on the returned issues. + """ + issues: list[CiIssue] = [] + + # --- Gather all nodes (models) known to the registry -------------------- + nodes = getattr(REGISTRY, "nodes", {}) or {} + all_nodes = sorted(nodes.keys()) + + if not all_nodes: + issues.append( + CiIssue( + level="warning", + code="NO_MODELS", + message="Registry contains no models for this project.", + ) + ) + + # --- Graph sanity checks (missing deps + cycles) ------------------------ + issues.extend(_graph_issues()) + + # --- Selection preview --------------------------------------------------- + selected_nodes = _resolve_selection(all_nodes, select) + + if all_nodes and not selected_nodes: + issues.append( + CiIssue( + level="warning", + code="SELECTION_EMPTY", + message="Selection is empty: no nodes would run with the given --select.", + ) + ) + + return CiSummary( + issues=issues, + selected_nodes=selected_nodes, + all_nodes=all_nodes, + ) diff --git a/src/fastflowtransform/ci/sarif.py b/src/fastflowtransform/ci/sarif.py new file mode 100644 index 0000000..3b331a1 --- /dev/null +++ b/src/fastflowtransform/ci/sarif.py @@ -0,0 +1,72 @@ +# fastflowtransform/ci/sarif.py +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from fastflowtransform.ci.core import CiIssue, CiSummary + + +def _issue_to_sarif_result(issue: CiIssue) -> dict[str, Any]: + """ + Map a CiIssue into a minimal SARIF result record. 
+ """ + level = "error" if issue.level.lower() == "error" else "warning" + + locations: list[dict[str, Any]] = [] + if issue.file: + region: dict[str, Any] = {} + if issue.line is not None: + region["startLine"] = issue.line + if issue.column is not None: + region["startColumn"] = issue.column + + loc: dict[str, Any] = { + "physicalLocation": { + "artifactLocation": {"uri": issue.file}, + } + } + if region: + loc["physicalLocation"]["region"] = region + locations.append(loc) + + return { + "ruleId": issue.code, + "level": level, + "message": {"text": issue.message}, + "locations": locations, + } + + +def write_sarif( + summary: CiSummary, + path: Path, + *, + tool_name: str = "FastFlowTransform CI", + tool_version: str | None = None, +) -> None: + """ + Serialize a CiSummary into a SARIF file consumable by GitHub code scanning + or other tools. + + This is intentionally minimal but standards-compliant enough for basic use. + """ + results = [_issue_to_sarif_result(issue) for issue in summary.issues] + + driver: dict[str, Any] = {"name": tool_name} + if tool_version: + driver["version"] = tool_version + + sarif = { + "version": "2.1.0", + "runs": [ + { + "tool": {"driver": driver}, + "results": results, + } + ], + } + + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(sarif, indent=2), encoding="utf-8") diff --git a/src/fastflowtransform/cli/__init__.py b/src/fastflowtransform/cli/__init__.py index ec07ada..0b0abec 100644 --- a/src/fastflowtransform/cli/__init__.py +++ b/src/fastflowtransform/cli/__init__.py @@ -16,6 +16,7 @@ _resolve_profile, _resolve_project_path, ) +from fastflowtransform.cli.ci_cmd import register as _register_ci from fastflowtransform.cli.dag_cmd import dag, register as _register_dag from fastflowtransform.cli.docgen_cmd import docgen, register as _register_docgen from fastflowtransform.cli.docs_utils import ( @@ -132,6 +133,7 @@ def main( _register_init(app) _register_snapshot(app) _register_source(app) 
+_register_ci(app) __all__ = [ diff --git a/src/fastflowtransform/cli/ci_cmd.py b/src/fastflowtransform/cli/ci_cmd.py new file mode 100644 index 0000000..033a075 --- /dev/null +++ b/src/fastflowtransform/cli/ci_cmd.py @@ -0,0 +1,141 @@ +# fastflowtransform/cli/ci_cmd.py +from __future__ import annotations + +import textwrap +from collections.abc import Sequence + +import typer + +from fastflowtransform.ci.core import CiIssue, CiSummary, run_ci_check +from fastflowtransform.cli.bootstrap import _prepare_context +from fastflowtransform.cli.options import EngineOpt, EnvOpt, ProjectArg, SelectOpt, VarsOpt +from fastflowtransform.logging import bind_context, clear_context, echo + +from .selectors import _parse_select + + +def _format_issue_line(issue: CiIssue) -> str: + """ + Pretty-print a single CiIssue as a one-line summary. + + Example: + [E] MISSING_DEP (orders.ff): Missing dependencies for 'orders.ff': customers.ff + """ + level = (issue.level or "error").lower() + lvl = "E" if level == "error" else "W" + + target = f" ({issue.obj_name})" if issue.obj_name else "" + location = "" + if issue.file: + location = issue.file + if issue.line is not None: + location += f":{issue.line}" + if issue.column is not None: + location += f":{issue.column}" + location = f" [{location}]" + + msg = issue.message or "" + return f"[{lvl}] {issue.code}{target}: {msg}{location}" + + +def _print_text_summary( + summary: CiSummary, + *, + project: str, + select_tokens: Sequence[str], +) -> None: + """ + Human-friendly text output for `fft ci-check`, similar in spirit to `fft run`. 
+ """ + echo("CI Check Summary") + echo("────────────────") + echo(f"Project: {project}") + echo(f"Models: {len(summary.selected_nodes)}/{len(summary.all_nodes)} selected") + if select_tokens: + echo(f"Select: {', '.join(select_tokens)}") + + # Issues section + echo("\nIssues") + echo("──────") + if not summary.issues: + echo("None 🎉") + else: + for issue in summary.issues: + echo(f"- {_format_issue_line(issue)}") + + # Selection preview + echo("\nSelected models") + echo("──────────────") + if not summary.selected_nodes: + echo("") + else: + for name in summary.selected_nodes: + echo(f"• {name}") + + +def ci_check( + project: ProjectArg = ".", + env_name: EnvOpt = "dev", + engine: EngineOpt = None, + vars: VarsOpt = None, + select: SelectOpt = None, +) -> None: + """ + Static CI check: parse project, validate DAG, and preview selection. + + Runs **without a database connection** by default. Intended for PR/CI jobs: + + - Validates that models parse and dependencies are resolvable. + - Detects dependency cycles. + - Performs a dry-run selection based on --select (no execution). + - Returns exit code 1 if any error-level issues are present. + """ + # Load project + registry, but do NOT create an executor → no DB work. 
+ ctx = _prepare_context(project, env_name, engine, vars) + bind_context(engine=ctx.profile.engine, env=env_name) + + # Match run.py: simple profile/engine banner + echo(f"Profile: {env_name} | Engine: {ctx.profile.engine}") + + # Reuse the same select token parsing as fft run (but purely static) + select_tokens = _parse_select(select or []) + + # Run CI core checks on the loaded registry + summary = run_ci_check(select=select_tokens) + + # Text summary (stdout) + _print_text_summary( + summary, + project=str(ctx.project), + select_tokens=select_tokens, + ) + + # Decide exit code: any error-level issue ⇒ non-zero exit for CI + has_errors = any(issue.level == "error" for issue in summary.issues) + + clear_context() + + if has_errors: + raise typer.Exit(1) + + +def register(app: typer.Typer) -> None: + """ + Register `fft ci-check` on the main Typer app. + """ + app.command( + "ci-check", + help=textwrap.dedent( + """\ + Static CI check: parse models, validate DAG, and preview selection + without touching the database. + + Examples: + fft ci-check . --env dev + fft ci-check . --env dev --select tag:finance + """ + ), + )(ci_check) + + +__all__ = ["ci_check", "register"] diff --git a/src/fastflowtransform/cli/options.py b/src/fastflowtransform/cli/options.py index 14145f9..08d3613 100644 --- a/src/fastflowtransform/cli/options.py +++ b/src/fastflowtransform/cli/options.py @@ -1,3 +1,4 @@ +# fastflowtransform/cli/options.py from __future__ import annotations from enum import Enum @@ -193,11 +194,23 @@ class UTestCacheMode(str, Enum): ), ] +ChangedSinceOpt = Annotated[ + str | None, + typer.Option( + "--changed-since", + help=( + "Limit the run to models affected by files changed since the given " + "git ref (e.g. origin/main)." 
+ ), + ), +] + __all__ = [ "CacheMode", "CacheOpt", "CaseOpt", + "ChangedSinceOpt", "EngineOpt", "EnvOpt", "ExcludeOpt", diff --git a/src/fastflowtransform/cli/run.py b/src/fastflowtransform/cli/run.py index 3930ffa..debd7a5 100644 --- a/src/fastflowtransform/cli/run.py +++ b/src/fastflowtransform/cli/run.py @@ -9,6 +9,7 @@ from contextlib import suppress from dataclasses import dataclass, field from datetime import UTC, datetime +from pathlib import Path from typing import Any import typer @@ -20,10 +21,15 @@ write_run_results, ) from fastflowtransform.cache import FingerprintCache, can_skip_node +from fastflowtransform.ci.changed_since import ( + compute_affected_models, + get_changed_models, +) from fastflowtransform.cli.bootstrap import CLIContext, _prepare_context from fastflowtransform.cli.options import ( CacheMode, CacheOpt, + ChangedSinceOpt, EngineOpt, EnvOpt, ExcludeOpt, @@ -38,6 +44,12 @@ SelectOpt, VarsOpt, ) +from fastflowtransform.cli.selectors import ( + _compile_selector, + _parse_select, + _selected_subgraph_names, + augment_with_state_modified, +) from fastflowtransform.core import REGISTRY, relation_for from fastflowtransform.dag import levels as dag_levels from fastflowtransform.fingerprint import ( @@ -52,13 +64,6 @@ from fastflowtransform.meta import ensure_meta_table from fastflowtransform.run_executor import ScheduleResult, schedule -from .selectors import ( - _compile_selector, - _parse_select, - _selected_subgraph_names, - augment_with_state_modified, -) - @dataclass class _RunEngine: @@ -415,6 +420,48 @@ def _wanted_names( ) +def _apply_changed_since_filter( + ctx: CLIContext, + wanted: set[str], + select: SelectOpt, + exclude: ExcludeOpt, + changed_since: str | None, +) -> set[str]: + """ + If --changed-since is provided, restrict the selection to models whose + files changed since the given git ref PLUS their upstream/downstream + neighbors. 
+ + Semantics: + - Without --select/--exclude: + wanted = affected_models + - With --select or --exclude: + wanted = wanted ∩ affected_models + + (So you can combine tag/namespace selectors with --changed-since.) + """ + if not changed_since: + return wanted + + project_dir = ctx.project + if not isinstance(project_dir, Path): + project_dir = Path(project_dir) + + changed = get_changed_models(project_dir, changed_since) + affected = compute_affected_models(changed, REGISTRY.nodes) + + if not affected: + # Nothing affected by changes → nothing to run + return set() + + # If user also provided selectors/excludes, intersect with those. + if (select and len(select) > 0) or (exclude and len(exclude) > 0): + return wanted & affected + + # No further selectors → affected models define the universe + return affected + + def _explicit_targets( rebuild_only: RebuildOnlyOpt, rebuild: bool, select: SelectOpt, raw_selected: list[str] ) -> list[str]: @@ -542,6 +589,7 @@ def run( rebuild_only: RebuildOnlyOpt = None, offline: OfflineOpt = False, http_cache: HttpCacheOpt = None, + changed_since: ChangedSinceOpt = None, ) -> None: # _ensure_logging() # HTTP/API-Flags → ENV, damit fastflowtransform.api.http sie liest @@ -558,6 +606,14 @@ def run( select_tokens, _, raw_selected = _select_predicate_and_raw(engine_, ctx, select) wanted = _wanted_names(select_tokens=select_tokens, exclude=exclude, raw_selected=raw_selected) + wanted = _apply_changed_since_filter( + ctx=ctx, + wanted=wanted, + select=select, + exclude=exclude, + changed_since=changed_since, + ) + explicit_targets = _explicit_targets(rebuild_only, rebuild, select, raw_selected) _maybe_exit_if_empty(wanted, explicit_targets) diff --git a/tests/integration/examples/config.py b/tests/integration/examples/config.py index 399d4d9..0973195 100644 --- a/tests/integration/examples/config.py +++ b/tests/integration/examples/config.py @@ -47,6 +47,16 @@ class ExampleConfig: "databricks_spark": "dev_databricks", }, ), + 
ExampleConfig( + name="ci_demo", + path=ROOT / "examples" / "ci_demo", + make_target="demo", + env_by_engine={ + "duckdb": "dev_duckdb", + "postgres": "dev_postgres", + "databricks_spark": "dev_databricks", + }, + ), ExampleConfig( name="dq_demo", path=ROOT / "examples" / "dq_demo", diff --git a/tests/unit/ci/test_changed_since_unit.py b/tests/unit/ci/test_changed_since_unit.py new file mode 100644 index 0000000..10d2f2f --- /dev/null +++ b/tests/unit/ci/test_changed_since_unit.py @@ -0,0 +1,192 @@ +# tests/unit/ci/test_changed_since_unit.py +from __future__ import annotations + +from pathlib import Path + +import pytest + +from fastflowtransform.ci.changed_since import ( + compute_affected_models, + get_changed_models, +) +from fastflowtransform.core import REGISTRY, Node + + +class DummyProc: + def __init__(self, returncode: int, stdout: str = "", stderr: str = "") -> None: + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + +def _reset_registry_nodes(): + REGISTRY.nodes.clear() + + +@pytest.fixture(autouse=True) +def _clean_registry_nodes(): + # Ensure we don't leak nodes across tests + old_nodes = dict(REGISTRY.nodes) + try: + REGISTRY.nodes.clear() + yield + finally: + REGISTRY.nodes = old_nodes + + +@pytest.mark.unit +def test_get_changed_models_filters_and_maps(monkeypatch, tmp_path): + """ + get_changed_models should: + - call 'git diff --name-only HEAD -- .' + - only consider files under 'models/' + - only consider *.ff.sql / *.ff.py + - map paths to REGISTRY.nodes keys via Path(...).stem + """ + # Prepare a fake project dir (unused except for cwd) + project_dir = tmp_path + + # Populate REGISTRY.nodes with model names that match stems + # of the changed files below. 
+ REGISTRY.nodes = { + "customers.ff": Node( + name="customers.ff", kind="sql", path=Path("models/customers.ff.sql"), deps=[] + ), + "orders_agg.ff": Node( + name="orders_agg.ff", + kind="sql", + path=Path("models/marts/orders_agg.ff.sql"), + deps=["customers.ff"], + ), + "util.ff": Node(name="util.ff", kind="python", path=Path("models/py/util.ff.py"), deps=[]), + } + + def fake_run(cmd, cwd, capture_output=False, text=False, check=False, **kwargs): + # Sanity-check the git invocation + assert "git" in cmd[0] + assert "diff" in cmd[1] + assert "--name-only" in cmd + assert cwd == project_dir + + # Mix of relevant & irrelevant paths. + stdout_text = "\n".join( + [ + "models/customers.ff.sql", # → customers.ff + "models/marts/orders_agg.ff.sql", # → orders_agg.ff + "models/py/util.ff.py", # → util.ff + "seeds/customers.csv", # ignored (not under models/) + "models/readme.md", # ignored (wrong suffix) + ] + ) + return DummyProc(returncode=0, stdout=stdout_text, stderr="") + + monkeypatch.setattr("subprocess.run", fake_run, raising=True) + + changed = get_changed_models(project_dir, git_ref="origin/main") + + # Only matching models should be returned. + assert changed == {"customers.ff", "orders_agg.ff", "util.ff"} + + +@pytest.mark.unit +def test_get_changed_models_git_error(monkeypatch, tmp_path): + """ + If git diff fails (non-zero exit code), get_changed_models should + return an empty set and not raise. 
+ """ + project_dir = tmp_path + + def fake_run(cmd, cwd, stdout, stderr, text, check): + return DummyProc(returncode=1, stdout="", stderr="fatal: not a git repository") + + monkeypatch.setattr("subprocess.run", fake_run, raising=True) + + REGISTRY.nodes = { + "customers.ff": Node( + name="customers.ff", kind="sql", path=Path("models/customers.ff.sql"), deps=[] + ) + } + + changed = get_changed_models(project_dir, git_ref="origin/main") + assert changed == set() + + +@pytest.mark.unit +def test_get_changed_models_no_model_files(monkeypatch, tmp_path): + """ + If git diff returns only non-model paths, get_changed_models should + return an empty set. + """ + project_dir = tmp_path + + def fake_run(cmd, cwd, stdout, stderr, text, check): + stdout_text = "\n".join( + [ + "README.md", + "seeds/customers.csv", + "scripts/some_tool.py", + ] + ) + return DummyProc(returncode=0, stdout=stdout_text, stderr="") + + monkeypatch.setattr("subprocess.run", fake_run, raising=True) + + REGISTRY.nodes = { + "customers.ff": Node( + name="customers.ff", kind="sql", path=Path("models/customers.ff.sql"), deps=[] + ) + } + + changed = get_changed_models(project_dir, git_ref="origin/main") + assert changed == set() + + +@pytest.mark.unit +def test_compute_affected_models_chain(): + """ + For a simple chain A -> B -> C (B depends on A, C depends on B), + a change in B should mark {A, B, C} as affected. + """ + nodes = { + "A.ff": Node(name="A.ff", kind="sql", path=Path("models/A.ff.sql"), deps=[]), + "B.ff": Node(name="B.ff", kind="sql", path=Path("models/B.ff.sql"), deps=["A.ff"]), + "C.ff": Node(name="C.ff", kind="sql", path=Path("models/C.ff.sql"), deps=["B.ff"]), + } + + changed = {"B.ff"} + affected = compute_affected_models(changed, nodes) + + # B is changed; A is upstream of B; C is downstream of B. 
+ assert affected == {"A.ff", "B.ff", "C.ff"} + + +@pytest.mark.unit +def test_compute_affected_models_disconnected(): + """ + Disconnected components should not leak: changing X only affects its + own connected component. + """ + nodes = { + "A.ff": Node(name="A.ff", kind="sql", path=Path("models/A.ff.sql"), deps=[]), + "B.ff": Node(name="B.ff", kind="sql", path=Path("models/B.ff.sql"), deps=["A.ff"]), + "X.ff": Node(name="X.ff", kind="sql", path=Path("models/X.ff.sql"), deps=[]), + "Y.ff": Node(name="Y.ff", kind="sql", path=Path("models/Y.ff.sql"), deps=["X.ff"]), + } + + changed = {"X.ff"} + affected = compute_affected_models(changed, nodes) + + # Only the X↔Y component should be affected. + assert affected == {"X.ff", "Y.ff"} + + +@pytest.mark.unit +def test_compute_affected_models_empty_changed(): + """ + If 'changed' is empty, compute_affected_models should return an empty set. + """ + nodes = { + "A.ff": Node(name="A.ff", kind="sql", path=Path("models/A.ff.sql"), deps=[]), + } + affected = compute_affected_models(set(), nodes) + assert affected == set() diff --git a/tests/unit/ci/test_sarif_unit.py b/tests/unit/ci/test_sarif_unit.py new file mode 100644 index 0000000..e17a40b --- /dev/null +++ b/tests/unit/ci/test_sarif_unit.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from fastflowtransform.ci.core import CiIssue, CiSummary +from fastflowtransform.ci.sarif import write_sarif + + +def _empty_summary(issues: list[CiIssue]) -> CiSummary: + """ + Helper to construct CiSummary with the required fields used + by the current implementation (issues, selected_nodes, all_nodes). 
+ """ + return CiSummary( + issues=issues, + selected_nodes=[], # list[str], not set() + all_nodes=[], # list[str], not set() + ) + + +@pytest.mark.unit +def test_write_sarif_empty_summary(tmp_path: Path) -> None: + """ + When the summary has no issues, SARIF should still be valid: + - version 2.1.0 + - 1 run + - correct tool name + - empty results list + """ + summary = _empty_summary([]) + out_path = tmp_path / "sarif" / "empty.sarif.json" + + write_sarif(summary, out_path, tool_name="FastFlowTransform CI", tool_version="1.2.3") + + assert out_path.exists() + data = json.loads(out_path.read_text(encoding="utf-8")) + + assert data["version"] == "2.1.0" + assert len(data["runs"]) == 1 + + run = data["runs"][0] + driver = run["tool"]["driver"] + + assert driver["name"] == "FastFlowTransform CI" + assert driver["version"] == "1.2.3" + assert run["results"] == [] + + +@pytest.mark.unit +def test_write_sarif_single_error_issue_with_location(tmp_path: Path) -> None: + """ + A single error issue with file/line/column should be mapped to: + - level: 'error' + - ruleId: issue.code + - message.text: issue.message + - locations[0].physicalLocation.artifactLocation.uri == file + - region.startLine / startColumn + """ + issue = CiIssue( + code="MISSING_DEP", + level="error", + message="Model has missing dependency", + obj_name="orders.ff", + file="models/orders.ff.sql", + line=10, + column=3, + ) + summary = _empty_summary([issue]) + out_path = tmp_path / "sarif" / "single_error.sarif.json" + + write_sarif(summary, out_path, tool_name="fft-ci", tool_version=None) + + data = json.loads(out_path.read_text(encoding="utf-8")) + run = data["runs"][0] + + assert run["tool"]["driver"]["name"] == "fft-ci" + # No version key when tool_version=None + assert "version" not in run["tool"]["driver"] + + assert len(run["results"]) == 1 + res = run["results"][0] + + assert res["ruleId"] == "MISSING_DEP" + assert res["level"] == "error" + assert res["message"]["text"] == "Model has missing 
dependency" + + assert len(res["locations"]) == 1 + loc = res["locations"][0] + phys = loc["physicalLocation"] + + assert phys["artifactLocation"]["uri"] == "models/orders.ff.sql" + region = phys["region"] + assert region["startLine"] == 10 + assert region["startColumn"] == 3 + + +@pytest.mark.unit +def test_write_sarif_warn_issue_without_location(tmp_path: Path) -> None: + """ + A non-error level should be normalized to SARIF 'warning', and + issues without file/line/column should emit an empty 'locations' list. + """ + issue = CiIssue( + code="STYLE", + level="warn", + message="Style nit", + obj_name="customers.ff", + file=None, + line=None, + column=None, + ) + summary = _empty_summary([issue]) + out_path = tmp_path / "sarif" / "warn_no_location.sarif.json" + + write_sarif(summary, out_path, tool_name="fft-ci") + + data = json.loads(out_path.read_text(encoding="utf-8")) + run = data["runs"][0] + res = run["results"][0] + + assert res["ruleId"] == "STYLE" + # any non-"error" level becomes "warning" for SARIF + assert res["level"] == "warning" + assert res["message"]["text"] == "Style nit" + assert res["locations"] == [] diff --git a/tests/unit/cli/run/test_run_changed_since_unit.py b/tests/unit/cli/run/test_run_changed_since_unit.py new file mode 100644 index 0000000..bc6f4ae --- /dev/null +++ b/tests/unit/cli/run/test_run_changed_since_unit.py @@ -0,0 +1,149 @@ +# tests/unit/cli/test_run_changed_since_unit.py +from __future__ import annotations + +from importlib import import_module +from pathlib import Path + +import pytest + +# Import the *module* fastflowtransform.cli.run, not the cli.run attribute +run_cmd = import_module("fastflowtransform.cli.run") + + +class DummyCtx: + def __init__(self, project: Path | str) -> None: + self.project = Path(project) + + +@pytest.mark.unit +def test_apply_changed_since_no_flag_returns_wanted(): + """ + When changed_since is None, _apply_changed_since_filter should return + 'wanted' unchanged. 
+ """ + ctx = DummyCtx(project=".") + wanted = {"a.ff", "b.ff"} + + out = run_cmd._apply_changed_since_filter( + ctx=ctx, + wanted=wanted, + select=None, + exclude=None, + changed_since=None, + ) + + assert out == wanted + + +@pytest.mark.unit +def test_apply_changed_since_no_affected(monkeypatch, tmp_path): + """ + When git-based detection yields no affected models, the filter should + return an empty set. + """ + project_dir = tmp_path + ctx = DummyCtx(project=project_dir) + + def fake_get_changed_models(project_dir_arg, git_ref): + # We expect to be called with the project dir passed in. + assert Path(project_dir_arg) == project_dir + return set() + + def fake_compute_affected_models(changed, nodes): + # With no changed models, affected is empty. + assert changed == set() + return set() + + # Patch the functions on the actual module object + monkeypatch.setattr(run_cmd, "get_changed_models", fake_get_changed_models, raising=True) + monkeypatch.setattr( + run_cmd, "compute_affected_models", fake_compute_affected_models, raising=True + ) + + wanted = {"a.ff", "b.ff"} + + out = run_cmd._apply_changed_since_filter( + ctx=ctx, + wanted=wanted, + select=None, + exclude=None, + changed_since="origin/main", + ) + + # Nothing affected => nothing to run + assert out == set() + + +@pytest.mark.unit +def test_apply_changed_since_no_select_uses_affected(monkeypatch, tmp_path): + """ + When there are no --select/--exclude patterns, _apply_changed_since_filter + returns the full 'affected' set from compute_affected_models. 
+ """ + project_dir = tmp_path + ctx = DummyCtx(project=project_dir) + + def fake_get_changed_models(project_dir_arg, git_ref): + assert Path(project_dir_arg) == project_dir + assert git_ref == "origin/main" + return {"m1.ff"} + + def fake_compute_affected_models(changed, nodes): + # m1.ff changed; pretend m2.ff is downstream + assert changed == {"m1.ff"} + return {"m1.ff", "m2.ff"} + + monkeypatch.setattr(run_cmd, "get_changed_models", fake_get_changed_models, raising=True) + monkeypatch.setattr( + run_cmd, "compute_affected_models", fake_compute_affected_models, raising=True + ) + + wanted = {"x.ff", "y.ff"} # initial 'wanted' is ignored when no select/exclude + + out = run_cmd._apply_changed_since_filter( + ctx=ctx, + wanted=wanted, + select=None, + exclude=None, + changed_since="origin/main", + ) + + # No select/exclude → affected defines the universe + assert out == {"m1.ff", "m2.ff"} + + +@pytest.mark.unit +def test_apply_changed_since_with_select_intersects(monkeypatch, tmp_path): + """ + When --select or --exclude are present, the filter should intersect + 'wanted' with 'affected'. 
+ """ + project_dir = tmp_path + ctx = DummyCtx(project=project_dir) + + def fake_get_changed_models(project_dir_arg, git_ref): + assert Path(project_dir_arg) == project_dir + return {"m1.ff"} + + def fake_compute_affected_models(changed, nodes): + # Suppose m1.ff and m2.ff are affected, but only m2.ff is in 'wanted' + assert changed == {"m1.ff"} + return {"m1.ff", "m2.ff"} + + monkeypatch.setattr(run_cmd, "get_changed_models", fake_get_changed_models, raising=True) + monkeypatch.setattr( + run_cmd, "compute_affected_models", fake_compute_affected_models, raising=True + ) + + wanted = {"m2.ff", "unrelated.ff"} + + out = run_cmd._apply_changed_since_filter( + ctx=ctx, + wanted=wanted, + select=["tag:dq"], # any non-empty list triggers intersection logic + exclude=None, + changed_since="origin/main", + ) + + # Intersection of wanted and affected + assert out == {"m2.ff"} diff --git a/tests/unit/cli/test_ci_cmd_unit.py b/tests/unit/cli/test_ci_cmd_unit.py new file mode 100644 index 0000000..48eebfc --- /dev/null +++ b/tests/unit/cli/test_ci_cmd_unit.py @@ -0,0 +1,186 @@ +import pytest +import typer + +from fastflowtransform.ci.core import CiIssue, CiSummary +from fastflowtransform.cli import ci_cmd + + +class DummyProfile: + def __init__(self, engine: str = "duckdb") -> None: + self.engine = engine + + +class DummyCtx: + def __init__(self, engine: str = "duckdb") -> None: + self.profile = DummyProfile(engine=engine) + # Just something printable - run.py also uses ctx.project in logs. + self.project = "examples/dq_demo" + + +@pytest.mark.unit +def test_ci_check_no_issues_exit_zero(monkeypatch, capsys): + """ + When run_ci_check returns no issues, ci_check should: + - print a summary, + - exit with code 0 (i.e. not raise typer.Exit). + """ + + # Avoid loading a real project + def fake_prepare_context(project, env_name, engine, vars): + assert project == "." 
+ assert env_name == "dev_duckdb" + return DummyCtx(engine="duckdb") + + monkeypatch.setattr(ci_cmd, "_prepare_context", fake_prepare_context, raising=True) + + # Make _parse_select deterministic + def fake_parse_select(tokens): + # ci_check passes select or [] into _parse_select + assert tokens == [] + return ["tag:ci"] + + monkeypatch.setattr(ci_cmd, "_parse_select", fake_parse_select, raising=True) + + # run_ci_check returns an all-green summary + def fake_run_ci_check(select=None): + # We want to see the parsed tokens arrive here + assert select == ["tag:ci"] + return CiSummary( + issues=[], + selected_nodes=["model_a", "model_b"], + all_nodes=["model_a", "model_b", "model_c"], + ) + + monkeypatch.setattr(ci_cmd, "run_ci_check", fake_run_ci_check, raising=True) + + # Should not raise + ci_cmd.ci_check( + project=".", + env_name="dev_duckdb", + engine=None, + vars=None, + select=None, + ) + + out = capsys.readouterr().out + + # Basic smoke checks on the output + assert "CI Check Summary" in out + assert "Profile: dev_duckdb | Engine: duckdb" in out + assert "Models:" in out + assert "Issues" in out + assert "None" in out # "None 🎉" is fine, we just look for "None" + assert "Selected models" in out + assert "model_a" in out + assert "model_b" in out + + +@pytest.mark.unit +def test_ci_check_warn_only_exit_zero(monkeypatch, capsys): + """ + If all issues are 'warn'-level, ci_check should still exit with code 0. 
+ """ + + def fake_prepare_context(project, env_name, engine, vars): + return DummyCtx(engine="duckdb") + + monkeypatch.setattr(ci_cmd, "_prepare_context", fake_prepare_context, raising=True) + + def fake_parse_select(tokens): + return ["tag:warn-only"] + + monkeypatch.setattr(ci_cmd, "_parse_select", fake_parse_select, raising=True) + + warn_issue = CiIssue( + code="STYLE", + level="warn", + message="Minor style nit", + obj_name="customers", + file="models/customers.ff.sql", + line=12, + column=3, + ) + + def fake_run_ci_check(select=None): + assert select == ["tag:warn-only"] + return CiSummary( + issues=[warn_issue], + selected_nodes=["customers"], + all_nodes=["customers", "orders"], + ) + + monkeypatch.setattr(ci_cmd, "run_ci_check", fake_run_ci_check, raising=True) + + # Should NOT raise typer.Exit + ci_cmd.ci_check( + project=".", + env_name="dev_duckdb", + engine=None, + vars=None, + select=["tag:warn-only"], + ) + + out = capsys.readouterr().out + assert "CI Check Summary" in out + # We expect a [W] line for the warning + assert "[W] STYLE" in out + assert "Minor style nit" in out + + +@pytest.mark.unit +def test_ci_check_with_error_issues_exit_one(monkeypatch, capsys): + """ + If any issue has level='error', ci_check should raise typer.Exit(1). 
+ """ + + def fake_prepare_context(project, env_name, engine, vars): + return DummyCtx(engine="duckdb") + + monkeypatch.setattr(ci_cmd, "_prepare_context", fake_prepare_context, raising=True) + + def fake_parse_select(tokens): + # We expect to receive the raw --select tokens here + assert tokens == ["tag:changed"] + return ["tag:changed"] + + monkeypatch.setattr(ci_cmd, "_parse_select", fake_parse_select, raising=True) + + err_issue = CiIssue( + code="MISSING_DEP", + level="error", + message="Missing dependencies for 'orders.ff': customers.ff", + obj_name="orders.ff", + file="models/orders.ff.sql", + line=20, + column=5, + ) + + def fake_run_ci_check(select=None): + # We should see the parsed tokens passed through to core + assert select == ["tag:changed"] + return CiSummary( + issues=[err_issue], + selected_nodes=["orders.ff"], + all_nodes=["orders.ff", "customers.ff"], + ) + + monkeypatch.setattr(ci_cmd, "run_ci_check", fake_run_ci_check, raising=True) + + with pytest.raises(typer.Exit) as excinfo: + ci_cmd.ci_check( + project=".", + env_name="dev_duckdb", + engine=None, + vars=None, + select=["tag:changed"], + ) + + # Exit code should be 1 + assert excinfo.value.exit_code == 1 + + out = capsys.readouterr().out + assert "CI Check Summary" in out + assert "MISSING_DEP" in out + assert "orders.ff" in out + assert "Missing dependencies" in out + assert "models/orders.ff.sql" in out diff --git a/uv.lock b/uv.lock index c462d99..80f8d10 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.6.6" +version = "0.6.7" source = { editable = "." } dependencies = [ { name = "duckdb" },