Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions docs/CLI_Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,46 @@ FastFlowTransform’s CLI is the entry point for seeding data, running DAGs, gen
| `fft dag <project> --html` | Render the DAG graph/site for quick inspection. |
| `fft docgen <project> [--out site/docs] [--emit-json path] [--open-source]` | Generate the full documentation bundle (graph + model pages + optional JSON). Default output is `<project>/site/docs`. |
| `fft test <project> [--env dev]` | Run schema/data-quality tests defined in `project.yml` or schema YAML files. |
| `fft source-freshness <project> [--env dev] [--select …]` | Evaluate freshness rules for sources and emit a summary (e.g. in the DQ demo). |
| `fft utest <project>` | Execute unit tests defined under `tests/unit/*.yml`. |
| `fft sync-db-comments <project>` | Push model/column descriptions into Postgres or Snowflake comments. |

Use `--select` to scope `run`, `dag`, or `test` commands (e.g. `state:modified`, `tag:finance`, `result:error`). Environment overrides rely on the selected profile in `profiles.yml` or the `FF_*` variables.
Use `--select` to scope `run`, `dag`, `test`, or `source-freshness` commands (e.g. `state:modified`, `tag:finance`, `result:error`). Environment overrides rely on the selected profile in `profiles.yml` or the `FF_*` variables.

## Source Freshness Checks

`fft source-freshness <project> --env <env>` evaluates **freshness rules for sources** (typically configured alongside your `sources.yml` / project metadata).

Typical usage:

```bash
fft source-freshness examples/dq_demo --env dev_duckdb
fft source-freshness . --env dev --select tag:critical_source
````

Key points:

* Uses the active profile (`--env`) to connect to the warehouse.
* Honors `--select`/`--exclude` so you can restrict checks to specific tags or source groups.
* Prints a per-source summary (status, max loaded timestamp, configured thresholds) plus an overall status code suitable for CI.
* Integrates naturally with the DQ demo: the same environment and seeds are reused, but checks focus on **source recency** rather than row-level tests in marts.

## HTTP/API Helpers

Python models can make HTTP calls via `fastflowtransform.api.http`. When you need examples, head over to `docs/Api_Models.md` for `get_json`, `get_df`, pagination helpers, caching, and offline modes.

## DAG & Documentation

- Narrow the graph with `fft dag ... --select <pattern>` (for example `state:modified` or `tag:finance`). Combined with `--html` this produces a focused mini-site under `<project>/docs/index.html`.
- Control schema introspection via `--with-schema/--no-schema`. Use `--no-schema` when the executor should avoid fetching column metadata (for example, BigQuery without sufficient permissions).
- `fft docgen` renders the DAG, model pages, and an optional JSON manifest in one command. Append `--open-source` to open `index.html` in your default browser after rendering.
* Narrow the graph with `fft dag ... --select <pattern>` (for example `state:modified` or `tag:finance`). Combined with `--html` this produces a focused mini-site under `<project>/docs/index.html`.
* Control schema introspection via `--with-schema/--no-schema`. Use `--no-schema` when the executor should avoid fetching column metadata (for example, BigQuery without sufficient permissions).
* `fft docgen` renders the DAG, model pages, and an optional JSON manifest in one command. Append `--open-source` to open `index.html` in your default browser after rendering.

## Sync Database Comments

`fft sync-db-comments <project> --env <env>` pushes model and column descriptions from project YAML or Markdown into database comments. The command currently supports Postgres and Snowflake Snowpark:

- Start with `--dry-run` to review the generated `COMMENT` statements.
- Postgres honors `profiles.yml -> postgres.db_schema` (and any `FF_PG_SCHEMA` override).
- Snowflake reuses the session or connection exposed by the executor.
* Start with `--dry-run` to review the generated `COMMENT` statements.
* Postgres honors `profiles.yml -> postgres.db_schema` (and any `FF_PG_SCHEMA` override).
* Snowflake reuses the session or connection exposed by the executor.

If no descriptions are found, the command exits without making changes.
162 changes: 162 additions & 0 deletions docs/Source_Freshness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Source Freshness

Source freshness checks answer a simple question:

> “How old is the latest data in this source, and is that acceptable?”

They complement table-level DQ tests by validating **recency of inputs** (seeds, raw tables,
landing zones) *before* you build marts.

- Configuration lives alongside your `sources.yml` metadata.
- Evaluation is done via the `fft source-freshness` CLI command.
- Output is CI-friendly (non-zero exit when critical freshness rules fail).

---

## When to use source freshness

Use source freshness when:

- you rely on upstream ingestion jobs (ETL, CDC, streaming) and need a guard-rail like
“`crm.orders` must be < 60 minutes old”;
- you have critical feeds (payments, auth logs, PII) where stale data is dangerous;
- you want a cheap pre-flight check in CI before running a heavier `fft run` + `fft test`.

It is *not* a replacement for table-level `freshness` tests on marts – they work nicely together.

---

## Configuration

Freshness rules are attached to source tables in your metadata (conceptually alongside `sources.yml`).

A minimal example:

```yaml
version: 2
sources:
- name: crm
schema: raw
tables:
- name: orders
identifier: seed_orders
freshness:
loaded_at_field: "_ff_loaded_at"
max_delay_minutes: 1440 # 1 day
warn_after_minutes: 720 # optional: warning threshold
error_after_minutes: 1440 # optional: hard error threshold
tags: ["example:dq_demo", "critical_source"]
````

