From de2f83beca69088c07bbbbecaafd6d7cab765b11 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Mon, 17 Nov 2025 17:35:59 +0100 Subject: [PATCH 01/32] Added BigQuery support + added bigquery to examples + fixed issues + refactor + made imports optional for different executors --- .github/workflows/ci.yml | 66 +- .gitignore | 1 + .pre-commit-config.yaml | 1 - Contributing.md | 12 + Makefile | 1 - Makefile.pipeline | 2 +- README.md | 243 ++----- _scripts/concat_docs.py | 16 +- docs/Api_Models.md | 2 +- docs/CLI_Guide.md | 2 +- docs/Cache_and_Parallelism.md | 2 +- docs/Config_and_Macros.md | 10 +- docs/Quickstart.md | 39 +- docs/Technical_Overview.md | 117 +--- docs/examples/API_Demo.md | 38 +- docs/examples/Basic_Demo.md | 73 ++- docs/examples/DQ_Demo.md | 20 + docs/examples/Incremental_Demo.md | 34 +- docs/examples/Local_Engine_Setup.md | 182 ++++++ docs/examples/Macros_Demo.md | 43 +- examples/_scripts/cleanup_env.py | 79 ++- examples/api_demo/.env.dev_bigquery_bigframes | 7 + examples/api_demo/.env.dev_bigquery_pandas | 7 + examples/api_demo/Makefile | 20 +- .../models/common/mart_users_join.ff.sql | 3 +- examples/api_demo/models/common/users.ff.sql | 3 +- .../bigquery/bigframes/api_users_http.ff.py | 29 + .../bigframes/api_users_requests.ff.py | 30 + .../bigquery/pandas/api_users_http.ff.py | 26 + .../bigquery/pandas/api_users_requests.ff.py | 28 + examples/api_demo/profiles.yml | 18 + .../basic_demo/.env.dev_bigquery_bigframes | 7 + examples/basic_demo/.env.dev_bigquery_pandas | 7 + examples/basic_demo/Makefile | 23 +- examples/basic_demo/README.md | 54 +- .../bigframes/mart_latest_signup.ff.py | 39 ++ .../bigquery/pandas/mart_latest_signup.ff.py | 36 ++ .../models/marts/mart_users_by_domain.ff.sql | 3 +- .../models/staging/users_clean.ff.sql | 3 +- examples/basic_demo/profiles.yml | 18 + .../cache_demo/.env.dev_bigquery_bigframes | 14 + examples/cache_demo/.env.dev_bigquery_pandas | 14 + examples/cache_demo/.env.dev_databricks | 7 + 
examples/cache_demo/.env.dev_postgres | 3 + examples/cache_demo/Makefile | 54 +- examples/cache_demo/README.md | 30 +- .../bigquery/bigframes/http_users.ff.py | 29 + .../bigquery/bigframes/py_constants.ff.py | 23 + .../engines/bigquery/pandas/http_users.ff.py | 25 + .../bigquery/pandas/py_constants.ff.py | 20 + .../engines/databricks_spark/http_users.ff.py | 24 + .../databricks_spark/py_constants.ff.py | 18 + .../{http => engines/duckdb}/http_users.ff.py | 5 +- .../models/engines/duckdb/py_constants.ff.py | 17 + .../models/engines/postgres/http_users.ff.py | 22 + .../engines/postgres/py_constants.ff.py | 17 + .../models/marts/mart_user_orders.ff.sql | 2 +- .../models/python/py_constants.ff.py | 16 - .../models/seeds_consumers/stg_orders.ff.sql | 4 +- .../models/seeds_consumers/stg_users.ff.sql | 2 +- examples/cache_demo/profiles.yml | 35 ++ examples/cache_demo/project.yml | 21 +- examples/cache_demo/seeds/seed_users.csv | 58 ++ examples/cache_demo/site/dag/http_users.html | 87 ++- .../site/dag/mart_user_orders.ff.html | 97 +++ .../cache_demo/site/dag/py_constants.html | 67 +- .../cache_demo/site/dag/stg_orders.ff.html | 103 +++ .../cache_demo/site/dag/stg_users.ff.html | 77 +++ examples/cache_demo/sources.yml | 1 + examples/dq_demo/.env.dev_bigquery_bigframes | 7 + examples/dq_demo/.env.dev_bigquery_pandas | 7 + examples/dq_demo/Makefile | 19 +- examples/dq_demo/README.md | 30 +- .../bigquery/bigframes/mart_orders_agg.ff.py | 53 ++ .../bigquery/pandas/mart_orders_agg.ff.py | 57 ++ .../models/marts/mart_orders_agg.ff.sql | 3 +- .../dq_demo/models/staging/customers.ff.sql | 3 +- examples/dq_demo/models/staging/orders.ff.sql | 3 +- examples/dq_demo/profiles.yml | 18 + .../.fastflowtransform/cache/dev-duckdb.json | 10 - .../.fastflowtransform/target/catalog.json | 70 --- .../.fastflowtransform/target/manifest.json | 45 -- .../target/run_results.json | 34 - examples/events_users_duckdb/Makefile | 86 --- examples/events_users_duckdb/README.md | 50 -- 
.../models/fct_events_inc.ff.sql | 24 - .../events_users_duckdb/models/users.ff.sql | 3 - .../models/users_enriched.ff.py | 13 - .../models/users_enriched.yml | 15 - examples/events_users_duckdb/profiles.yml | 10 - examples/events_users_duckdb/project.yml | 8 - .../seeds/seed_events_initial.csv | 3 - .../events_users_duckdb/seeds/seed_users.csv | 4 - .../site/dag/fct_events_inc.ff.html | 262 -------- .../events_users_duckdb/site/dag/index.html | 247 -------- .../site/dag/users.ff.html | 223 ------- .../site/dag/users_enriched.html | 232 ------- examples/events_users_duckdb/sources.yml | 9 - .../.env.dev_bigquery_bigframes | 7 + .../incremental_demo/.env.dev_bigquery_pandas | 7 + examples/incremental_demo/Makefile | 16 + examples/incremental_demo/README.md | 15 +- .../models/common/events_base.ff.sql | 3 +- .../common/fct_events_sql_inline.ff.sql | 3 +- .../models/common/fct_events_sql_yaml.ff.sql | 3 +- .../bigframes/fct_events_py_incremental.ff.py | 33 + .../pandas/fct_events_py_incremental.ff.py | 31 + .../fct_events_py_incremental.ff.py | 6 +- .../duckdb/fct_events_py_incremental.ff.py | 9 - examples/incremental_demo/profiles.yml | 18 + .../macros_demo/.env.dev_bigquery_bigframes | 7 + examples/macros_demo/.env.dev_bigquery_pandas | 7 + examples/macros_demo/Makefile | 18 +- examples/macros_demo/README.md | 6 + .../models/common/dim_users.ff.sql | 24 +- .../models/common/fct_user_sales.ff.sql | 2 +- .../models/common/stg_orders.ff.sql | 2 +- .../models/common/stg_users.ff.sql | 2 +- .../bigquery/bigframes/py_exmaple.ff.py | 20 + .../engines/bigquery/pandas/py_exmaple.ff.py | 17 + .../engines/databricks_spark/py_exmaple.ff.py | 14 + .../models/engines/postgres/py_exmaple.ff.py | 14 + examples/macros_demo/models/macros/utils.sql | 11 +- examples/macros_demo/profiles.yml | 18 + .../macros_demo/site/dag/dim_users.ff.html | 125 ++++ .../site/dag/fct_user_sales.ff.html | 145 +++++ examples/macros_demo/site/dag/py_example.html | 47 +- 
.../macros_demo/site/dag/stg_orders.ff.html | 129 ++++ .../macros_demo/site/dag/stg_users.ff.html | 143 +++++ .../.fastflowtransform/cache/dev-duckdb.json | 15 - .../.fastflowtransform/target/catalog.json | 179 ------ .../.fastflowtransform/target/manifest.json | 126 ---- .../target/run_results.json | 82 --- examples/simple_duckdb/Makefile | 28 - examples/simple_duckdb/__init__.py | 0 .../simple_duckdb/models/ephemeral_ids.ff.sql | 7 - examples/simple_duckdb/models/macros/util.sql | 14 - .../models/macros_py/sql_helpers.py | 8 - .../models/mart_orders_enriched.ff.py | 23 - .../simple_duckdb/models/mart_users.ff.sql | 8 - examples/simple_duckdb/models/orders.ff.sql | 4 - examples/simple_duckdb/models/users.ff.sql | 8 - .../simple_duckdb/models/users_enriched.ff.py | 10 - examples/simple_duckdb/models/v_users.ff.sql | 7 - .../models/v_users_enriched.ff.sql | 3 - examples/simple_duckdb/profiles.yml | 20 - examples/simple_duckdb/project.yml | 81 --- examples/simple_duckdb/seeds/seed_orders.csv | 4 - examples/simple_duckdb/seeds/seed_users.csv | 4 - examples/simple_duckdb/sources.yml | 11 - .../tests/fixtures/orders_small.csv | 4 - .../tests/fixtures/users_enriched_small.csv | 3 - .../tests/unit/mart_orders_enriched.yml | 32 - .../tests/unit/mart_orders_enriched_csv.yml | 18 - .../simple_duckdb/tests/unit/marts_daily.yml | 29 - .../tests/unit/users_enriched.yml | 32 - exports/Combined.md | 592 ++++++++++++------ pyproject.toml | 37 +- src/fastflowtransform/api/http.py | 13 +- src/fastflowtransform/cli/bootstrap.py | 50 +- src/fastflowtransform/cli/options.py | 2 +- src/fastflowtransform/cli/run.py | 7 +- src/fastflowtransform/config/project.py | 1 + src/fastflowtransform/core.py | 2 - src/fastflowtransform/dag.py | 1 - src/fastflowtransform/decorators.py | 62 +- src/fastflowtransform/executors/__init__.py | 69 +- src/fastflowtransform/executors/_shims.py | 38 +- src/fastflowtransform/executors/base.py | 16 +- .../executors/bigquery/__init__.py | 48 ++ .../{ => 
bigquery}/_bigquery_mixin.py | 11 +- .../{bigquery_exec.py => bigquery/base.py} | 175 +++--- .../executors/bigquery/bigframes.py | 161 +++++ .../executors/bigquery/pandas.py | 81 +++ .../executors/bigquery_bf_exec.py | 302 --------- ...icks_spark_exec.py => databricks_spark.py} | 2 +- .../executors/{duckdb_exec.py => duckdb.py} | 4 +- .../{postgres_exec.py => postgres.py} | 2 +- ...snowpark_exec.py => snowflake_snowpark.py} | 2 +- src/fastflowtransform/incremental.py | 16 +- src/fastflowtransform/logging.py | 1 - src/fastflowtransform/run_executor.py | 2 +- src/fastflowtransform/schema_loader.py | 8 +- src/fastflowtransform/seeding.py | 100 ++- src/fastflowtransform/settings.py | 10 +- src/fastflowtransform/testing/__init__.py | 9 + src/fastflowtransform/testing/base.py | 20 +- src/fastflowtransform/testing/registry.py | 46 ++ src/fastflowtransform/utest.py | 6 - tests/common/fixtures.py | 41 +- tests/common/mock/bigquery.py | 61 +- tests/common/mock/profiles.py | 3 +- .../test_catalog_duckdb_integration.py | 2 +- .../test_test_cmd_schema_merge_integration.py | 2 +- .../test_buildins_var_this_integration.py | 2 +- ...t_python_model_dependencies_integration.py | 2 +- tests/integration/examples/config.py | 30 +- .../test_ephemeral_inlining_integration.py | 2 +- .../duckdb/test_executor_meta_hook_duckdb.py | 2 +- ...st_executor_meta_hook_smoke_integration.py | 2 +- .../test_materializations_integration.py | 2 +- ...t_python_dependency_loading_integration.py | 4 +- ...hon_model_materialized_view_integration.py | 2 +- .../test_databricks_spark_exec_integration.py | 4 +- .../meta/test_meta_duckdb_integration.py | 2 +- .../test_schema_yaml_basic_integration.py | 2 +- ...st_schema_yaml_registry_mix_integration.py | 2 +- .../integration/test_artifacts_integration.py | 2 +- .../registry/test_dispatch_integration.py | 2 +- tests/unit/cli/test_bootstrap_unit.py | 20 +- tests/unit/cli/test_sync_db_comments_unit.py | 21 +- tests/unit/docs/test_docs_unit.py | 1 - 
.../executors/test_bigquery_bf_exec_unit.py | 82 ++- .../unit/executors/test_bigquery_exec_unit.py | 54 +- .../test_databricks_spark_exec_unit.py | 17 +- tests/unit/executors/test_duckdb_exec_unit.py | 4 +- .../unit/executors/test_postgres_exec_unit.py | 6 +- tests/unit/executors/test_shims_unit.py | 25 +- .../executors/test_snowflake_snowpark_exec.py | 10 +- tests/unit/render/test_this_proxy_unit.py | 2 +- tests/unit/render/test_this_relation_unit.py | 6 +- tests/unit/test_utest_unit.py | 54 +- .../unit/testing/test_accepted_values_unit.py | 2 +- uv.lock | 64 +- 224 files changed, 4365 insertions(+), 3516 deletions(-) create mode 100644 examples/api_demo/.env.dev_bigquery_bigframes create mode 100644 examples/api_demo/.env.dev_bigquery_pandas create mode 100644 examples/api_demo/models/engines/bigquery/bigframes/api_users_http.ff.py create mode 100644 examples/api_demo/models/engines/bigquery/bigframes/api_users_requests.ff.py create mode 100644 examples/api_demo/models/engines/bigquery/pandas/api_users_http.ff.py create mode 100644 examples/api_demo/models/engines/bigquery/pandas/api_users_requests.ff.py create mode 100644 examples/basic_demo/.env.dev_bigquery_bigframes create mode 100644 examples/basic_demo/.env.dev_bigquery_pandas create mode 100644 examples/basic_demo/models/engines/bigquery/bigframes/mart_latest_signup.ff.py create mode 100644 examples/basic_demo/models/engines/bigquery/pandas/mart_latest_signup.ff.py create mode 100644 examples/cache_demo/.env.dev_bigquery_bigframes create mode 100644 examples/cache_demo/.env.dev_bigquery_pandas create mode 100644 examples/cache_demo/.env.dev_databricks create mode 100644 examples/cache_demo/.env.dev_postgres create mode 100644 examples/cache_demo/models/engines/bigquery/bigframes/http_users.ff.py create mode 100644 examples/cache_demo/models/engines/bigquery/bigframes/py_constants.ff.py create mode 100644 examples/cache_demo/models/engines/bigquery/pandas/http_users.ff.py create mode 100644 
examples/cache_demo/models/engines/bigquery/pandas/py_constants.ff.py create mode 100644 examples/cache_demo/models/engines/databricks_spark/http_users.ff.py create mode 100644 examples/cache_demo/models/engines/databricks_spark/py_constants.ff.py rename examples/cache_demo/models/{http => engines/duckdb}/http_users.ff.py (88%) create mode 100644 examples/cache_demo/models/engines/duckdb/py_constants.ff.py create mode 100644 examples/cache_demo/models/engines/postgres/http_users.ff.py create mode 100644 examples/cache_demo/models/engines/postgres/py_constants.ff.py delete mode 100644 examples/cache_demo/models/python/py_constants.ff.py create mode 100644 examples/dq_demo/.env.dev_bigquery_bigframes create mode 100644 examples/dq_demo/.env.dev_bigquery_pandas create mode 100644 examples/dq_demo/models/engines/bigquery/bigframes/mart_orders_agg.ff.py create mode 100644 examples/dq_demo/models/engines/bigquery/pandas/mart_orders_agg.ff.py delete mode 100644 examples/events_users_duckdb/.fastflowtransform/cache/dev-duckdb.json delete mode 100644 examples/events_users_duckdb/.fastflowtransform/target/catalog.json delete mode 100644 examples/events_users_duckdb/.fastflowtransform/target/manifest.json delete mode 100644 examples/events_users_duckdb/.fastflowtransform/target/run_results.json delete mode 100644 examples/events_users_duckdb/Makefile delete mode 100644 examples/events_users_duckdb/README.md delete mode 100644 examples/events_users_duckdb/models/fct_events_inc.ff.sql delete mode 100644 examples/events_users_duckdb/models/users.ff.sql delete mode 100644 examples/events_users_duckdb/models/users_enriched.ff.py delete mode 100644 examples/events_users_duckdb/models/users_enriched.yml delete mode 100644 examples/events_users_duckdb/profiles.yml delete mode 100644 examples/events_users_duckdb/project.yml delete mode 100644 examples/events_users_duckdb/seeds/seed_events_initial.csv delete mode 100644 examples/events_users_duckdb/seeds/seed_users.csv delete mode 
100644 examples/events_users_duckdb/site/dag/fct_events_inc.ff.html delete mode 100644 examples/events_users_duckdb/site/dag/index.html delete mode 100644 examples/events_users_duckdb/site/dag/users.ff.html delete mode 100644 examples/events_users_duckdb/site/dag/users_enriched.html delete mode 100644 examples/events_users_duckdb/sources.yml create mode 100644 examples/incremental_demo/.env.dev_bigquery_bigframes create mode 100644 examples/incremental_demo/.env.dev_bigquery_pandas create mode 100644 examples/incremental_demo/models/engines/bigquery/bigframes/fct_events_py_incremental.ff.py create mode 100644 examples/incremental_demo/models/engines/bigquery/pandas/fct_events_py_incremental.ff.py create mode 100644 examples/macros_demo/.env.dev_bigquery_bigframes create mode 100644 examples/macros_demo/.env.dev_bigquery_pandas create mode 100644 examples/macros_demo/models/engines/bigquery/bigframes/py_exmaple.ff.py create mode 100644 examples/macros_demo/models/engines/bigquery/pandas/py_exmaple.ff.py create mode 100644 examples/macros_demo/models/engines/databricks_spark/py_exmaple.ff.py create mode 100644 examples/macros_demo/models/engines/postgres/py_exmaple.ff.py delete mode 100644 examples/simple_duckdb/.fastflowtransform/cache/dev-duckdb.json delete mode 100644 examples/simple_duckdb/.fastflowtransform/target/catalog.json delete mode 100644 examples/simple_duckdb/.fastflowtransform/target/manifest.json delete mode 100644 examples/simple_duckdb/.fastflowtransform/target/run_results.json delete mode 100644 examples/simple_duckdb/Makefile delete mode 100644 examples/simple_duckdb/__init__.py delete mode 100644 examples/simple_duckdb/models/ephemeral_ids.ff.sql delete mode 100644 examples/simple_duckdb/models/macros/util.sql delete mode 100644 examples/simple_duckdb/models/macros_py/sql_helpers.py delete mode 100644 examples/simple_duckdb/models/mart_orders_enriched.ff.py delete mode 100644 examples/simple_duckdb/models/mart_users.ff.sql delete mode 100644 
examples/simple_duckdb/models/orders.ff.sql delete mode 100644 examples/simple_duckdb/models/users.ff.sql delete mode 100644 examples/simple_duckdb/models/users_enriched.ff.py delete mode 100644 examples/simple_duckdb/models/v_users.ff.sql delete mode 100644 examples/simple_duckdb/models/v_users_enriched.ff.sql delete mode 100644 examples/simple_duckdb/profiles.yml delete mode 100644 examples/simple_duckdb/project.yml delete mode 100644 examples/simple_duckdb/seeds/seed_orders.csv delete mode 100644 examples/simple_duckdb/seeds/seed_users.csv delete mode 100644 examples/simple_duckdb/sources.yml delete mode 100644 examples/simple_duckdb/tests/fixtures/orders_small.csv delete mode 100644 examples/simple_duckdb/tests/fixtures/users_enriched_small.csv delete mode 100644 examples/simple_duckdb/tests/unit/mart_orders_enriched.yml delete mode 100644 examples/simple_duckdb/tests/unit/mart_orders_enriched_csv.yml delete mode 100644 examples/simple_duckdb/tests/unit/marts_daily.yml delete mode 100644 examples/simple_duckdb/tests/unit/users_enriched.yml create mode 100644 src/fastflowtransform/executors/bigquery/__init__.py rename src/fastflowtransform/executors/{ => bigquery}/_bigquery_mixin.py (71%) rename src/fastflowtransform/executors/{bigquery_exec.py => bigquery/base.py} (62%) create mode 100644 src/fastflowtransform/executors/bigquery/bigframes.py create mode 100644 src/fastflowtransform/executors/bigquery/pandas.py delete mode 100644 src/fastflowtransform/executors/bigquery_bf_exec.py rename src/fastflowtransform/executors/{databricks_spark_exec.py => databricks_spark.py} (99%) rename src/fastflowtransform/executors/{duckdb_exec.py => duckdb.py} (98%) rename src/fastflowtransform/executors/{postgres_exec.py => postgres.py} (99%) rename src/fastflowtransform/executors/{snowflake_snowpark_exec.py => snowflake_snowpark.py} (99%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7aed4e5..baa3987 100644 --- a/.github/workflows/ci.yml +++ 
b/.github/workflows/ci.yml @@ -66,7 +66,54 @@ jobs: - name: Unit tests (fast) env: PYTHONWARNINGS: default - run: uv run pytest -q tests -m unit --maxfail=1 + run: uv run pytest -q tests -m "unit and not (postgres or databricks_spark or bigquery or snowflake)" --maxfail=1 + + # ---------- Engine-specific unit slices (require optional extras) ---------- + unit-matrix: + runs-on: ubuntu-latest + needs: checks + strategy: + fail-fast: false + matrix: + include: + - name: postgres + extra: postgres + marker: "unit and postgres" + - name: databricks_spark + extra: spark + marker: "unit and databricks_spark" + java: true + - name: bigquery + extra: bigquery_bf + marker: "unit and bigquery" + - name: snowflake + extra: snowflake + marker: "unit and snowflake" + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup uv (and Python) + uses: astral-sh/setup-uv@v5 + with: + python-version: "3.12" + enable-cache: true + + - name: Sync deps (dev + extra) + run: uv sync --extra dev --extra ${{ matrix.extra }} --frozen + + - name: Setup Java for Spark + if: matrix.java == true + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: '17' + + - name: Run unit tests for engine + env: + PYTHONWARNINGS: default + run: uv run pytest -q tests -m "${{ matrix.marker }}" --maxfail=1 # ---------- Examples: Integration Tests ---------- examples-matrix: @@ -75,7 +122,13 @@ jobs: strategy: fail-fast: false matrix: - engine: [duckdb, postgres, databricks_spark] + include: + - engine: duckdb + extra: "" + - engine: postgres + extra: "postgres" + - engine: databricks_spark + extra: "spark" services: postgres: @@ -102,8 +155,13 @@ jobs: python-version: "3.12" enable-cache: true - - name: Sync deps (dev) - run: uv sync --extra dev --frozen + - name: Sync deps (dev + extra) + run: | + extras="--extra dev" + if [ -n "${{ matrix.extra }}" ]; then + extras="$extras --extra ${{ matrix.extra }}" + fi + uv sync $extras --frozen - name: Setup Java for Spark if: 
matrix.engine == 'databricks_spark' diff --git a/.gitignore b/.gitignore index 35acb1e..1d2f868 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Envs & Secrets .env.local .env.*.local +secrets/ # Local DBs / Artifacts *.duckdb diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5658f47..e188b15 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,6 @@ repos: pass_filenames: false types_or: [python] - # Optional: dieselben Checks auch beim Push fürs „CI-Gefühl“ - id: prepush-ruff name: ruff (pre-push) entry: uv run ruff check src tests diff --git a/Contributing.md b/Contributing.md index ea84ae2..d892432 100644 --- a/Contributing.md +++ b/Contributing.md @@ -61,6 +61,18 @@ pytest -q make demo ``` +For engines behind optional extras, run targeted installs/tests in a matrix (local or CI) to catch import/runtime gaps without pulling every dependency: + +```bash +uv pip install -e .[duckdb] # core +uv pip install -e .[postgres] +uv pip install -e .[bigquery] +uv pip install -e .[bigquery_bf] +uv pip install -e .[spark] +uv pip install -e .[snowflake] +# or uv pip install -e .[full] for an all-in-one sweep +``` + --- ## 🧑‍🤝‍🧑 Code of Conduct diff --git a/Makefile b/Makefile index 3c59f7f..7e9b8cb 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,6 @@ SHELL := /bin/bash -# Defaults (per CLI überschreibbar): make FF_PROJECT=examples/postgres FF_ENV=stg FF_PROJECT ?= examples/simple_duckdb FF_DB ?= $(FF_PROJECT)/.local/demo.duckdb FF_ENV ?= dev diff --git a/Makefile.pipeline b/Makefile.pipeline index 4b82eb8..a628cfe 100644 --- a/Makefile.pipeline +++ b/Makefile.pipeline @@ -43,7 +43,7 @@ demo: seed run dag demo-open test clean: rm -rf .local "$(FF_PROJECT)/docs" dist build *.egg-info -# --- Cache demos (v0.3) --- +# --- Cache demos --- cache_rw_first: # first run writes cache and meta diff --git a/README.md b/README.md index b0d43e9..71d8392 100644 --- a/README.md +++ b/README.md @@ -1,215 +1,48 @@ -# 
FastFlowTransform (PoC 0.5.1) +# FastFlowTransform -[![CI](https://github.com///actions/workflows/ci.yml/badge.svg)](https://github.com///actions/workflows/ci.yml) +[![CI](https://github.com/MirrorsAndMisdirections/FastFlowTransform/actions/workflows/ci.yml/badge.svg)](https://github.com/MirrorsAndMisdirections/FastFlowTransform/actions/workflows/ci.yml) [![PyPI version](https://img.shields.io/pypi/v/fastflowtransform.svg)](https://pypi.org/project/fastflowtransform/) -> ⚠️ **Project status:** early proof-of-concept. Stable enough for demos and smaller workflows. Public APIs may still change. +FastFlowTransform (FFT) is a SQL + Python data modeling engine with a deterministic DAG, level-wise parallelism, optional caching, incremental builds, auto-docs, and built-in data-quality tests. Projects are plain directories containing models, seeds, and YAML config; the `fft` CLI handles compilation, execution, docs, and validation across multiple execution engines. ---- - -## Table of Contents - -- [Overview](#overview) -- [Key Features](#key-features) -- [Requirements](#requirements) -- [Installation](#installation) -- [Quickstart](#quickstart) -- [Documentation](#documentation) -- [Contributing](#contributing) -- [License](#license) - ---- - -## Overview - -FastFlowTransform combines SQL and Python models in a lightweight DAG engine. A project is simply a directory with models, optional seeds, and configuration. The CLI renders SQL, runs Python models, materialises results, generates HTML documentation, and executes data-quality checks against multiple execution backends. - -> ℹ️ **Project layout & CLI overview** -> Curious about the full folder structure, Makefile targets, or example models? See the *Project Layout* and related sections in the [User Guide](docs/Technical_Overview.md#project-layout). 
- ---- - -## Key Features - -- **Polyglot modelling:** build transformation nodes in SQL (`*.ff.sql`) or Python (`*.ff.py`) and wire them together with `ref()`/`source()` and `deps=[...]`. -- **Multiple executors:** DuckDB (local default), Postgres, BigQuery (classic + BigFrames), Databricks Spark, and Snowflake Snowpark are supported via pluggable executors. -- **Deterministic DAG:** dependencies are resolved statically; `fft dag` renders either Mermaid source or a ready-to-view HTML mini site. -- **Data quality built in:** configure checks such as `not_null`, `unique`, `row_count_between`, `greater_equal`, `non_negative_sum`, and `freshness` in `project.yml`. -- **Environment-aware configuration:** `profiles.yml` plus environment variables (`FF_*`) drive executor settings; CLI flags can override at runtime. -- **Seeds & docs:** `fft seed` loads CSV/Parquet seeds, and `fft dag --html` produces browsable documentation for every model. - ---- +## Highlights +- SQL or Python models (`*.ff.sql` / `*.ff.py`) wired with `ref()` / `source()` / `deps=[...]`. +- Executors for DuckDB, Postgres, BigQuery (pandas + BigFrames), Databricks/Spark, and Snowflake Snowpark. +- Level-wise parallel scheduler with cache fingerprints, rebuild flags, and state/result selectors. +- Incremental and materialized models with engine-specific merge/append hooks. +- Tests everywhere: schema/YAML checks, reconciliation rules, and fast model unit tests (`fft utest`). +- Docs on demand: `fft dag --html` and `fft docgen` generate a browsable site plus JSON artifacts; optional `sync-db-comments` to push descriptions to Postgres/Snowflake. +- HTTP helpers for Python models (`fastflowtransform.api.http`) and Jinja macros/config for templating. ## Requirements - -- Python **3.12+** -- Optional client libraries per executor (e.g. `google-cloud-bigquery`, `snowflake-snowpark-python`, `pyspark`, appropriate database drivers). Install only what you need for your chosen backend. 
- ---- - -## Installation - -```bash -python -m pip install --upgrade pip -pip install -e . -# Optional: install pre-commit hooks -pip install pre-commit -pre-commit install -``` - -You can also bootstrap everything with the provided Makefile: - -```bash -make install # upgrades pip + installs FastFlowTransform in editable mode -``` - ---- - -## Quickstart - -### Project skeleton (optional) - -```bash -fft init ./demo_project --engine duckdb -``` - -`fft init` generates a non-interactive skeleton (no demo models) and adds inline comments pointing to the relevant documentation pages. - -> 📚 **Read more… CLI-Details** -> For flag referencees, automatization and backgrounds see [`docs/Technical_Overview.md`](docs/Technical_Overview.md#cli-flows). - -Run the end-to-end DuckDB demo (seed → run → docs → tests) in under a minute: - -```bash -make demo -``` - -The target project lives in `examples/simple_duckdb`. After the demo finishes you'll find the rendered DAG at `examples/simple_duckdb/site/dag/index.html`. Open it via: - -```bash -open examples/simple_duckdb/site/dag/index.html # macOS -xdg-open examples/simple_duckdb/site/dag/index.html # Linux -``` - -If you prefer manual control: - -```bash -fft seed examples/simple_duckdb --env dev -fft run examples/simple_duckdb --env dev -fft dag examples/simple_duckdb --env dev --html -fft test examples/simple_duckdb --env dev --select batch -``` - ---- - -> For a deep dive into the v0.3 features, see **[Parallelism & Cache](docs/Cache_and_Parallelism.md)**. - -## Parallelism & Cache (v0.3) - -FastFlowTransform 0.3 adds a level-wise parallel scheduler and an opt-in build cache. - -### Parallel execution -- DAG is split into **levels** (all nodes with the same maximum distance from sources). -- Within a level, up to `--jobs` nodes run **in parallel**. Dependencies are never violated. 
-- `--keep-going`: tasks already started in a level run to completion, but **subsequent levels won’t start** if any task in the current level fails. - -**Examples** -```bash -# run with 4 workers per level -fft run examples/simple_duckdb --env dev --jobs 4 - -# keep tasks in the current level running even if one fails -fft run examples/simple_duckdb --env dev --jobs 4 --keep-going -``` - -### Cache modes -The cache decides whether a node can be **skipped** when nothing relevant changed. - -``` ---cache=off # always build ---cache=rw # default: skip on match; write cache after build ---cache=ro # skip on match; build on miss, but don't write cache ---cache=wo # always build and write cache ---rebuild # ignore cache for selected nodes ---no-cache # alias for --cache=off -``` - -**When is a node skipped?** -FastFlowTransform computes a **fingerprint** from: -- SQL/Python source (rendered SQL or function source) -- environment context (engine, profile name, selected `FF_*` env vars, normalized `sources.yml`) -- **dependency fingerprints** (change upstream ⇒ downstream fingerprint changes) -The node is skipped if the fingerprint matches the on-disk cache **and** the physical relation exists. - -**Examples** -```bash -# first run (build + cache write) -fft run . --env dev --cache=rw - -# second run (no-op if nothing changed) -fft run . --env dev --cache=rw - -# force rebuild of a specific model -fft run . --env dev --cache=rw --rebuild marts_daily.ff - -# diagnose a surprising skip: change an FF_* env var to invalidate fingerprints -FF_DEMO_TOGGLE=1 fft run . --env dev --cache=rw -``` - -**Troubleshooting** -- *“Why did it skip?”* → Compare your last changes: SQL/Python code, `sources.yml`, `FF_*` env vars, profile/engine. Any change alters the fingerprint. -- *“Relation missing but cache says skip”* → FastFlowTransform also checks relation existence; if it was dropped externally, it will **rebuild**. 
-- *“Parallel tasks interleave logs”* → Logs are serialized via an internal queue to keep lines readable; use `-v`/`-vv` for more detail. - ---- - -## Selective runs - -Use patterns to run only a subgraph. - -- `--select `: builds only targets that match **and their dependencies**. -- `--exclude `: excludes matching targets from the build (deps remain if still required). - -Examples: - fft run . --select marts_daily.ff - fft run . --exclude 'mart_*' - ---- - -## Rebuild controls - -- `--rebuild` → rebuild **all selected** nodes (ignore cache). -- `--rebuild-only NAME …` → rebuild only the specified nodes (ignore cache). - -These flags compose with `--select/--exclude`. - -Examples: - # Rebuild everything that matches --select - fft run . --select marts_daily.ff --rebuild - - # Rebuild only a specific node - fft run . --rebuild-only marts_daily.ff - ---- - -## Documentation - -- **Documentation hub:** choose your path (operators vs contributors) — see [`docs/index.md`](docs/index.md). -- **User & operator guide:** project layout, CLI usage, troubleshooting tips — see [`docs/Technical_Overview.md`](docs/Technical_Overview.md). -- **Docgen shortcut:** append `--open-source` to `fft docgen ...` to launch the freshly rendered `index.html` immediately; use `--no-schema` when column introspection should be skipped. -- **Modeling reference:** configuration, Jinja helpers, macros — see [`docs/Config_and_Macros.md`](docs/Config_and_Macros.md). -- **API calls in Python models:** [`docs/API_Models.md`](docs/API_Models.md) -- **Database comments sync:** preview database comment updates with `fft sync-db-comments . --env dev --dry-run` before applying them to Postgres or Snowflake. -- **Examples:** runnable demo projects live under `examples/`; - ---- +- Python 3.12+ +- Engine extras installed only as needed (e.g. BigQuery, Snowflake, Spark/Delta, Postgres drivers). The core DuckDB path works out of the box. 
+ +## Install & Quickstart +- Pick the engine extras you need (combine as `pkg[a,b]`): + - DuckDB/core: `pip install fastflowtransform` + - Postgres: `pip install fastflowtransform[postgres]` + - BigQuery (pandas): `pip install fastflowtransform[bigquery]` + - BigQuery (BigFrames): `pip install fastflowtransform[bigquery_bf]` + - Databricks/Spark + Delta: `pip install fastflowtransform[spark]` + - Snowflake Snowpark: `pip install fastflowtransform[snowflake]` + - Everything: `pip install fastflowtransform[full]` +- Installation and first run: see `docs/Quickstart.md` (venv + editable install, DuckDB demo, and init walkthrough). +- CLI usage and flags: see `docs/CLI_Guide.md`. +- Makefile shortcut: `make demo` runs the simple DuckDB example end-to-end and opens the DAG (`examples/simple_duckdb`). + +## Docs & examples +- Docs hub: `docs/index.md` or https://fastflowtransform.com. +- Operational guide & architecture: `docs/Technical_Overview.md`. +- Modeling reference & macros: `docs/Config_and_Macros.md`. +- Parallelism, cache, and state selection: `docs/Cache_and_Parallelism.md`, `docs/State_Selection.md`. +- Incremental models: `docs/Incremental.md`. +- Data-quality + YAML tests: `docs/Data_Quality_Tests.md`, `docs/YAML_Tests.md`, `docs/Unit_Tests.md`. +- CLI details and troubleshooting: `docs/CLI_Guide.md`, `docs/Troubleshooting.md`. +- Runnable demos live under `examples/` (basic, materializations, incremental, DQ, macros, cache, env matrix, API, events). ## Contributing - -Issues and pull requests are welcome! Please read [`Contributing.md`](./Contributing.md) for guidelines, development setup, and testing instructions. Sharing minimal reproduction steps plus `fft --version` output greatly speeds up reviews. - ---- +Issues and PRs are welcome. See `Contributing.md` for development setup, testing (`make demo`, `uv run pytest`, `fft utest`), and code-style guidelines. ## License - -FastFlowTransform is licensed under the [Apache License 2.0](./License). 
+Apache 2.0 β€” see `License.md`. diff --git a/_scripts/concat_docs.py b/_scripts/concat_docs.py index 0b04287..fe59d5c 100644 --- a/_scripts/concat_docs.py +++ b/_scripts/concat_docs.py @@ -131,31 +131,31 @@ def main(): "-d", "--docs-dir", default=DOCS_DIR_DEFAULT, - help="Pfad zum docs-Verzeichnis (Default: docs)", + help="Path to docs directory (Default: docs)", ) - parser.add_argument("-o", "--output", required=True, help="Ausgabedatei (z. B. Combined.md)") + parser.add_argument("-o", "--output", required=True, help="Output file (e.g. Combined.md)") parser.add_argument( "--demote", action="store_true", - help="Headings ab der zweiten Datei um eine Ebene demoten (# -> ##, usw.)", + help="Demote headings starting with the second file by one level (# -> ##, etc.)", ) parser.add_argument( "--exclude", action="append", default=[], - help="Glob-Pattern zum Ausschließen (z. B. 'reference/**'). Mehrfach nutzbar.", + help="Glob pattern to exclude (e.g. 'reference/**'). Can be used multiple times.", ) parser.add_argument( "--no-nav", action="store_true", - help="mkdocs.yml ignorieren und alphabetisch alle .md zusammenfΓΌgen", + help="Ignore mkdocs.yml and concatenate all .md alphabetically", ) args = parser.parse_args() project_root = Path(".").resolve() docs_dir = (project_root / args.docs_dir).resolve() if not docs_dir.exists(): - print(f"Fehler: docs-Verzeichnis nicht gefunden: {docs_dir}", file=sys.stderr) + print(f"Error: docs directory not found: {docs_dir}", file=sys.stderr) sys.exit(1) # 1) Order from mkdocs.yml (if not disabled / available) @@ -176,7 +176,7 @@ def main(): seen.add(rel.as_posix()) if not ordered: - print("Keine Markdown-Dateien gefunden.", file=sys.stderr) + print("No Markdown files found.", file=sys.stderr) sys.exit(2) out_path = Path(args.output).resolve() @@ -194,7 +194,7 @@ def main(): out_text = f"# Combined Documentation\n\n" + "\n".join(parts) out_path.write_text(out_text, encoding="utf-8") - print(f"βœ”οΈ {len(ordered)} Dateien 
zusammengefΓΌhrt β†’ {out_path}") + print(f"βœ”οΈ {len(ordered)} files merged β†’ {out_path}") if __name__ == "__main__": diff --git a/docs/Api_Models.md b/docs/Api_Models.md index 0d93861..8e3e1b6 100644 --- a/docs/Api_Models.md +++ b/docs/Api_Models.md @@ -306,4 +306,4 @@ fft run . --env dev --select dim_countries_from_api --http-cache ro - Technical guide: *Developer Guide – Architecture & Internals* - Unit tests: `tests/api/test_http_*.py` -- Runtime & cache: *Parallelism & Cache (v0.3)* +- Runtime & cache: *Parallelism & Cache* diff --git a/docs/CLI_Guide.md b/docs/CLI_Guide.md index 042e5ed..3c6f327 100644 --- a/docs/CLI_Guide.md +++ b/docs/CLI_Guide.md @@ -9,7 +9,7 @@ FastFlowTransform’s CLI is the entry point for seeding data, running DAGs, gen | `fft seed [--env dev]` | Materialize CSV/Parquet seeds into the configured engine. | | `fft run [--env dev]` | Execute the DAG (obeys cache + parallel flags). | | `fft dag --html` | Render the DAG graph/site for quick inspection. | -| `fft docgen --out site/docs` | Generate the full documentation bundle (graph + model pages + optional JSON). | +| `fft docgen [--out site/docs] [--emit-json path] [--open-source]` | Generate the full documentation bundle (graph + model pages + optional JSON). Default output is `/site/docs`. | | `fft test [--env dev]` | Run schema/data-quality tests defined in `project.yml` or schema YAML files. | | `fft utest ` | Execute unit tests defined under `tests/unit/*.yml`. | | `fft sync-db-comments ` | Push model/column descriptions into Postgres or Snowflake comments. | diff --git a/docs/Cache_and_Parallelism.md b/docs/Cache_and_Parallelism.md index 479ccec..596b87d 100644 --- a/docs/Cache_and_Parallelism.md +++ b/docs/Cache_and_Parallelism.md @@ -233,6 +233,6 @@ FF_RUN_DATE=2025-01-01 fft run . 
--env dev --cache=rw @@ -10,6 +10,7 @@ - [User Guide – Operational](./Technical_Overview.md#part-i--operational-guide) - [Modeling Reference](./Config_and_Macros.md) - - [Parallelism & Cache (v0.3)](./Cache_and_Parallelism.md) +- [Parallelism & Cache](./Cache_and_Parallelism.md) - [Developer Guide – Architecture & Internals](./Technical_Overview.md#part-ii--architecture--internals) ```` diff --git a/docs/Config_and_Macros.md b/docs/Config_and_Macros.md index cdcdca3..2ceeac7 100644 --- a/docs/Config_and_Macros.md +++ b/docs/Config_and_Macros.md @@ -1,8 +1,8 @@ -# FastFlowTransform Modeling Reference (v0.1) +# FastFlowTransform Modeling Reference > Authoritative reference for FastFlowTransform’s modeling layer: SQL/Python models, configuration macros, templating helpers, and testing hooks. -> Works with FastFlowTransform v0.1 (T1–T11). Supported engines: DuckDB, Postgres, BigQuery (pandas & BigFrames), Databricks/Spark, Snowflake/Snowpark. -> **Execution & Cache (v0.3) quick notes** +> Supported engines: DuckDB, Postgres, BigQuery (pandas & BigFrames), Databricks/Spark, Snowflake/Snowpark. +> **Execution & Cache quick notes** > - Parallelism is level-wise; use `fft run --jobs N`. > - Use `--cache={off|ro|rw|wo}` to control skipping behavior. > - Fingerprints include rendered SQL / Python function source, selected `FF_*` env vars, `sources.yml` and upstream fingerprints. @@ -184,7 +184,7 @@ Call `config()` at the top of SQL models. Python models get the same options via ) }} ``` -Supported keys (v0.1): +Supported keys: | Key | Type | Description | |----------------|-----------------|------------------------------------------------------------------------------| @@ -307,7 +307,7 @@ from {{ ref('users.ff') }}; - Default β†’ materialized as `table`. - `materialized='view'` produces an engine-specific temporary table first, then creates/overwrites a view that selects from it. -- Ephemeral Python models are not supported in v0.1. 
+- Ephemeral Python models are not supported. --- diff --git a/docs/Quickstart.md b/docs/Quickstart.md index 2973c8b..a06407f 100644 --- a/docs/Quickstart.md +++ b/docs/Quickstart.md @@ -15,12 +15,33 @@ The command is non-interactive, refuses to overwrite existing directories, and l ## 1. Install & bootstrap ```bash -python -m venv .venv -. .venv/bin/activate -pip install -e ./fastflowtransform +python3 -m venv .venv +. .venv/bin/activate # or source .venv/bin/activate +pip install --upgrade pip +pip install -e . # run from the repo root; use `uv pip install --editable .` if you prefer uv fft --help ``` +Choose extras if you target other engines (combine as needed): + +```bash +# Postgres +pip install -e .[postgres] + +# BigQuery (pandas) or BigFrames +pip install -e .[bigquery] # pandas +pip install -e .[bigquery_bf] # BigFrames + +# Databricks/Spark + Delta +pip install -e .[spark] + +# Snowflake Snowpark +pip install -e .[snowflake] + +# Everything +pip install -e .[full] +``` + ## 2. Create project layout ```bash @@ -47,12 +68,19 @@ cat <<'SQL' > demo/models/users.ff.sql select id, email from {{ source('raw', 'users') }} SQL + +cat <<'YAML' > demo/profiles.yml +dev: + engine: duckdb + duckdb: + path: ".local/demo.duckdb" +YAML ``` ## 3. Seed static inputs ```bash -fft seed demo --profile dev +fft seed demo --env dev ``` This materializes the CSV into the configured engine (DuckDB by default) using `seed_users` as the physical table. @@ -60,7 +88,7 @@ This materializes the CSV into the configured engine (DuckDB by default) using ` ## 4. Run the pipeline ```bash -fft run demo --cache off +fft run demo --env dev --cache off ``` You should see log lines similar to `βœ“ L01 [DUCK] users.ff`. The resulting table lives in the target schema (`staging` in this example). @@ -80,5 +108,6 @@ You should see log lines similar to `βœ“ L01 [DUCK] users.ff`. 
The resulting tab - Add `project.yml` for reusable `vars:` and metadata - Explore `fft docs` to generate HTML documentation - Use engine profiles under `profiles.yml` to target Postgres, BigQuery, or Databricks (path-based sources supported via `format` + `location` overrides) +- Render the DAG site for this project: `fft dag demo --env dev --html` (find it under `demo/site/dag/index.html`) Refer to `docs/Config_and_Macros.md` for advanced configuration options. diff --git a/docs/Technical_Overview.md b/docs/Technical_Overview.md index 7612bbf..2aec7df 100644 --- a/docs/Technical_Overview.md +++ b/docs/Technical_Overview.md @@ -1,4 +1,4 @@ -# 🧭 FastFlowTransform – Technical Developer Documentation (v0.4) +# 🧭 FastFlowTransform – Technical Developer Documentation > Status: latest updates from your context dump. This document consolidates project structure, architecture, core APIs, error handling, CLI, examples, and roadmap into a print/git-friendly Markdown. > @@ -53,64 +53,7 @@ ### Project Layout -```text -fastflowtransform/ -β”œβ”€β”€ pyproject.toml -β”œβ”€β”€ src/ -β”‚ └── fastflowtransform/ -β”‚ β”œβ”€β”€ __init__.py -β”‚ β”œβ”€β”€ cli.py -β”‚ β”œβ”€β”€ core.py -β”‚ β”œβ”€β”€ dag.py -β”‚ β”œβ”€β”€ docs.py -β”‚ β”œβ”€β”€ errors.py -β”‚ β”œβ”€β”€ settings.py -β”‚ β”œβ”€β”€ seeding.py -β”‚ β”œβ”€β”€ testing.py -β”‚ β”œβ”€β”€ validation.py -β”‚ β”œβ”€β”€ decorators.py # optional, if not kept in core.py -β”‚ β”œβ”€β”€ docs/ -β”‚ β”‚ └── templates/ -β”‚ β”‚ β”œβ”€β”€ index.html.j2 -β”‚ β”‚ └── model.html.j2 -β”‚ β”œβ”€β”€ executors/ -β”‚ β”‚ β”œβ”€β”€ __init__.py -β”‚ β”‚ β”œβ”€β”€ base.py -β”‚ β”‚ β”œβ”€β”€ duckdb_exec.py -β”‚ β”‚ β”œβ”€β”€ postgres_exec.py -β”‚ β”‚ β”œβ”€β”€ bigquery_exec.py # pandas + BigQuery client -β”‚ β”‚ β”œβ”€β”€ bigquery_bf_exec.py # BigQuery DataFrames (bigframes) -β”‚ β”‚ β”œβ”€β”€ databricks_spark_exec.py # PySpark (without pandas) -β”‚ β”‚ └── snowflake_snowpark_exec.py# Snowpark (without pandas) -β”‚ └── streaming/ -β”‚ β”œβ”€β”€ 
__init__.py -β”‚ β”œβ”€β”€ file_tail.py -β”‚ └── sessionizer.py -β”‚ -β”œβ”€β”€ examples/ -β”‚ β”œβ”€β”€ simple_duckdb/ -β”‚ β”‚ β”œβ”€β”€ models/ -β”‚ β”‚ β”‚ β”œβ”€β”€ users.ff.sql -β”‚ β”‚ β”‚ β”œβ”€β”€ users_enriched.ff.py -β”‚ β”‚ β”‚ β”œβ”€β”€ orders.ff.sql -β”‚ β”‚ β”‚ β”œβ”€β”€ mart_orders_enriched.ff.py -β”‚ β”‚ β”‚ └── mart_users.ff.sql -β”‚ β”‚ β”œβ”€β”€ seeds/ -β”‚ β”‚ β”‚ β”œβ”€β”€ seed_users.csv -β”‚ β”‚ β”‚ └── seed_orders.csv -β”‚ β”‚ β”œβ”€β”€ sources.yml -β”‚ β”‚ β”œβ”€β”€ project.yml -β”‚ β”‚ β”œβ”€β”€ Makefile -β”‚ β”‚ └── .local/demo.duckdb (after make seed/run) -β”‚ └── postgres/ # similar structure if needed -β”‚ -β”œβ”€β”€ tests/ -β”‚ β”œβ”€β”€ conftest.py -β”‚ β”œβ”€β”€ duckdb/ … # end-to-end + unit -β”‚ β”œβ”€β”€ postgres/ … -β”‚ └── streaming/ … -└── README.md -``` +For an up-to-date view, browse the repository tree or run `find . -maxdepth 2` from the root; all examples live under `examples/` with their own READMEs. ### Example Projects and Seeds @@ -145,20 +88,6 @@ Need to understand profile precedence, `.env` layering, or the Pydantic models t Level-wise parallelism, cache modes, fingerprint formula, and the `_ff_meta` audit table are documented in [Cache_and_Parallelism.md](./Cache_and_Parallelism.md). Use that reference for CLI examples (`--jobs`, `--cache`, `--rebuild`), skip conditions, and troubleshooting tips related to concurrency. -### Roadmap Snapshot - -| Version | Content | -|---------|---------------------------------------------------| -| 0.2 | `config(materialized=...)`, Jinja macros, variables | -| 0.3 | Parallel execution, cache | -| 0.4 | Incremental models | -| 0.5 | Streaming connectors (Kafka, S3) | -| 1.0 | Stable API, plugin SDK | - -> See also: feature pyramid & roadmap phases (OSS/SaaS) in the separate document. 
- ---- - ### Cross-Table Reconciliations Reconciliation tests (`reconcile_equal`, `reconcile_ratio_within`, `reconcile_diff_within`, `reconcile_coverage`) are fully documented in the [Data Quality Test Reference](./Data_Quality_Tests.md#cross-table-reconciliations). Use that guide for YAML schemas, tolerance parameters, and engine notes before wiring the checks into `fft test`. @@ -293,13 +222,13 @@ class BaseExecutor(ABC): def _materialize_relation(self, relation: str, df: pd.DataFrame, node: Node) -> None: ... ``` -**DuckDB (`duckdb_exec.py`)** +**DuckDB (`duckdb.py`)** - `run_sql(node, env)` renders Jinja (`ref/source`) and executes the SQL. - `_read_relation` loads a table as `DataFrame`; surfaces actionable errors when a dependency is missing. - `_materialize_relation` writes the `DataFrame` as a table (`create or replace table ...`). -**Postgres (`postgres_exec.py`)** +**Postgres (`postgres.py`)** - `_SAConnShim` (compatible with `testing._exec`). - `run_sql` renders SQL and rewrites `CREATE OR REPLACE TABLE` to `DROP + CREATE AS`. @@ -357,41 +286,7 @@ def seed_project(project_dir: Path, executor, schema: Optional[str] = None) -> i ### CLI Implementation -Operational usage lives in [CLI Flows](#cli-flows). This section drills into the Typer command definitions in `cli.py`. - -**Commands:** - -- `fft run [--env dev] [--engine ...]` -- `fft dag [--env dev] [--html] [--select ...] [--with-schema/--no-schema]` -- `fft docgen [--env dev] [--out dir] [--emit-json path] [--open-source]` -- `fft test [--env dev] [--select batch|streaming|tag:...]` -- `fft seed [--env dev]` -- `fft sync-db-comments [--env dev] [--dry-run]` -- `fft utest [--env dev] [--cache off|ro|rw] [--reuse-meta]` -- `fft --version` - -**Key components:** - -```python -def _load_project_and_env(project_arg) -> tuple[Path, Environment]: ... -def _resolve_profile(env_name, engine, proj) -> tuple[EnvSettings, Profile]: ... -def _get_test_con(executor: Any) -> Any: ... 
-``` - -**Test summary (exit 2 on failures):** - -``` -Data Quality Summary -──────────────────── -βœ… not_null users.email (3ms) -❌ unique users.id (2ms) - ↳ users.id has 1 duplicate - -Totals -────── -βœ“ passed: 1 -βœ— failed: 1 -``` +Operational usage lives in [CLI Flows](#cli-flows) and the dedicated [CLI Guide](CLI_Guide.md). For implementation details, see the Typer commands in `src/fastflowtransform/cli/`. --- @@ -436,7 +331,7 @@ from pathlib import Path from jinja2 import Environment, FileSystemLoader from fastflowtransform.core import REGISTRY from fastflowtransform.dag import topo_sort -from fastflowtransform.executors.duckdb_exec import DuckExecutor +from fastflowtransform.executors.duckdb import DuckExecutor proj = Path("examples/simple_duckdb").resolve() REGISTRY.load_project(proj) diff --git a/docs/examples/API_Demo.md b/docs/examples/API_Demo.md index 1a9d6d3..104c43b 100644 --- a/docs/examples/API_Demo.md +++ b/docs/examples/API_Demo.md @@ -3,7 +3,7 @@ The `examples/api_demo` scenario demonstrates how FastFlowTransform blends local data, external APIs, and multiple execution engines. It highlights: - **Hybrid data model**: joins a local seed (`crm.users`) with live user data from JSONPlaceholder. -- **Multiple environments**: switch between DuckDB, Postgres, and Databricks Spark using `profiles.yml` + `.env.*`. +- **Multiple environments**: switch between DuckDB, Postgres, Databricks Spark, and BigQuery (pandas or BigFrames client) using `profiles.yml` + `.env.*`. - **HTTP integration**: compare the built-in FastFlowTransform HTTP client (`api_users_http`) with a plain `requests` implementation (`api_users_requests`). - **Offline caching & telemetry**: inspect HTTP snapshots via `run_results.json`. - **Engine-aware registration**: scope Python models via `engine_model` and SQL models via `config(engines=[...])` so only the active engine’s nodes load. 
@@ -20,7 +20,8 @@ The `examples/api_demo` scenario demonstrates how FastFlowTransform blends local 'kind:seed-consumer', 'engine:duckdb', 'engine:postgres', - 'engine:databricks_spark' + 'engine:databricks_spark', + 'engine:bigquery' ] ) }} select id, email @@ -28,14 +29,14 @@ The `examples/api_demo` scenario demonstrates how FastFlowTransform blends local ``` Consumes `sources.yml β†’ crm.users` (seeded from `seeds/seed_users.csv`). -2. **API enrichment** – two Python implementations under `models/engines/duckdb/`: +2. **API enrichment** – engine-specific Python implementations under `models/engines//`: - `api_users_http.ff.py` uses the built-in HTTP wrapper (`fastflowtransform.api.http.get_df`) with cache/offline support. - `api_users_requests.ff.py` uses raw `requests` for maximum flexibility. - - Wrap engine-specific callables with `engine_model(only="duckdb", ...)` to skip registration when another engine is selected. + - Engine-specific callables are scoped with `engine_model(only=...)` (DuckDB/Postgres/Spark) or `env_match={"FF_ENGINE": "bigquery", "FF_ENGINE_VARIANT": ...}` (BigQuery pandas/BigFrames) to stay isolated per engine. 3. **Mart join** – `models/common/mart_users_join.ff.sql` ```sql - {{ config(engines=['duckdb','postgres','databricks_spark']) }} + {{ config(engines=['duckdb','postgres','databricks_spark','bigquery']) }} {% set api_users_model = var('api_users_model', 'api_users_http') %} {% set api_users_refs = { 'api_users_http': ref('api_users_http'), @@ -69,14 +70,28 @@ dev_postgres: postgres: dsn: "{{ env('FF_PG_DSN') }}" db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" + +dev_bigquery_bigframes: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'api_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: true ``` `.env.dev_*` files supply the actual values. 
`_load_dotenv_layered()` loads them in priority order: repo `.env` β†’ project `.env` β†’ `.env.` β†’ shell overrides (highest priority). Secrets stay out of version control. +### BigQuery specifics + +- Set `ENGINE=bigquery` in the Makefile targets and choose a client via `BQ_FRAME=pandas` or `BQ_FRAME=bigframes` (default). +- Required env vars: `FF_BQ_PROJECT`, `FF_BQ_DATASET` (defaults to `api_demo`), and optionally `FF_BQ_LOCATION`. Uncomment `allow_create_dataset` in `profiles.yml` for first-run convenience. +- BigFrames variants ingest the HTTP payload into a pandas DataFrame, then wrap it as a BigFrames DataFrame (FFT’s `get_df(..., output="bigframes")` is not implemented yet). + ## Makefile Workflow -`Makefile` chooses the profile via `ENGINE` (`duckdb`/`postgres`/`databricks_spark`) and wraps the main commands: +`Makefile` chooses the profile via `ENGINE` (`duckdb`/`postgres`/`databricks_spark`/`bigquery`) and wraps the main commands. For BigQuery, set `BQ_FRAME=pandas|bigframes`: ```make ENGINE ?= duckdb @@ -85,6 +100,14 @@ ifeq ($(ENGINE),duckdb) PROFILE_ENV = dev_duckdb endif ... +ifeq ($(ENGINE),bigquery) + ENGINE_TAG = engine:bigquery + ifeq ($(BQ_FRAME),pandas) + PROFILE_ENV = dev_bigquery_pandas + else + PROFILE_ENV = dev_bigquery_bigframes + endif +endif seed: uv run fft seed "$(PROJECT)" --env $(PROFILE_ENV) @@ -98,6 +121,7 @@ Common targets: |--------------------------|-------------| | `make ENGINE=duckdb seed`| Materialize seeds into DuckDB. | | `make ENGINE=postgres run`| Execute the full pipeline against Postgres. | +| `make ENGINE=bigquery run BQ_FRAME=bigframes`| Run against BigQuery (default BigFrames client; set `BQ_FRAME=pandas` to switch). | | `make dag` | Render documentation (`site/dag/`). | | `make api-run` | Run only API models (uses HTTP cache). | | `make api-offline` | Force offline mode (`FF_HTTP_OFFLINE=1`). 
| @@ -107,7 +131,7 @@ HTTP tuning parameters (`FF_HTTP_ALLOWED_DOMAINS`, cache dir, timeouts) live in ## End-to-End Demo -1. **Select engine**: `make ENGINE=duckdb` (default). Set `ENGINE=postgres` or `ENGINE=databricks_spark` to switch. +1. **Select engine**: `make ENGINE=duckdb` (default). Set `ENGINE=postgres`, `ENGINE=databricks_spark`, or `ENGINE=bigquery BQ_FRAME=` to switch. 2. **Seed data**: `make seed` 3. **Run pipeline**: `make run` 4. **Explore docs**: `make dag` β†’ open `examples/api_demo/site/dag/index.html` diff --git a/docs/examples/Basic_Demo.md b/docs/examples/Basic_Demo.md index baea5de..7ba73e2 100644 --- a/docs/examples/Basic_Demo.md +++ b/docs/examples/Basic_Demo.md @@ -1,10 +1,12 @@ # Basic Demo Project -The `examples/basic_demo` project shows the smallest end-to-end FastFlowTransform pipeline. It combines one seed, a staging model, and a final mart while staying portable across DuckDB, Postgres, and Databricks Spark. +The `examples/basic_demo` project shows the smallest end-to-end FastFlowTransform pipeline. It combines one seed, a staging model, and a final mart while staying portable across DuckDB, Postgres, Databricks Spark, and BigQuery. ## Why it exists + - **Start small** – demonstrate the minimum folder structure (`seeds/`, `models/`, `profiles.yml`) needed to run `fft`. - **Engine parity** – prove that a single project can target multiple engines by swapping profiles. +- **Cloud & local** – show that the same project runs both on local engines (DuckDB/Postgres/Spark) and in a cloud warehouse (BigQuery). - **Understand outputs** – show where documentation and manifests land after a run. Use it as a sandbox before adding your own sources, macros, or Python models. @@ -13,12 +15,12 @@ Use it as a sandbox before adding your own sources, macros, or Python models. | Path | Purpose | |------|---------| -| `seeds/seed_users.csv` | Sample CRM-style user data. `fft seed` materializes it as `crm.users`. 
| +| `seeds/seed_users.csv` | Sample CRM-style user data. `fft seed` materializes it as a physical `seed_users` table in the active engine (schema/dataset depends on the profile). | | `models/staging/users_clean.ff.sql` | Normalizes emails, casts types, and tags the model for all engines. | | `models/marts/mart_users_by_domain.ff.sql` | Aggregates users per email domain and records the first/last signup dates. | -| `models/engines/*/mart_latest_signup.ff.py` | Engine-specific Python models (pandas for DuckDB/Postgres, PySpark for Databricks) selecting the most recent signup per domain from the staging view. | -| `profiles.yml` | Declares `dev_duckdb`, `dev_postgres`, and `dev_databricks` profiles driven by environment variables. | -| `.env.dev_*` | Template environment files you can `source` per engine. | +| `models/engines/*/mart_latest_signup.ff.py` | Engine-specific Python models selecting the most recent signup per domain from the staging view:
β€’ pandas for DuckDB/Postgres
β€’ PySpark for Databricks
β€’ BigQuery DataFrames (BigFrames) for BigQuery. | +| `profiles.yml` | Declares `dev_duckdb`, `dev_postgres`, `dev_databricks`, and `dev_bigquery` profiles driven by environment variables. | +| `.env.dev_*` | Template environment files you can `source` per engine (`.env.dev_duckdb`, `.env.dev_postgres`, `.env.dev_databricks`, `.env.dev_bigquery`). | | `Makefile` | One command (`make demo ENGINE=…`) to seed, run, document, test, and preview results. | ## Running the demo @@ -26,24 +28,61 @@ Use it as a sandbox before adding your own sources, macros, or Python models. 1. `cd examples/basic_demo` 2. Choose an engine and export its environment variables: ```bash + # DuckDB set -a; source .env.dev_duckdb; set +a - # swap to .env.dev_postgres or .env.dev_databricks for other engines + + # Postgres + # set -a; source .env.dev_postgres; set +a + + # Databricks Spark + # set -a; source .env.dev_databricks; set +a + + # BigQuery (choose one) + # set -a; source .env.dev_bigquery_pandas; set +a # pandas client + # set -a; source .env.dev_bigquery_bigframes; set +a # BigFrames ``` -3. Execute the full flow: + +3. Execute the full flow for the selected engine: + ```bash + # DuckDB / Postgres / Databricks make demo ENGINE=duckdb + # make demo ENGINE=postgres + # make demo ENGINE=databricks_spark + + # BigQuery (set BQ_FRAME to choose pandas vs bigframes) + # builds into ..* + # requires a GCP project, dataset, and credentials (see BigQuery setup docs) + # set profiles.yml β†’ bigquery.allow_create_dataset: true if the dataset should be auto-created + # make demo ENGINE=bigquery BQ_FRAME=bigframes + # make demo ENGINE=bigquery BQ_FRAME=pandas + ``` + + The Makefile runs `fft seed`, `fft run`, `fft dag`, and `fft test`. + + To open the rendered DAG site after a run: + + ```bash + make show ENGINE=duckdb + make show ENGINE=bigquery ``` - The Makefile runs `fft seed`, `fft run`, `fft dag`, `fft test`, and `fft show basic_demo.mart_users_by_domain`. 
To preview the Python mart, run `make show ENGINE=duckdb SHOW_MODEL=mart_latest_signup` (or swap `ENGINE` as needed). 4. Inspect artifacts: - `.fastflowtransform/target/manifest.json` and `run_results.json` - `site/dag/index.html` for the rendered model graph - CLI output from `fft show` displaying the aggregated mart -The demo also enables baseline data quality checks in `project.yml`. Running `fft test` (or `make test`) verifies that primary keys remain unique/not-null across `seed_users`, `users_clean`, `mart_users_by_domain`, and the Python mart, while ensuring aggregate metrics such as `user_count` never drop below zero and each domain appears only once in `mart_latest_signup`. + * `.fastflowtransform/target/manifest.json` and `run_results.json` + * `site/dag/index.html` for the rendered model graph + * Use your engine’s client (or `fft run` logs) to inspect the mart outputs + +## Data quality tests + +The demo enables baseline data quality checks in `project.yml`. Running `fft test` (or `make test ENGINE=…`) verifies that: + +* Primary keys remain unique/not-null across: -## Next steps + * `seed_users` + * `users_clean` + * `mart_users_by_domain` + * the Python mart `mart_latest_signup` +* Aggregate metrics such as `user_count` never drop below zero. +* Each email domain appears only once in `mart_latest_signup`. -- Add more CSVs under `seeds/` and declare them in `sources.yml`. -- Create additional staging models so marts can reuse normalized data. -- Introduce Python models or macros mirroring how the API demo scales up. -- Update `.env.dev_*` with real credentials once you connect to shared databases. +These tests run against whatever engine/profile is active — including BigQuery, where they execute as standard SQL queries on the configured dataset.
diff --git a/docs/examples/DQ_Demo.md b/docs/examples/DQ_Demo.md index 257b44a..8bbbde0 100644 --- a/docs/examples/DQ_Demo.md +++ b/docs/examples/DQ_Demo.md @@ -49,6 +49,8 @@ examples/dq_demo/ .env.dev_duckdb .env.dev_postgres .env.dev_databricks + .env.dev_bigquery_pandas + .env.dev_bigquery_bigframes Makefile # optional, convenience wrapper around fft commands profiles.yml project.yml @@ -339,6 +341,24 @@ This executes just the cross-table checks, which is handy when you’re iteratin --- +## BigQuery variant (pandas or BigFrames) + +To run the same demo on BigQuery: + +1. Copy `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes` to `.env` and fill in: + ```bash + FF_BQ_PROJECT= + FF_BQ_DATASET=dq_demo + FF_BQ_LOCATION= # e.g., EU or US + GOOGLE_APPLICATION_CREDENTIALS=../secrets/.json # or rely on gcloud / WIF + ``` +2. Run via the Makefile from `examples/dq_demo`: + ```bash + make demo ENGINE=bigquery BQ_FRAME=pandas # or bigframes + ``` + +Both profiles accept `allow_create_dataset` in `profiles.yml` if you want the example to create the dataset automatically. + ## Things to experiment with To understand the tests better, intentionally break the data and re-run `fft test`: diff --git a/docs/examples/Incremental_Demo.md b/docs/examples/Incremental_Demo.md index d974e78..3005b1a 100644 --- a/docs/examples/Incremental_Demo.md +++ b/docs/examples/Incremental_Demo.md @@ -1,6 +1,6 @@ # Incremental, Delta & Iceberg Demo -This example project shows how to use **incremental models** and **Delta-/Iceberg-style merges** in FastFlowTransform across DuckDB, Postgres and Databricks Spark (Parquet, Delta & Iceberg). +This example project shows how to use **incremental models** and **Delta-/Iceberg-style merges** in FastFlowTransform across DuckDB, Postgres, Databricks Spark (Parquet, Delta & Iceberg), and BigQuery (pandas or BigFrames). It is intentionally small and self-contained so you can copy/paste patterns into your own project. 
@@ -24,6 +24,8 @@ incremental_demo/ .env.dev_postgres .env.dev_databricks_delta .env.dev_databricks_iceberg + .env.dev_bigquery_pandas + .env.dev_bigquery_bigframes Makefile profiles.yml project.yml @@ -44,6 +46,11 @@ incremental_demo/ fct_events_py_incremental.ff.py databricks_spark/ fct_events_py_incremental.ff.py + bigquery/ + pandas/ + fct_events_py_incremental.ff.py + bigframes/ + fct_events_py_incremental.ff.py ``` *Your actual filenames may differ slightly; the concepts are the same.* @@ -71,6 +78,7 @@ The demo revolves around a tiny `events` dataset and three different ways to bui * DuckDB / Postgres: incremental insert/merge in SQL * Databricks Spark: `MERGE INTO` for Delta or Iceberg where available (Spark 4), with a fallback full-refresh strategy for other formats + * BigQuery: pandas- or BigFrames-backed DataFrame models with incremental merge logic handled by the BigQuery executor 4. **Iceberg profile for Spark 4** @@ -240,6 +248,7 @@ On subsequent runs, the engine evaluates the `delta.sql` snippet and: * **DuckDB / Postgres**: inserts or merges the resulting rows into the target table * **Databricks Spark**: tries a `MERGE INTO` (Delta) and falls back to a full-refresh if necessary +* **BigQuery**: applies incremental insert/merge logic in SQL via the BigQuery executor --- @@ -313,6 +322,8 @@ Files: models/engines/duckdb/fct_events_py_incremental.ff.py models/engines/postgres/fct_events_py_incremental.ff.py models/engines/databricks_spark/fct_events_py_incremental.ff.py +models/engines/bigquery/pandas/fct_events_py_incremental.ff.py +models/engines/bigquery/bigframes/fct_events_py_incremental.ff.py ``` Each engine variant uses the same logical signature: @@ -550,9 +561,26 @@ FFT_ACTIVE_ENV=dev_postgres fft test . 
\ --select tag:example:incremental_demo ``` -Packen wΓΌrde ich den Hinweis direkt an die Stelle, wo du schon beschreibst, wie man die Demo auf Databricks startet – also deine aktuelle Sektion: +### BigQuery + +```bash +# pandas +FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft seed . +FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft run . \ + --select tag:example:incremental_demo --select tag:engine:bigquery --cache rw +FF_ENGINE=bigquery FF_ENGINE_VARIANT=pandas FFT_ACTIVE_ENV=dev_bigquery_pandas fft test . \ + --select tag:example:incremental_demo + +# BigFrames +FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft seed . +FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft run . \ + --select tag:example:incremental_demo --select tag:engine:bigquery --cache rw +FF_ENGINE=bigquery FF_ENGINE_VARIANT=bigframes FFT_ACTIVE_ENV=dev_bigquery_bigframes fft test . \ + --select tag:example:incremental_demo +``` + +Ensure the service account credentials pointed to by `GOOGLE_APPLICATION_CREDENTIALS` can create/drop tables in the target dataset. -````markdown ### Databricks Spark ```bash diff --git a/docs/examples/Local_Engine_Setup.md b/docs/examples/Local_Engine_Setup.md index 70eb8c2..c75da91 100644 --- a/docs/examples/Local_Engine_Setup.md +++ b/docs/examples/Local_Engine_Setup.md @@ -25,3 +25,185 @@ `FF_DBR_ENABLE_HIVE=1`, `FF_DBR_WAREHOUSE_DIR=examples/api_demo/spark-warehouse`, `FF_DBR_DATABASE=api_demo`. - Switch the physical format by setting `FF_DBR_TABLE_FORMAT` (e.g. `delta`, requires the Delta Lake runtime); extra writer options can be supplied via `profiles.yml β†’ databricks_spark.table_options`. - Ensure your shell loads `.env.dev_databricks` (via `make`, `direnv`, or manual export) and run `make ENGINE=databricks_spark seed run`. 
+ + +This section explains how to set up BigQuery in Google Cloud for the example projects. + +It covers one-time cloud setup, authentication, local configuration, and common pitfalls: + +### BigQuery + +#### 1. One-time setup in Google Cloud + +You only need to do this once per project / environment. + +1. **Create (or pick) a GCP project** + + - Go to the *Google Cloud Console* → **IAM & Admin → Create project**. + - Give it a name, e.g. `FFT Basic Demo`, and note the **Project ID**, e.g. `fft-basic-demo`. + - All further steps refer to this project id. + +2. **Enable the BigQuery API** + + - In the console, go to **APIs & Services → Library**. + - Search for **“BigQuery API”** and click **Enable**. + - (Optional but recommended) Also enable **BigQuery Storage API** for faster reads. + +3. **Create a BigQuery dataset** + + - Go to **BigQuery** in the console (left sidebar). + - Make sure your project `fft-basic-demo` is selected. + - Click **“+ Create dataset”**: + - **Dataset ID**: e.g. `basic_demo` + - **Location type**: choose a **multi-region**, e.g.: + - `EU` or `US` + - Click **Create dataset**. + + ⚠️ **Important:** The dataset **location must match** the location you use in your env (`FF_BQ_LOCATION`). + - If your dataset is in `EU` (multi-region), then `FF_BQ_LOCATION=EU`. + - If the dataset is in a single region like `europe-west3`, use that exact region name. + +4. **Create a service account (for CI / non-interactive use)** + + For local dev you can use your own user credentials (see below), but for CI/CD or shared environments + a service account is better. + + - Go to **IAM & Admin → Service Accounts → Create service account**. + - Name it e.g. `fft-runner`. + - On the **Roles** step, add roles with BigQuery write access, for example: + - `BigQuery Job User` + - `BigQuery Data Editor` + - (Optionally) Restrict to dataset level later if you want stricter permissions.
+ + Then create a key: + + - Click your service account → **Keys → Add key → Create new key**. + - Select **JSON**, download the file, and store it somewhere safe (e.g. `~/.config/gcloud/fft-sa.json`). + +5. **Authentication options** + + You have two ways to authenticate locally: + + **A) Application Default Credentials via gcloud (easy for dev)** + + ```bash + gcloud auth application-default login + ``` + +This opens a browser, you log in, and Google stores your ADC in +`~/.config/gcloud/application_default_credentials.json`. + +The BigQuery client in `fastflowtransform` will pick this up automatically **as long as** +`FF_BQ_PROJECT` points to a project you have access to. + +**B) Service account key (good for CI)** + +* Put the downloaded JSON key (from step 4) somewhere on disk. + +* Set the environment variable before running `fft`: + + ```bash + export GOOGLE_APPLICATION_CREDENTIALS=/path/to/fft-sa.json + ``` + +* Make sure the service account has at least: + + * `BigQuery Job User` + * `BigQuery Data Editor` + +* Optionally grant `BigQuery Data Viewer` if you’re only reading some tables. + +--- + +#### 2. Local configuration (env + profiles) + +1. **Environment file (`.env.dev_bigquery`)** + + ```env + # BigQuery connection + FF_BQ_PROJECT=fft-basic-demo # your GCP project id + FF_BQ_DATASET=basic_demo # dataset from step 3 + FF_BQ_LOCATION=EU # or europe-west3, US, etc. MUST match dataset location + + # Active fft environment name (must match profiles.yml) + FFT_ACTIVE_ENV=dev_bigquery + ``` + + Load this via `direnv`, `make`, or manual `export`. + +2. **profiles.yml** + + ```yaml + dev_bigquery: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET') }}" + location: "{{ env('FF_BQ_LOCATION') }}" + use_bigframes: true # Python models use BigQuery DataFrames (BigFrames) + ``` + +--- + +#### 3.
Running seeds, models, and tests + +* **Seed BigQuery from `seeds/`:** + + ```bash + make ENGINE=bigquery seed + ``` + + This writes all `seeds/*.csv|parquet` to tables under + `${FF_BQ_PROJECT}.${FF_BQ_DATASET}.*`. + +* **Build models:** + + ```bash + make ENGINE=bigquery run + ``` + + * SQL models are executed as BigQuery queries. + * Python models with `only="bigquery"` run via `BigQueryBFExecutor` (BigQuery DataFrames) + and are written back into the same dataset. + +* **Run data-quality tests:** + + ```bash + make ENGINE=bigquery test + ``` + + `fft test` uses the BigQuery shim to run checks like `not_null`, `unique`, + `row_count_between`, `greater_equal`, etc. against + `${FF_BQ_PROJECT}.${FF_BQ_DATASET}.<table>`. + +--- + +#### 4. Common BigQuery gotchas + +* **Location mismatch** + + * Error like `Location basic_demo does not support this operation` or `Not found: Dataset ...`: + + * Check the **dataset location** in the BigQuery UI. + * Make sure `FF_BQ_LOCATION` is exactly that value (`EU`, `US`, `europe-west3`, …). + * Ensure the executor is initialized with the same location (via `profiles.yml` → `location`). + +* **Permission issues** + + * If you see `accessDenied` or `Permission denied`: + + * Confirm you authenticated (ADC or service account). + * Ensure your user / service account has at least: + + * `BigQuery Job User` + * `BigQuery Data Editor` on the project or dataset. + +* **Dataset not found** + + * Error `Not found: Dataset fft-basic-demo:basic_demo`: + + * Check that the dataset id matches exactly: + + * Project: `fft-basic-demo` + * Dataset: `basic_demo` + * Verify it exists and is in the same project you set in `FF_BQ_PROJECT`.
diff --git a/docs/examples/Macros_Demo.md b/docs/examples/Macros_Demo.md index e296473..8dee673 100644 --- a/docs/examples/Macros_Demo.md +++ b/docs/examples/Macros_Demo.md @@ -1,6 +1,6 @@ # Macros Demo -**Goal:** Showcase **SQL Jinja macros** and **Python render-time macros** working together across engines (DuckDB, Postgres, Databricks Spark). +**Goal:** Showcase **SQL Jinja macros** and **Python render-time macros** working together across engines (DuckDB, Postgres, Databricks Spark, BigQuery). You’ll see reusable SQL helpers, engine-aware SQL generation, and Python functions exposed as Jinja globals/filters. --- @@ -20,24 +20,29 @@ examples/macros_demo/ seeds/ seed_users.csv seed_orders.csv - models/ - macros/ - utils.sql - star.sql - macros_py/ - helpers.py - common/ - stg_users.ff.sql - stg_orders.ff.sql - dim_users.ff.sql - fct_user_sales.ff.sql - engines/ - duckdb/ - py_example.ff.py - postgres/ - py_example.ff.py - databricks_spark/ - py_example.ff.py + models/ + macros/ + utils.sql + star.sql + macros_py/ + helpers.py + common/ + stg_users.ff.sql + stg_orders.ff.sql + dim_users.ff.sql + fct_user_sales.ff.sql + engines/ + duckdb/ + py_example.ff.py + postgres/ + py_example.ff.py + databricks_spark/ + py_example.ff.py + bigquery/ + bigframes/ + py_example.ff.py + pandas/ + py_example.ff.py ``` --- diff --git a/examples/_scripts/cleanup_env.py b/examples/_scripts/cleanup_env.py index fe7ae7c..90e7281 100644 --- a/examples/_scripts/cleanup_env.py +++ b/examples/_scripts/cleanup_env.py @@ -89,6 +89,58 @@ def cleanup_postgres(*, dsn: str | None, schema: str | None, dry_run: bool) -> N conn.execute(text(f'CREATE SCHEMA "{schema}"')) +def cleanup_bigquery( + *, + project_id: str | None, + dataset: str | None, + location: str | None, + dry_run: bool, +) -> None: + """ + Reset a BigQuery demo dataset by dropping and recreating it. 
+ + Reads project/dataset/location from args/env/profile; this is meant for + isolated demo datasets (like fft-basic-demo.basic_demo), not shared prod. + """ + if not project_id: + raise ValueError("BigQuery cleanup requires FF_BQ_PROJECT or --bq-project") + if not dataset: + raise ValueError("BigQuery cleanup requires FF_BQ_DATASET or --bq-dataset") + + from google.cloud import bigquery # local import so other engines don't require it + + client = bigquery.Client(project=project_id, location=location) + full_id = f"{project_id}.{dataset}" + + if dry_run: + _log(f"[dry-run] Would delete and recreate BigQuery dataset {full_id}") + return + + # Try to preserve existing location if not explicitly given + ds_location = location + try: + ds = client.get_dataset(full_id) + if not ds_location: + ds_location = ds.location + except Exception: + # Dataset may not exist yet – that's fine, we'll just create it below. + pass + + _log(f"Deleting BigQuery dataset {full_id} (if exists, with contents)") + client.delete_dataset( + full_id, + delete_contents=True, + not_found_ok=True, + ) + + ds_obj = bigquery.Dataset(full_id) + if ds_location: + ds_obj.location = ds_location + + _log(f"Recreating BigQuery dataset {full_id} (location={ds_location or 'default'})") + client.create_dataset(ds_obj, exists_ok=True) + + def _env_flag(name: str, default: bool = False) -> bool: val = os.getenv(name) if val is None: @@ -234,7 +286,7 @@ def _load_profile(project: Path, env_name: str, engine: str | None): def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Reset FastFlowTransform example environments.") parser.add_argument( - "--engine", required=True, choices=["duckdb", "postgres", "databricks_spark"] + "--engine", required=True, choices=["duckdb", "postgres", "databricks_spark", "bigquery"] ) parser.add_argument("--project", default=".") parser.add_argument("--env", help="Profile environment name (e.g. 
dev_duckdb).") @@ -249,6 +301,9 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument( "--spark-use-hive", action="store_true", help="Force Hive metastore enablement for cleanup." ) + parser.add_argument("--bq-project", help="Override BigQuery project ID (FF_BQ_PROJECT).") + parser.add_argument("--bq-dataset", help="Override BigQuery dataset (FF_BQ_DATASET).") + parser.add_argument("--bq-location", help="Override BigQuery location (FF_BQ_LOCATION).") parser.add_argument("--dry-run", action="store_true") parser.add_argument( "--skip-artifacts", @@ -262,7 +317,11 @@ def main(argv: list[str] | None = None) -> int: env_name = ( args.env or os.getenv("FFT_ACTIVE_ENV") - or ("dev_" + args.engine if args.engine in {"duckdb", "postgres"} else "dev") + or ( + "dev_" + args.engine + if args.engine in {"duckdb", "postgres", "databricks_spark", "bigquery"} + else "dev" + ) ) os.environ["FFT_ACTIVE_ENV"] = env_name @@ -314,6 +373,22 @@ def main(argv: list[str] | None = None) -> int: use_hive=args.spark_use_hive or bool(profile_use_hive), dry_run=args.dry_run, ) + elif args.engine == "bigquery": + profile_bq = getattr(profile, "bigquery", None) if profile else None + profile_project = getattr(profile_bq, "project", None) if profile_bq else None + profile_dataset = getattr(profile_bq, "dataset", None) if profile_bq else None + profile_location = getattr(profile_bq, "location", None) if profile_bq else None + + project_id = args.bq_project or os.getenv("FF_BQ_PROJECT") or profile_project + dataset = args.bq_dataset or os.getenv("FF_BQ_DATASET") or profile_dataset + location = args.bq_location or os.getenv("FF_BQ_LOCATION") or profile_location + + cleanup_bigquery( + project_id=project_id, + dataset=dataset, + location=location, + dry_run=args.dry_run, + ) except Exception as exc: _log(f"Cleanup failed: {exc}") return 1 diff --git a/examples/api_demo/.env.dev_bigquery_bigframes b/examples/api_demo/.env.dev_bigquery_bigframes new file mode 100644 index 
0000000..e3f91f7 --- /dev/null +++ b/examples/api_demo/.env.dev_bigquery_bigframes @@ -0,0 +1,7 @@ +# BigQuery profile for the API demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=api_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/api_demo/.env.dev_bigquery_pandas b/examples/api_demo/.env.dev_bigquery_pandas new file mode 100644 index 0000000..e3f91f7 --- /dev/null +++ b/examples/api_demo/.env.dev_bigquery_pandas @@ -0,0 +1,7 @@ +# BigQuery profile for the API demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=api_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/api_demo/Makefile b/examples/api_demo/Makefile index f6d4da5..ada8869 100644 --- a/examples/api_demo/Makefile +++ b/examples/api_demo/Makefile @@ -12,6 +12,9 @@ UV ?= uv # Engine selector (duckdb|postgres|databricks_spark) ENGINE ?= duckdb +# BigQuery frame type selector (pandas | bigframes) +BQ_FRAME ?= bigframes + # HTTP wrapper defaults (override per call if needed) # Allowed domains are comma-separated (no https://) FF_HTTP_ALLOWED_DOMAINS ?= jsonplaceholder.typicode.com,api.github.com @@ -41,9 +44,20 @@ ifeq ($(ENGINE),databricks_spark) PROFILE_ENV = dev_databricks ENGINE_TAG = engine:databricks_spark endif +ifeq ($(ENGINE),bigquery) + ENGINE_TAG = engine:bigquery + ifeq ($(BQ_FRAME),pandas) + PROFILE_ENV = dev_bigquery_pandas + else + PROFILE_ENV = dev_bigquery_bigframes + endif +endif BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) +ifeq ($(ENGINE),bigquery) + BASE_ENV := $(BASE_ENV) FF_ENGINE=$(ENGINE) FF_ENGINE_VARIANT=$(BQ_FRAME) +endif RUN_ENV = $(BASE_ENV) # Select only API demo models for the active engine (common models carry all engine tags) @@ -57,6 +71,8 @@ else ifeq ($(ENGINE),postgres) CLEAN_CMD = env
$(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine postgres --env "$(PROFILE_ENV)" --project "$(PROJECT)" else ifeq ($(ENGINE),databricks_spark) CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine databricks_spark --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),bigquery) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine bigquery --env "$(PROFILE_ENV)" --project "$(PROJECT)" else CLEAN_CMD = $(error Unsupported ENGINE=$(ENGINE) for cleanup) endif @@ -108,7 +124,7 @@ demo-open: fi demo: clean - @echo "== πŸš€ API Demo (DuckDB) ==" + @echo "== πŸš€ API Demo ($(ENGINE)) ==" @echo "Profile=$(PROFILE_ENV) DB=$(DB) PROJECT=$(PROJECT)" +$(MAKE) seed +$(MAKE) run @@ -153,7 +169,7 @@ api-show-http: fi api-demo: clean - @echo "== 🌐 API Demo (DuckDB) ==" + @echo "== 🌐 API Demo ($(ENGINE)) ==" @echo "Profile=$(PROFILE_ENV) DB=$(DB) PROJECT=$(PROJECT)" +$(MAKE) run +$(MAKE) dag diff --git a/examples/api_demo/models/common/mart_users_join.ff.sql b/examples/api_demo/models/common/mart_users_join.ff.sql index 0197db7..ad6feef 100644 --- a/examples/api_demo/models/common/mart_users_join.ff.sql +++ b/examples/api_demo/models/common/mart_users_join.ff.sql @@ -5,7 +5,8 @@ 'scope:common', 'engine:duckdb', 'engine:postgres', - 'engine:databricks_spark' + 'engine:databricks_spark', + 'engine:bigquery' ], ) }} diff --git a/examples/api_demo/models/common/users.ff.sql b/examples/api_demo/models/common/users.ff.sql index ec68605..b170c5a 100644 --- a/examples/api_demo/models/common/users.ff.sql +++ b/examples/api_demo/models/common/users.ff.sql @@ -6,7 +6,8 @@ 'kind:seed-consumer', 'engine:duckdb', 'engine:postgres', - 'engine:databricks_spark' + 'engine:databricks_spark', + 'engine:bigquery' ], ) }} -- Simple staging table from seed diff --git a/examples/api_demo/models/engines/bigquery/bigframes/api_users_http.ff.py b/examples/api_demo/models/engines/bigquery/bigframes/api_users_http.ff.py new file mode 100644 index 
0000000..a0c3c6b --- /dev/null +++ b/examples/api_demo/models/engines/bigquery/bigframes/api_users_http.ff.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import bigframes.pandas as bpd +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df + +BFDataFrame = bpd.DataFrame + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "bigframes", + }, + name="api_users_http", + deps=["users.ff"], + tags=["example:api_demo", "scope:engine", "engine:bigquery"], +) +def fetch(_: BFDataFrame) -> BFDataFrame: + """ + Fetch users via the FFT HTTP helper and return a BigFrames DataFrame. + """ + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, + normalize=True, + output="bigframes", + ) + return df.loc[:, ["id", "email", "username", "name"]].rename(columns={"id": "api_user_id"}) # type: ignore[arg-type] diff --git a/examples/api_demo/models/engines/bigquery/bigframes/api_users_requests.ff.py b/examples/api_demo/models/engines/bigquery/bigframes/api_users_requests.ff.py new file mode 100644 index 0000000..962fb93 --- /dev/null +++ b/examples/api_demo/models/engines/bigquery/bigframes/api_users_requests.ff.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import bigframes.pandas as bpd +from fastflowtransform import engine_model + +BFDataFrame = bpd.DataFrame + +try: + import requests +except Exception as _e: # pragma: no cover + raise RuntimeError("Please install 'requests' to run this model") from _e + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "bigframes", + }, + name="api_users_requests", + deps=["users.ff"], + tags=["example:api_demo", "scope:engine", "engine:bigquery"], +) +def fetch(_: BFDataFrame) -> BFDataFrame: + """Fetch users via plain requests and return a BigFrames DataFrame.""" + resp = requests.get("https://jsonplaceholder.typicode.com/users", timeout=30) + resp.raise_for_status() + df = 
bpd.DataFrame(resp.json()) # accepts a JSON-serialisable list of dicts + return df.loc[:, ["id", "email", "username", "name"]].rename( # type: ignore[arg-type] + columns={"id": "api_user_id"} + ) diff --git a/examples/api_demo/models/engines/bigquery/pandas/api_users_http.ff.py b/examples/api_demo/models/engines/bigquery/pandas/api_users_http.ff.py new file mode 100644 index 0000000..cd28db5 --- /dev/null +++ b/examples/api_demo/models/engines/bigquery/pandas/api_users_http.ff.py @@ -0,0 +1,26 @@ +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df +import pandas as pd + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "pandas", + }, + name="api_users_http", + deps=["users.ff"], # at least one dependency is required by the executor contract + tags=["example:api_demo", "scope:engine", "engine:bigquery"], +) +def fetch(_: pd.DataFrame) -> pd.DataFrame: + """ + Fetch users from a public demo API using the built-in HTTP wrapper (pandas client). 
+ """ + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, # the outer JSON is already a list + normalize=True, # flatten objects to columns (address.*, company.*) + ) + + cols = [c for c in df.columns if c in ("id", "email", "username", "name")] + return df[cols].rename(columns={"id": "api_user_id"}) diff --git a/examples/api_demo/models/engines/bigquery/pandas/api_users_requests.ff.py b/examples/api_demo/models/engines/bigquery/pandas/api_users_requests.ff.py new file mode 100644 index 0000000..adf787c --- /dev/null +++ b/examples/api_demo/models/engines/bigquery/pandas/api_users_requests.ff.py @@ -0,0 +1,28 @@ +from fastflowtransform import engine_model +import pandas as pd + +try: + import requests +except Exception as _e: # pragma: no cover + raise RuntimeError("Please install 'requests' to run this model") from _e + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "pandas", + }, + name="api_users_requests", + deps=["users.ff"], + tags=["example:api_demo", "scope:engine", "engine:bigquery"], +) +def fetch(_: pd.DataFrame) -> pd.DataFrame: + """Fetch users via plain requests (pandas client).""" + url = "https://jsonplaceholder.typicode.com/users" + resp = requests.get(url, timeout=30) + resp.raise_for_status() + data = resp.json() + + df = pd.DataFrame(data) + cols = [c for c in df.columns if c in ("id", "email", "username", "name")] + return df[cols].rename(columns={"id": "api_user_id"}) diff --git a/examples/api_demo/profiles.yml b/examples/api_demo/profiles.yml index 647fb16..88d7cd5 100644 --- a/examples/api_demo/profiles.yml +++ b/examples/api_demo/profiles.yml @@ -24,3 +24,21 @@ dev_databricks: spark.hadoop.datanucleus.schema.autoCreateAll: "true" spark.hadoop.javax.jdo.option.ConnectionDriverName: "org.apache.derby.jdbc.EmbeddedDriver" spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + +dev_bigquery_bigframes: + engine: bigquery + bigquery: + 
project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'api_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: true + # allow_create_dataset: true # uncomment to auto-create dataset on first run + +dev_bigquery_pandas: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'api_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: false + # allow_create_dataset: true # uncomment to auto-create dataset on first run diff --git a/examples/basic_demo/.env.dev_bigquery_bigframes b/examples/basic_demo/.env.dev_bigquery_bigframes new file mode 100644 index 0000000..73a3d3e --- /dev/null +++ b/examples/basic_demo/.env.dev_bigquery_bigframes @@ -0,0 +1,7 @@ +# BigQuery profile for the basic demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=basic_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/basic_demo/.env.dev_bigquery_pandas b/examples/basic_demo/.env.dev_bigquery_pandas new file mode 100644 index 0000000..73a3d3e --- /dev/null +++ b/examples/basic_demo/.env.dev_bigquery_pandas @@ -0,0 +1,7 @@ +# BigQuery profile for the basic demo +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=basic_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json diff --git a/examples/basic_demo/Makefile b/examples/basic_demo/Makefile index 8f5a0be..7c4c675 100644 --- a/examples/basic_demo/Makefile +++ b/examples/basic_demo/Makefile @@ -9,6 +9,9 @@ UV ?= uv # Engine selector (duckdb|postgres|databricks_spark) ENGINE ?= duckdb +# BigQuery frame type selector (pandas | bigframes) +BQ_FRAME ?= bigframes + # Resolve profile and tags per engine ifeq ($(ENGINE),duckdb) PROFILE_ENV = dev_duckdb @@ -22,8 +25,22 @@ ifeq 
($(ENGINE),databricks_spark) PROFILE_ENV = dev_databricks ENGINE_TAG = engine:databricks_spark endif +ifeq ($(ENGINE),bigquery) + # Default tags / profile for BigQuery + ENGINE_TAG = engine:bigquery + ifeq ($(BQ_FRAME),pandas) + PROFILE_ENV = dev_bigquery_pandas + else + PROFILE_ENV = dev_bigquery_bigframes + endif +endif + +BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) FF_ENGINE=$(ENGINE) + +ifeq ($(ENGINE),bigquery) + BASE_ENV := $(BASE_ENV) FF_ENGINE_VARIANT=$(BQ_FRAME) +endif -BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) RUN_ENV = $(BASE_ENV) SELECT_FLAGS = --select tag:example:basic_demo --select tag:$(ENGINE_TAG) @@ -38,6 +55,8 @@ else ifeq ($(ENGINE),postgres) CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine postgres --env "$(PROFILE_ENV)" --project "$(PROJECT)" else ifeq ($(ENGINE),databricks_spark) CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine databricks_spark --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),bigquery) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine bigquery --env "$(PROFILE_ENV)" --project "$(PROJECT)" else $(error Unsupported ENGINE=$(ENGINE) - pick duckdb|postgres|databricks_spark) endif @@ -55,7 +74,7 @@ help: @echo " make demo ENGINE=$(ENGINE)" @echo " make clean ENGINE=$(ENGINE)" @echo - @echo "Variables: DB=$(DB) PROJECT=$(PROJECT) UV=$(UV)" + @echo "Variables: DB=$(DB) PROJECT=$(PROJECT) UV=$(UV) ENGINE=$(ENGINE) BQ_FRAME=$(BQ_FRAME)" seed: env $(BASE_ENV) $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV) diff --git a/examples/basic_demo/README.md b/examples/basic_demo/README.md index 106a814..adf92ec 100644 --- a/examples/basic_demo/README.md +++ b/examples/basic_demo/README.md @@ -1,52 +1,8 @@ # Basic demo -This project is a minimal FastFlowTransform pipeline that works unchanged on DuckDB, Postgres, and Databricks Spark. It ships with: -- `seeds/seed_users.csv` – three sample users that bootstrap the project. 
-- `models/staging/users_clean.ff.sql` – normalizes emails and signup timestamps. -- `models/marts/mart_users_by_domain.ff.sql` – aggregates users by email domain. -- `models/engines/*/mart_latest_signup.ff.py` – engine-scoped Python models (pandas for DuckDB/Postgres, PySpark for Databricks) that grab the latest signup per domain from the staging view. +Minimal FFT pipeline that runs unchanged on DuckDB, Postgres, Databricks Spark, and BigQuery. -## Quickstart - -1. Install the package and CLI (see repository root instructions). -2. `cd examples/basic_demo` (this folder) so relative paths line up. -3. Load one of the provided engine environments, then seed and run the project. - -> ⚠️ `make clean` (or direct calls to `cleanup_env.py`) rely on the same environment variables as the run commands. Always export the `.env.dev_*` file for the engine you are cleaning so paths, schemas, and credentials are available. - -### DuckDB - -```bash -cp .env.dev_duckdb .env.local # optional convenience copy -set -a; source .env.dev_duckdb; set +a # export FF_DUCKDB_PATH -fft seed basic_demo --env dev_duckdb -fft run basic_demo --env dev_duckdb -fft show basic_demo.mart_users_by_domain --env dev_duckdb -fft show basic_demo.mart_latest_signup --env dev_duckdb -``` - -### Postgres - -```bash -cp .env.dev_postgres .env.local # fill in FF_PG_DSN with your credentials -set -a; source .env.dev_postgres; set +a -fft seed basic_demo --env dev_postgres -fft run basic_demo --env dev_postgres -fft show basic_demo.mart_users_by_domain --env dev_postgres -fft show basic_demo.mart_latest_signup --env dev_postgres -``` - -### Databricks Spark (local or hosted) - -```bash -cp .env.dev_databricks .env.local # adjust Spark master / credentials as needed -set -a; source .env.dev_databricks; set +a -fft seed basic_demo --env dev_databricks -fft run basic_demo --env dev_databricks -fft show basic_demo.mart_users_by_domain --env dev_databricks -fft show basic_demo.mart_latest_signup --env 
dev_databricks -``` - -The resulting tables report user counts per email domain and spotlight the most recent signup per domain. Extend any of the CSV, SQL, or Python assets to explore more complex scenarios. - -Further background is documented in [`docs/examples/Basic_Demo.md`](../../docs/examples/Basic_Demo.md). +## How to use +- See the full walkthrough (env setup, Makefile targets, engine notes, DQ tests) in `docs/examples/Basic_Demo.md`. +- From this directory: set the desired `.env.dev_*` (for BigQuery choose `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes`), then run `make demo ENGINE=` (set `BQ_FRAME` to switch BigQuery client) to seed β†’ run β†’ dag β†’ test. +- To inspect results, open `site/dag/index.html` after a run or query the mart tables via your engine client. diff --git a/examples/basic_demo/models/engines/bigquery/bigframes/mart_latest_signup.ff.py b/examples/basic_demo/models/engines/bigquery/bigframes/mart_latest_signup.ff.py new file mode 100644 index 0000000..047a148 --- /dev/null +++ b/examples/basic_demo/models/engines/bigquery/bigframes/mart_latest_signup.ff.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import Any + +import bigframes.pandas as bpd +from fastflowtransform import engine_model + +BFDataFrame = bpd.DataFrame + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "bigframes", + }, + name="mart_latest_signup", + materialized="table", + tags=[ + "example:basic_demo", + "scope:mart", + "engine:bigquery", + ], + deps=["users_clean.ff"], + require={"users_clean.ff": ["user_id", "email", "email_domain", "signup_date"]}, +) +def build(users_clean: BFDataFrame) -> BFDataFrame: + latest = ( + users_clean.sort_values("signup_date", ascending=False) + .drop_duplicates(subset="email_domain") + .loc[:, ["email_domain", "user_id", "email", "signup_date"]] + .rename( + columns={ + "user_id": "latest_user_id", + "email": "latest_email", + "signup_date": "latest_signup_date", + } 
+ ) # type: ignore[arg-type] + ) + return latest diff --git a/examples/basic_demo/models/engines/bigquery/pandas/mart_latest_signup.ff.py b/examples/basic_demo/models/engines/bigquery/pandas/mart_latest_signup.ff.py new file mode 100644 index 0000000..263aba9 --- /dev/null +++ b/examples/basic_demo/models/engines/bigquery/pandas/mart_latest_signup.ff.py @@ -0,0 +1,36 @@ +import pandas as pd + +from fastflowtransform import engine_model + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "pandas", + }, + name="mart_latest_signup", + materialized="table", + tags=[ + "example:basic_demo", + "scope:mart", + "engine:bigquery", + ], + deps=["users_clean.ff"], + require={"users_clean.ff": ["user_id", "email", "email_domain", "signup_date"]}, +) +def build(users_clean: pd.DataFrame) -> pd.DataFrame: + """Return the latest signup per email domain using pandas (BigQuery).""" + latest = ( + users_clean.sort_values("signup_date", ascending=False) + .drop_duplicates("email_domain") + .loc[:, ["email_domain", "user_id", "email", "signup_date"]] + .rename( + columns={ + "user_id": "latest_user_id", + "email": "latest_email", + "signup_date": "latest_signup_date", + } + ) + .reset_index(drop=True) + ) + return latest diff --git a/examples/basic_demo/models/marts/mart_users_by_domain.ff.sql b/examples/basic_demo/models/marts/mart_users_by_domain.ff.sql index d74c06d..170632c 100644 --- a/examples/basic_demo/models/marts/mart_users_by_domain.ff.sql +++ b/examples/basic_demo/models/marts/mart_users_by_domain.ff.sql @@ -5,7 +5,8 @@ 'scope:mart', 'engine:duckdb', 'engine:postgres', - 'engine:databricks_spark' + 'engine:databricks_spark', + 'engine:bigquery' ], ) }} diff --git a/examples/basic_demo/models/staging/users_clean.ff.sql b/examples/basic_demo/models/staging/users_clean.ff.sql index 3795e73..b91afbb 100644 --- a/examples/basic_demo/models/staging/users_clean.ff.sql +++ b/examples/basic_demo/models/staging/users_clean.ff.sql @@ -5,7 +5,8 @@ 
'scope:staging', 'engine:duckdb', 'engine:postgres', - 'engine:databricks_spark' + 'engine:databricks_spark', + 'engine:bigquery' ], ) }} diff --git a/examples/basic_demo/profiles.yml b/examples/basic_demo/profiles.yml index 514028f..f0d88bc 100644 --- a/examples/basic_demo/profiles.yml +++ b/examples/basic_demo/profiles.yml @@ -25,3 +25,21 @@ dev_databricks: spark.hadoop.datanucleus.schema.autoCreateAll: "true" spark.hadoop.javax.jdo.option.ConnectionDriverName: "org.apache.derby.jdbc.EmbeddedDriver" spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + +dev_bigquery_bigframes: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'basic_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: true + # allow_create_dataset: true # uncomment to auto-create dataset on first run + +dev_bigquery_pandas: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'basic_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: false + # allow_create_dataset: true # uncomment to auto-create dataset on first run diff --git a/examples/cache_demo/.env.dev_bigquery_bigframes b/examples/cache_demo/.env.dev_bigquery_bigframes new file mode 100644 index 0000000..cc90396 --- /dev/null +++ b/examples/cache_demo/.env.dev_bigquery_bigframes @@ -0,0 +1,14 @@ +# BigQuery profile for the cache demo (BigFrames client) +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=cache_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json + +# HTTP cache knobs (optional; mirror duckdb defaults) +FF_HTTP_ALLOWED_DOMAINS=jsonplaceholder.typicode.com +FF_HTTP_CACHE_DIR=.local/http-cache +FF_HTTP_MAX_RPS=5 +FF_HTTP_MAX_RETRIES=2 +FF_HTTP_TIMEOUT=10 diff --git a/examples/cache_demo/.env.dev_bigquery_pandas 
b/examples/cache_demo/.env.dev_bigquery_pandas new file mode 100644 index 0000000..cc94b83 --- /dev/null +++ b/examples/cache_demo/.env.dev_bigquery_pandas @@ -0,0 +1,14 @@ +# BigQuery profile for the cache demo (pandas client) +FF_BQ_PROJECT=fft-basic-demo +FF_BQ_DATASET=cache_demo +FF_BQ_LOCATION=EU + +# Path to service account JSON key (or rely on gcloud / workload identity) +GOOGLE_APPLICATION_CREDENTIALS=../secrets/fft-bigquery-demo-key.json + +# HTTP cache knobs (optional; mirror duckdb defaults) +FF_HTTP_ALLOWED_DOMAINS=jsonplaceholder.typicode.com +FF_HTTP_CACHE_DIR=.local/http-cache +FF_HTTP_MAX_RPS=5 +FF_HTTP_MAX_RETRIES=2 +FF_HTTP_TIMEOUT=10 diff --git a/examples/cache_demo/.env.dev_databricks b/examples/cache_demo/.env.dev_databricks new file mode 100644 index 0000000..3b1fbc4 --- /dev/null +++ b/examples/cache_demo/.env.dev_databricks @@ -0,0 +1,7 @@ +# Databricks/Spark profile for the cache demo (local Spark defaults; adjust for real DBR) +FF_SPARK_MASTER=local[*] +FF_SPARK_APP_NAME=cache_demo +# FF_DBR_DATABASE=cache_demo +# FF_DBR_WAREHOUSE_DIR=.local/spark_warehouse + +FF_DBR_ENABLE_HIVE=1 \ No newline at end of file diff --git a/examples/cache_demo/.env.dev_postgres b/examples/cache_demo/.env.dev_postgres new file mode 100644 index 0000000..df0bb72 --- /dev/null +++ b/examples/cache_demo/.env.dev_postgres @@ -0,0 +1,3 @@ +# Postgres profile for the cache demo (replace with your own connection string) +FF_PG_DSN=postgresql+psycopg://postgres:postgres@localhost:5432 +FF_PG_SCHEMA=cache_demo diff --git a/examples/cache_demo/Makefile b/examples/cache_demo/Makefile index da6aa4c..ad8c144 100644 --- a/examples/cache_demo/Makefile +++ b/examples/cache_demo/Makefile @@ -3,15 +3,58 @@ http_first http_offline http_cache_clear artifacts dag clean \ demo -ENGINE ?= duckdb -PROFILE_ENV = dev_duckdb +# ENGINE: duckdb | postgres | databricks_spark | bigquery; BQ_FRAME: pandas | bigframes (no inline comments: they leave trailing spaces in the value and break the ifeq checks) +ENGINE ?= duckdb +BQ_FRAME ?= bigframes PROJECT ?= .
UV ?= uv - -BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) +DB ?= .local/cache_demo.duckdb + +ifeq ($(ENGINE),duckdb) + PROFILE_ENV = dev_duckdb + ENGINE_TAG = engine:duckdb +endif +ifeq ($(ENGINE),postgres) + PROFILE_ENV = dev_postgres + ENGINE_TAG = engine:postgres +endif +ifeq ($(ENGINE),databricks_spark) + PROFILE_ENV = dev_databricks + ENGINE_TAG = engine:databricks_spark +endif +ifeq ($(ENGINE),bigquery) + ENGINE_TAG = engine:bigquery + ifeq ($(BQ_FRAME),pandas) + PROFILE_ENV = dev_bigquery_pandas + else + PROFILE_ENV = dev_bigquery_bigframes + endif +endif +ifndef PROFILE_ENV + $(error Unsupported ENGINE=$(ENGINE) - pick duckdb|postgres|databricks_spark|bigquery) +endif + +BASE_ENV = FFT_ACTIVE_ENV=$(PROFILE_ENV) FF_ENGINE=$(ENGINE) +ifeq ($(ENGINE),bigquery) + BASE_ENV := $(BASE_ENV) FF_ENGINE_VARIANT=$(BQ_FRAME) +endif RUN_ENV = $(BASE_ENV) -SELECT_ALL = --select tag:example:cache_demo +SELECT_ALL = --select tag:example:cache_demo --select tag:$(ENGINE_TAG) + +CLEAN_SCRIPT = ../_scripts/cleanup_env.py + +ifeq ($(ENGINE),duckdb) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine duckdb --env "$(PROFILE_ENV)" --project "$(PROJECT)" --duckdb-path "$(DB)" +else ifeq ($(ENGINE),postgres) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine postgres --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),databricks_spark) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine databricks_spark --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else ifeq ($(ENGINE),bigquery) + CLEAN_CMD = env $(BASE_ENV) $(UV) run python $(CLEAN_SCRIPT) --engine bigquery --env "$(PROFILE_ENV)" --project "$(PROJECT)" +else + $(error Unsupported ENGINE=$(ENGINE) - pick duckdb|postgres|databricks_spark|bigquery) +endif seed: env $(BASE_ENV) $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV) @@ -61,6 +104,7 @@ dag: env $(RUN_ENV) $(UV) run fft dag "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_ALL) --html clean: + $(CLEAN_CMD) 
rm -rf .local cache_demo.duckdb site .fastflowtransform demo: clean diff --git a/examples/cache_demo/README.md b/examples/cache_demo/README.md index 9482057..3a55566 100644 --- a/examples/cache_demo/README.md +++ b/examples/cache_demo/README.md @@ -10,17 +10,23 @@ This demo shows: ## Quickstart ```bash +# pick your engine (duckdb, postgres, databricks_spark, or bigquery); defaults to duckdb +cp .env.dev_duckdb .env +# or: cp .env.dev_postgres .env (then edit DSN/schema) +# or: cp .env.dev_databricks .env +# or: cp .env.dev_bigquery_pandas .env # or .env.dev_bigquery_bigframes + cd examples/cache_demo -make cache_first # builds and writes cache -make cache_second # should SKIP everything -make change_sql # touch SQL → mart rebuilds -make change_seed # add a seed row → staging + mart rebuild -make change_env # FF_* env change → full rebuild -make change_py # edit constant in py_constants.ff.py → it rebuilds - -make http_first # warms HTTP cache -make http_offline # reuses HTTP cache without network -make http_cache_clear # clears HTTP response cache +make cache_first ENGINE=duckdb # builds and writes cache +make cache_second ENGINE=duckdb # should SKIP everything +make change_sql ENGINE=duckdb # touch SQL → mart rebuilds +make change_seed ENGINE=duckdb # add a seed row → staging + mart rebuild +make change_env ENGINE=duckdb # FF_* env change → full rebuild +make change_py ENGINE=duckdb # edit constant in py_constants.ff.py → it rebuilds + +make http_first ENGINE=duckdb # warms HTTP cache +make http_offline ENGINE=duckdb # reuses HTTP cache without network +make http_cache_clear # clears HTTP response cache Inspect: site/dag/index.html @@ -32,6 +38,10 @@ Code kopieren --- +To run everything on Postgres, set `ENGINE=postgres` and copy/edit `.env.dev_postgres`, e.g. `make demo ENGINE=postgres`. +To run on Databricks/Spark locally, set `ENGINE=databricks_spark` and copy/edit `.env.dev_databricks`, e.g. `make demo ENGINE=databricks_spark`.
+To run on BigQuery, set `ENGINE=bigquery` and copy/edit `.env.dev_bigquery_pandas` (or `.env.dev_bigquery_bigframes`), e.g. `make demo ENGINE=bigquery BQ_FRAME=bigframes` (default) or `BQ_FRAME=pandas`. + ## What this demo proves (in a minute) - **Cache hit/skip:** `make cache_second` should skip everything (if nothing changed). diff --git a/examples/cache_demo/models/engines/bigquery/bigframes/http_users.ff.py b/examples/cache_demo/models/engines/bigquery/bigframes/http_users.ff.py new file mode 100644 index 0000000..9e2418e --- /dev/null +++ b/examples/cache_demo/models/engines/bigquery/bigframes/http_users.ff.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import bigframes.pandas as bpd +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df + +BFDataFrame = bpd.DataFrame + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "bigframes", + }, + name="http_users", + deps=["stg_users.ff"], # dependency for cache invalidation symmetry + meta={ + "materialized": "table", + "tags": ["example:cache_demo", "engine:bigquery"], + }, +) +def fetch(_: BFDataFrame) -> BFDataFrame: + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, + normalize=True, + output="bigframes", + ) + return df.loc[:, ["id", "email", "username"]].rename(columns={"id": "api_user_id"}) # type: ignore[arg-type] diff --git a/examples/cache_demo/models/engines/bigquery/bigframes/py_constants.ff.py b/examples/cache_demo/models/engines/bigquery/bigframes/py_constants.ff.py new file mode 100644 index 0000000..8e11125 --- /dev/null +++ b/examples/cache_demo/models/engines/bigquery/bigframes/py_constants.ff.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import bigframes.pandas as bpd +from fastflowtransform import engine_model + +BFDataFrame = bpd.DataFrame + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "bigframes", + }, + name="py_constants", + 
materialized="table", + tags=[ + "example:cache_demo", + "engine:bigquery", + ], +) +def build() -> BFDataFrame: + """BigQuery (BigFrames) version returning a BigFrames DataFrame.""" + return bpd.DataFrame([{"k": "answer", "v": 42}]) diff --git a/examples/cache_demo/models/engines/bigquery/pandas/http_users.ff.py b/examples/cache_demo/models/engines/bigquery/pandas/http_users.ff.py new file mode 100644 index 0000000..28f3671 --- /dev/null +++ b/examples/cache_demo/models/engines/bigquery/pandas/http_users.ff.py @@ -0,0 +1,25 @@ +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df +import pandas as pd + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "pandas", + }, + name="http_users", + deps=["stg_users.ff"], # dependency for cache invalidation symmetry + meta={ + "materialized": "table", + "tags": ["example:cache_demo", "engine:bigquery"], + }, +) +def fetch(_: pd.DataFrame) -> pd.DataFrame: + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, + normalize=True, + ) + cols = [c for c in df.columns if c in ("id", "email", "username")] + return df[cols].rename(columns={"id": "api_user_id"}) diff --git a/examples/cache_demo/models/engines/bigquery/pandas/py_constants.ff.py b/examples/cache_demo/models/engines/bigquery/pandas/py_constants.ff.py new file mode 100644 index 0000000..367507d --- /dev/null +++ b/examples/cache_demo/models/engines/bigquery/pandas/py_constants.ff.py @@ -0,0 +1,20 @@ +import pandas as pd + +from fastflowtransform import engine_model + + +@engine_model( + env_match={ + "FF_ENGINE": "bigquery", + "FF_ENGINE_VARIANT": "pandas", + }, + name="py_constants", + materialized="table", + tags=[ + "example:cache_demo", + "engine:bigquery", + ], +) +def build() -> pd.DataFrame: + """BigQuery (pandas) version returning a pandas DataFrame.""" + return pd.DataFrame([{"k": "answer", "v": 42}]) diff --git 
a/examples/cache_demo/models/engines/databricks_spark/http_users.ff.py b/examples/cache_demo/models/engines/databricks_spark/http_users.ff.py new file mode 100644 index 0000000..6f5467f --- /dev/null +++ b/examples/cache_demo/models/engines/databricks_spark/http_users.ff.py @@ -0,0 +1,24 @@ +from pyspark.sql import DataFrame +from pyspark.sql.functions import col + +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df + + +@engine_model( + only="databricks_spark", + name="http_users", + deps=["stg_users.ff"], # just to show a dependency; not used in code + meta={ + "materialized": "table", + "tags": ["example:cache_demo", "engine:databricks_spark"], + }, +) +def fetch(_: DataFrame) -> DataFrame: + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, + normalize=True, + output="spark", + ) + return df.select(col("id").alias("api_user_id"), col("email"), col("username")) diff --git a/examples/cache_demo/models/engines/databricks_spark/py_constants.ff.py b/examples/cache_demo/models/engines/databricks_spark/py_constants.ff.py new file mode 100644 index 0000000..a8ef9f0 --- /dev/null +++ b/examples/cache_demo/models/engines/databricks_spark/py_constants.ff.py @@ -0,0 +1,18 @@ +from pyspark.sql import SparkSession + +from fastflowtransform import engine_model + + +@engine_model( + only="databricks_spark", + name="py_constants", + materialized="table", + tags=[ + "example:cache_demo", + "engine:databricks_spark", + ], +) +def build(): + """Spark version returning a Spark DataFrame.""" + spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate() + return spark.createDataFrame([{"k": "answer", "v": 42}]) diff --git a/examples/cache_demo/models/http/http_users.ff.py b/examples/cache_demo/models/engines/duckdb/http_users.ff.py similarity index 88% rename from examples/cache_demo/models/http/http_users.ff.py rename to examples/cache_demo/models/engines/duckdb/http_users.ff.py index 
bf1bf80..d835814 100644 --- a/examples/cache_demo/models/http/http_users.ff.py +++ b/examples/cache_demo/models/engines/duckdb/http_users.ff.py @@ -1,9 +1,10 @@ -from fastflowtransform import model +from fastflowtransform import engine_model from fastflowtransform.api.http import get_df import pandas as pd -@model( +@engine_model( + only="duckdb", name="http_users", deps=["stg_users.ff"], # just to show a dependency; not used in code meta={ diff --git a/examples/cache_demo/models/engines/duckdb/py_constants.ff.py b/examples/cache_demo/models/engines/duckdb/py_constants.ff.py new file mode 100644 index 0000000..999a763 --- /dev/null +++ b/examples/cache_demo/models/engines/duckdb/py_constants.ff.py @@ -0,0 +1,17 @@ +import pandas as pd + +from fastflowtransform import engine_model + + +@engine_model( + only="duckdb", + name="py_constants", + materialized="table", + tags=[ + "example:cache_demo", + "engine:duckdb", + ], +) +def build() -> pd.DataFrame: + """DuckDB/Postgres-friendly version using pandas.""" + return pd.DataFrame([{"k": "answer", "v": 42}]) diff --git a/examples/cache_demo/models/engines/postgres/http_users.ff.py b/examples/cache_demo/models/engines/postgres/http_users.ff.py new file mode 100644 index 0000000..df33a90 --- /dev/null +++ b/examples/cache_demo/models/engines/postgres/http_users.ff.py @@ -0,0 +1,22 @@ +from fastflowtransform import engine_model +from fastflowtransform.api.http import get_df +import pandas as pd + + +@engine_model( + only="postgres", + name="http_users", + deps=["stg_users.ff"], # just to show a dependency; not used in code + meta={ + "materialized": "table", + "tags": ["example:cache_demo", "engine:postgres"], + }, +) +def fetch(_: pd.DataFrame) -> pd.DataFrame: + df = get_df( + url="https://jsonplaceholder.typicode.com/users", + record_path=None, + normalize=True, + ) + cols = [c for c in df.columns if c in ("id", "email", "username")] + return df[cols].rename(columns={"id": "api_user_id"}) diff --git 
a/examples/cache_demo/models/engines/postgres/py_constants.ff.py b/examples/cache_demo/models/engines/postgres/py_constants.ff.py new file mode 100644 index 0000000..89bb2b1 --- /dev/null +++ b/examples/cache_demo/models/engines/postgres/py_constants.ff.py @@ -0,0 +1,17 @@ +import pandas as pd + +from fastflowtransform import engine_model + + +@engine_model( + only="postgres", + name="py_constants", + materialized="table", + tags=[ + "example:cache_demo", + "engine:postgres", + ], +) +def build() -> pd.DataFrame: + """Postgres version using pandas.""" + return pd.DataFrame([{"k": "answer", "v": 42}]) diff --git a/examples/cache_demo/models/marts/mart_user_orders.ff.sql b/examples/cache_demo/models/marts/mart_user_orders.ff.sql index 406aa61..b5aa71b 100644 --- a/examples/cache_demo/models/marts/mart_user_orders.ff.sql +++ b/examples/cache_demo/models/marts/mart_user_orders.ff.sql @@ -1,4 +1,4 @@ -{{ config(materialized='table', tags=['example:cache_demo','engine:duckdb']) }} +{{ config(materialized='table', tags=['example:cache_demo','engine:duckdb','engine:postgres','engine:databricks_spark','engine:bigquery']) }} with u as ( select user_id, email from {{ ref('stg_users.ff') }} ), diff --git a/examples/cache_demo/models/python/py_constants.ff.py b/examples/cache_demo/models/python/py_constants.ff.py deleted file mode 100644 index 5e22b7a..0000000 --- a/examples/cache_demo/models/python/py_constants.ff.py +++ /dev/null @@ -1,16 +0,0 @@ -from fastflowtransform import model -import pandas as pd - - -@model( - name="py_constants", - deps=[], # independent - meta={ - "materialized": "table", - "tags": ["example:cache_demo", "engine:duckdb"], - }, -) -def build() -> pd.DataFrame: - # Change this constant to trigger a fingerprint change for a pure Python model. 
- CONSTANT = 42 - return pd.DataFrame([{"k": "answer", "v": CONSTANT}]) diff --git a/examples/cache_demo/models/seeds_consumers/stg_orders.ff.sql b/examples/cache_demo/models/seeds_consumers/stg_orders.ff.sql index 9444fcc..156f624 100644 --- a/examples/cache_demo/models/seeds_consumers/stg_orders.ff.sql +++ b/examples/cache_demo/models/seeds_consumers/stg_orders.ff.sql @@ -1,6 +1,6 @@ -{{ config(materialized='view', tags=['example:cache_demo','engine:duckdb']) }} +{{ config(materialized='view', tags=['example:cache_demo','engine:duckdb','engine:postgres','engine:databricks_spark','engine:bigquery']) }} select cast(order_id as int) as order_id, cast(customer_id as int) as user_id, - cast(amount as double) as amount + cast(amount as decimal) as amount from {{ source('crm', 'orders') }}; diff --git a/examples/cache_demo/models/seeds_consumers/stg_users.ff.sql b/examples/cache_demo/models/seeds_consumers/stg_users.ff.sql index 5d55e6a..7862f7b 100644 --- a/examples/cache_demo/models/seeds_consumers/stg_users.ff.sql +++ b/examples/cache_demo/models/seeds_consumers/stg_users.ff.sql @@ -1,3 +1,3 @@ -{{ config(materialized='table', tags=['example:cache_demo','engine:duckdb']) }} +{{ config(materialized='table', tags=['example:cache_demo','engine:duckdb','engine:postgres','engine:databricks_spark','engine:bigquery']) }} select cast(id as int) as user_id, lower(email) as email from {{ source('crm', 'users') }}; diff --git a/examples/cache_demo/profiles.yml b/examples/cache_demo/profiles.yml index b65dc2f..fccfda5 100644 --- a/examples/cache_demo/profiles.yml +++ b/examples/cache_demo/profiles.yml @@ -2,3 +2,38 @@ dev_duckdb: engine: duckdb duckdb: path: "{{ env('FF_DUCKDB_PATH', '.local/cache_demo.duckdb') }}" + +dev_postgres: + engine: postgres + postgres: + dsn: "{{ env('FF_PG_DSN') }}" + db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" + +dev_databricks: + engine: databricks_spark + databricks_spark: + master: "{{ env('FF_SPARK_MASTER', 'local[*]') }}" + app_name: "{{ 
env('FF_SPARK_APP_NAME', 'cache_demo') }}" + warehouse_dir: "{{ project_dir() }}/{{ env('FF_DBR_WAREHOUSE_DIR', '.local/spark_warehouse') }}" + database: "{{ env('FF_DBR_DATABASE', 'cache_demo') }}" + extra_conf: + spark.sql.shuffle.partitions: "{{ env('SPARK_SQL_SHUFFLE_PARTITIONS', '8') }}" + spark.driver.extraJavaOptions: "-Dderby.stream.error.file={{ project_dir() }}/.local/derby.log" + +dev_bigquery_bigframes: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'cache_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: true + # allow_create_dataset: true # uncomment to auto-create dataset on first run + +dev_bigquery_pandas: + engine: bigquery + bigquery: + project: "{{ env('FF_BQ_PROJECT') }}" + dataset: "{{ env('FF_BQ_DATASET', 'cache_demo') }}" + location: "{{ env('FF_BQ_LOCATION', 'EU') }}" + use_bigframes: false + # allow_create_dataset: true # uncomment to auto-create dataset on first run diff --git a/examples/cache_demo/project.yml b/examples/cache_demo/project.yml index 25b996d..fda1476 100644 --- a/examples/cache_demo/project.yml +++ b/examples/cache_demo/project.yml @@ -5,16 +5,23 @@ vars: {} models: storage: - stg_users.ff: { path: ".local/duck/users", format: parquet } - stg_orders.ff: { path: ".local/duck/orders", format: parquet } - mart_user_orders.ff: { path: ".local/duck/mart_user_orders", format: parquet } - py_constants: { path: ".local/duck/py_constants", format: parquet } - http_users: { path: ".local/duck/http_users", format: parquet } + stg_users.ff: + path: ".local/spark/stg_users" + stg_orders.ff: + path: ".local/spark/stg_orders" + mart_user_orders.ff: + path: ".local/spark/mart_user_orders" + py_constants: + path: ".local/spark/py_constants" + http_users: + path: ".local/spark/http_users" seeds: storage: - seed_users: { path: ".local/duck/seed_users", format: parquet } - seed_orders: { path: ".local/duck/seed_orders", format: parquet } + seed_users: + 
path: ".local/spark/seed_users" + seed_orders: + path: ".local/spark/seed_orders" tests: - type: row_count_between diff --git a/examples/cache_demo/seeds/seed_users.csv b/examples/cache_demo/seeds/seed_users.csv index 5559381..d372d92 100644 --- a/examples/cache_demo/seeds/seed_users.csv +++ b/examples/cache_demo/seeds/seed_users.csv @@ -4,3 +4,61 @@ id,email 3,carol@example.com 4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com + +4,dan@example.com diff --git a/examples/cache_demo/site/dag/http_users.html b/examples/cache_demo/site/dag/http_users.html index 220718b..952bbf3 100644 --- a/examples/cache_demo/site/dag/http_users.html +++ b/examples/cache_demo/site/dag/http_users.html @@ -90,7 +90,7 @@

Metadata

Path
- /Users/markolekic/Dev/FlowForge/fastflowtransform/examples/cache_demo/models/http/http_users.ff.py + /Users/markolekic/Dev/FlowForge/fastflowtransform/examples/cache_demo/models/engines/databricks_spark/http_users.ff.py
@@ -112,6 +112,91 @@

Metadata

+
+

Columns

+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeNullableDescriptionLineage
api_user_idbigint + + yes + + + + — + + + + unknown + +
emailstring + + yes + + + + — + + + + unknown + +
usernamestring + + yes + + + + — + + + + unknown + +
+ +