Key fields:

* `loaded_at_field`: timestamp column used to compute the **max** loaded time. When seeds are
materialized via `fft seed`, every table automatically includes `_ff_loaded_at` (UTC timestamp
captured during the seed run). Pointing freshness rules at this metadata column keeps demo seeds
“fresh” even if the CSV contains static business timestamps.
* `max_delay_minutes` / `warn_after_minutes` / `error_after_minutes`:

* if only `max_delay_minutes` is set, it is treated as an error threshold;
* `warn_after_minutes` and `error_after_minutes` allow a 3-state result:

* ✅ **on-time** (age ≤ `warn_after_minutes`)
* ❕ **late (warning)** (`warn_after_minutes` < age ≤ `error_after_minutes`)
* ❌ **stale (error)** (age > `error_after_minutes`)

The exact field names should mirror whatever you wired into `run_source_freshness`; adjust the snippet if your structure differs.

---

## Running checks

Basic usage:

```bash
fft source freshness <project> --env <env>
```

Examples:

```bash
# Check all sources in the DQ demo (DuckDB)
fft source freshness examples/dq_demo --env dev_duckdb

# Only check sources tagged "critical_source"
fft source freshness . \
--env dev \
--select tag:critical_source

# Combine with other selectors (depends on your implementation)
fft source freshness . \
--env dev \
--select source:crm --exclude tag:experimental
```

The command:

* connects using the selected profile (`--env`);
* loads source + freshness metadata;
* executes a `max(loaded_at_column)` query per configured source;
* compares the result to your thresholds and produces:

* per-source rows (age, thresholds, status),
* an overall exit status (`0` if all within thresholds, non-zero on error).

---

## CI / automation

Typical pattern in CI:

```bash
# 1) Check source recency
fft source freshness . --env ci

# 2) Only if sources are fresh, run the pipeline and DQ tests
fft run . --env ci
fft test . --env ci --select tag:ci
```

Because `fft source freshness` exits non-zero on stale inputs, you can simply let the
CI job fail early rather than running a full DAG on obviously outdated data.

---

## Troubleshooting

**“No freshness rules found”**

* You called `fft source-freshness` but nothing was evaluated.
* Check that:

* at least one source table has a `freshness:` block;
* your `--select` / `--exclude` patterns aren’t filtering everything out.

**“Column not found”**

* The `loaded_at_column` doesn’t exist in the physical source.
* Verify the column name and that your `identifier` / schema overrides for that source are correct.

**Unexpectedly large ages**

* Make sure your warehouse and timestamps are in the expected timezone.
* Confirm that the ingestion job actually updates `loaded_at_column` (and not some other field).

---

## Relationship to table-level freshness tests

Table-level `freshness` tests in `project.yml`:

* operate on **models** (e.g. `mart_orders_agg.last_order_ts`);
* run via `fft test`.

Source freshness:

* operates on **sources** (e.g. `crm.orders.order_ts`);
* runs via `fft source freshness`.

Using both lets you catch:

1. Stale upstream ingestion (source is old),
2. And downstream pipeline lag or bugs (mart not refreshed even though source is fresh).
118 changes: 99 additions & 19 deletions docs/Sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

Place `sources.yml` at your project root (same level as `models/`). Example:

```
```text
project/
├── models/
├── sources.yml
└── seeds/
```
````

## YAML Schema (Version 2)

Expand Down Expand Up @@ -39,18 +39,18 @@ sources:

### Fields

| Level | Field | Description |
|----------|-------------|-------------|
| source | `name` | Logical group identifier referenced by `source('name', ...)`. |
| | `schema` | Default target schema/database for the group. |
| | `database`/`catalog` | Optional qualifiers per engine (BigQuery, Snowflake). |
| | `overrides` | Map of engine → config snippet (schema overrides, formats, locations). |
| table | `name` | Logical table name (second argument in `source()`). |
| | `identifier`| Physical name; defaults to `name` if omitted. |
| | `location` | File/path location (used with `format`). |
| | `format` | Ingestion format for engines supporting path-based sources (`delta`, `parquet`, …). |
| | `options` | Dict of format options (Spark/Databricks). |
| | `overrides` | Additional engine-specific settings merged with source-level overrides. |
| Level | Field | Description |
| ------ | -------------------- | ----------------------------------------------------------------------------------- |
| source | `name` | Logical group identifier referenced by `source('name', ...)`. |
| | `schema` | Default target schema/database for the group. |
| | `database`/`catalog` | Optional qualifiers per engine (BigQuery, Snowflake). |
| | `overrides` | Map of engine → config snippet (schema overrides, formats, locations). |
| table | `name` | Logical table name (second argument in `source()`). |
| | `identifier` | Physical name; defaults to `name` if omitted. |
| | `location` | File/path location (used with `format`). |
| | `format` | Ingestion format for engines supporting path-based sources (`delta`, `parquet`, …). |
| | `options` | Dict of format options (Spark/Databricks). |
| | `overrides` | Additional engine-specific settings merged with source-level overrides. |

Engine-specific overrides follow this merge order:

Expand All @@ -60,8 +60,8 @@ Engine-specific overrides follow this merge order:

### Engine Behavior

- **DuckDB / Postgres / BigQuery / Snowflake**: expect `identifier` (plus `schema`/`database` where relevant). Path-based sources raise errors.
- **Databricks Spark**: supports `format` + `location`. The executor registers a temp view with optional `options` (e.g. `compression`).
* **DuckDB / Postgres / BigQuery / Snowflake**: expect `identifier` (plus `schema`/`database` where relevant). Path-based sources raise errors.
* **Databricks Spark**: supports `format` + `location`. The executor registers a temp view with optional `options` (e.g. `compression`).

### Path-Based Sources Example

Expand All @@ -77,6 +77,70 @@ Engine-specific overrides follow this merge order:
multiline: true
```

### Example: Typical Project Sources

A typical analytics project mixes **seeded reference data**, **database tables**, and **lakehouse paths**. A single `sources.yml` might look like this:

```yaml
version: 2
sources:
# Seeded reference data (CSV → tables)
- name: ref
schema: ref
tables:
- name: countries
identifier: seed_countries
- name: currencies
identifier: seed_currencies

# Core application database (OLTP / CDC)
- name: crm
schema: crm
overrides:
postgres:
schema: public
bigquery:
dataset: crm_raw
tables:
- name: customers
identifier: customers
- name: orders
identifier: orders

# Lakehouse-style raw events (Spark-only)
- name: events
tables:
- name: clickstream
overrides:
databricks_spark:
format: parquet
location: "abfss://raw@storage.dfs.core.windows.net/clickstream/*.parquet"
- name: pageviews
overrides:
databricks_spark:
format: delta
location: "abfss://delta@storage.dfs.core.windows.net/pageviews"
```

Models then reference sources in a uniform way:

```sql
-- Seeded lookup
select * from {{ source('ref', 'countries') }};

-- OLTP / warehouse tables
select * from {{ source('crm', 'customers') }};

-- Lakehouse paths (on Spark)
select * from {{ source('events', 'clickstream') }};
```

The executor resolves each reference to the correct physical object for the active engine:

* Postgres: `"public"."customers"`
* BigQuery: `crm_raw.customers`
* Databricks: `delta.` or `parquet.` tables / paths behind the scenes.

## Referencing Sources in Models

```sql
Expand All @@ -99,10 +163,26 @@ targets:
postgres: staging
```

### Seed metadata columns

The `fft seed` command automatically appends a small set of metadata columns to every materialized
seed table:

| Column | Description |
|-------------------|----------------------------------------------------------------|
| `_ff_loaded_at` | UTC timestamp captured when the seed was written. |
| `_ff_seed_id` | Stable identifier derived from the path inside `seeds/`. |
| `_ff_seed_file` | Absolute path of the source file (CSV/Parquet) used to load it.|

These columns live alongside your business fields, so downstream models (and freshness checks)
can reference them directly. For example, point a source freshness rule to `_ff_loaded_at` to
assert “seed data was loaded within the last N minutes” irrespective of the timestamps stored in
the raw file.

## Validation & Errors

- Missing `identifier` *and* `location` produce `KeyError` during rendering.
- Unknown source/table names raise `KeyError` with suggestions.
- Unsupported path-based sources on an engine (`location` provided but no `format`) raise descriptive `NotImplementedError`.
* Missing `identifier` *and* `location` produce `KeyError` during rendering.
* Unknown source/table names raise `KeyError` with suggestions.
* Unsupported path-based sources on an engine (`location` provided but no `format`) raise descriptive `NotImplementedError`.

Keep `sources.yml` declarative, use engine overrides for schema differences, and lean on `.env` files where credentials or URIs vary per environment.
Loading