next-insurance/data-tests-generator

data-tests-generator

Automating Data Engineering Data Quality with AI agents.

data-tests-generator is an end-to-end, open-source framework that uses AWS Bedrock agents — one per foundation model — to propose SQL data-quality tests for your tables, deduplicates the proposals with an overseer model, opens a GitHub Pull Request to append the new tests to a YAML file, and then runs them on a schedule via an Airflow DAG against Amazon Redshift.

The system is designed so that "one table per run" generates a handful of new, non-overlapping, human-reviewable tests that become DAG tasks the moment the PR is merged — no code changes required.


How it works (high-level)

                 ┌─────────────────────────────────────────────────────────┐
                 │  OpenMetadata: tables with descriptions (candidates)    │
                 └────────────────────┬────────────────────────────────────┘
                                      │ pick one random table
                                      ▼
     ┌────────────────────────────────────────────────────────────────────────┐
     │  Tool/main.py                                                          │
     │                                                                        │
     │   1. Fetch column descriptions from OpenMetadata                       │
     │   2. Build test-generation prompt with table metadata                  │
     │   3. For each Bedrock agent named "data-tests-generator-*":            │
     │        a. Invoke agent (SQL knowledge base grounds the query)          │
     │        b. Normalize free-form answer → strict JSON via Bedrock         │
     │           (test_name, test_description, test_query, expected_result)   │
     │   4. Dedupe collected tests via Bedrock                                │
     │      (signal = test_description; compares against existing YAML too)   │
     │   5. Open a GitHub PR appending the new rows to data-tests.yaml        │
     └────────────────────────────────────────────────────────────────────────┘
                                      │ PR merged
                                      ▼
     ┌────────────────────────────────────────────────────────────────────────┐
     │  Airflow DAG dag_data_tests                                            │
     │                                                                        │
     │   • One TaskGroup per table key in data-tests.yaml                     │
     │   • One PythonOperator per test → runs test_query on Redshift          │
     │   • Fails the task if the row count does not match expected_result     │
     │     (empty / not empty)                                                │
     │   • Failure callback publishes to an SNS topic                         │
     └────────────────────────────────────────────────────────────────────────┘

The generator keeps expected_result constrained to exactly one of two values, empty or not empty. Every test is either a violation-finding query (healthy data → zero rows) or a required-presence query (healthy data → at least one row). There are no brittle numeric assertions.
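To make the two expectations concrete, here is a minimal sketch that evaluates one query of each kind against an in-memory SQLite database. The policies table, its rows, and the result_matches helper are hypothetical and exist only for illustration; the real DAG runs against Redshift through helpers.run_data_test.

```python
import sqlite3

VALID_EXPECTATIONS = {"empty", "not empty"}


def result_matches(expected_result: str, row_count: int) -> bool:
    """Return True when the row count satisfies the expectation."""
    if expected_result not in VALID_EXPECTATIONS:
        raise ValueError(f"unsupported expected_result: {expected_result!r}")
    if expected_result == "empty":
        return row_count == 0
    return row_count > 0


# Hypothetical table, stood up in SQLite purely for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE policies (policy_id INTEGER, premium REAL)")
conn.executemany(
    "INSERT INTO policies VALUES (?, ?)",
    [(1, 120.0), (2, 85.5), (3, None)],
)

# Violation-finding query: healthy data returns zero rows.
violations = conn.execute(
    "SELECT * FROM policies WHERE premium IS NULL LIMIT 100"
).fetchall()

# Required-presence query: healthy data returns at least one row.
present = conn.execute(
    "SELECT * FROM policies WHERE premium > 0 LIMIT 1"
).fetchall()

null_premium_ok = result_matches("empty", len(violations))
has_priced_policy = result_matches("not empty", len(present))
conn.close()
```

In this sample data one policy has a NULL premium, so the first test would fail and the second would pass.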


Repository layout

.
├── Terraform/                    # AWS infrastructure (Bedrock agents, KB, SNS, IAM)
│   ├── agent.tf                  # One Bedrock agent per foundation model + aliases
│   ├── knowledge_base.tf         # Redshift-backed SQL Knowledge Base for agents
│   ├── knowledge_base_role.tf    # IAM role for the KB → Redshift / Secrets
│   ├── data.tf                   # Data sources (IAM, Redshift workgroup, model list)
│   ├── sns.tf                    # SNS topic + SSM parameter for failure alerts
│   ├── main.tf                   # Locals that fan out agents over models
│   ├── variables.tf              # Input variables (agents map, KB names, etc.)
│   ├── outputs.tf
│   ├── backend.tf                # S3 backend + assume-role provider
│   ├── versions.tf               # Pinned providers
│   ├── instructions/             # One .txt per agent template (system instruction)
│   │   └── data-tests-generator.txt
│   ├── scripts/
│   │   └── list_foundation_model_arns.py  # external data source (Bedrock CLI)
│   └── values/
│       └── dev-03.tfvars         # Example values file
│
├── Tool/                         # The generator CLI
│   ├── main.py                   # Orchestrates agents → dedupe → PR
│   ├── git_utils.py              # Thin GitHub REST helpers (PR, owner recommendation)
│   ├── open_metadata_utils.py    # Thin OpenMetadata client (table + columns)
│   └── prompts/
│       ├── test-generation-prompt.txt     # Sent to each Bedrock agent
│       ├── test-to-json-prompt.txt        # Free-form → structured JSON
│       └── deduplicate-tests-prompt.txt   # Cross-agent + YAML dedupe
│
└── Airflow/
    └── DAGs/
        └── dag_data_tests/
            ├── dag_data_tests.py      # Parse-time DAG generation from YAML
            ├── data-tests.yaml        # Source of truth (appended to by PRs)
            ├── helpers/helpers.py     # run_data_test: runs SQL, asserts row count
            └── README.md              # DAG-level docs (rendered as doc_md)

Components

1. Terraform — Terraform/

Provisions everything in AWS:

  • One Bedrock agent per foundation model. data.external.bedrock_foundation_model_arns calls aws bedrock list-inference-profiles (filtered to Amazon + Anthropic, ACTIVE, system-defined) and main.tf fans the agent template out over those ARNs. Each agent is named <template_key>-<short_model_id> (e.g. data-tests-generator-us-anthropic-claude-sonnet-4), which is exactly the prefix Tool/main.py looks for at runtime.
  • Aliases per agent (via the aliases map in var.agents), with a time_sleep between agent and alias creation so Bedrock finishes preparing.
  • A Redshift-backed SQL Knowledge Base (aws_bedrockagent_knowledge_base.redshift_staging, type = SQL / REDSHIFT, Serverless) and a REDSHIFT_METADATA data source, associated to every generated agent. An ingestion job is kicked off via local-exec so the KB is synced after apply.
  • IAM role for the KB (bedrock_knowledge_base_role) with Redshift Data / Serverless / Secrets Manager access.
  • An "overseer" agent reserved for multi-step tasks (e.g. deduping).
  • SNS topic bedrock-qg-notifications + email subscription, with its ARN stored at /bi/dev/quality-tests-sns-topic-arn in SSM so the Airflow DAG can publish on failure.
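The fan-out itself lives in Terraform (main.tf), but the idea can be sketched in Python. The snippet below assumes the short model ids have already been derived from the inference-profile ARNs; how main.tf shortens them is not shown here, and the fan_out helper is hypothetical.

```python
from itertools import product

# Hypothetical inputs: template keys mirror the keys of var.agents, and the
# short model ids are assumed to be already derived from the profile ARNs.
template_keys = ["data-tests-generator"]
short_model_ids = ["us-anthropic-claude-sonnet-4", "us-amazon-nova-2-lite-v1-0"]


def fan_out(templates: list[str], models: list[str]) -> dict[str, dict[str, str]]:
    """Build one concrete agent entry per (template, model) pair."""
    return {
        f"{template}-{model}": {"template_key": template, "short_model_id": model}
        for template, model in product(templates, models)
    }


agents = fan_out(template_keys, short_model_ids)
agent_names = sorted(agents)
```

Every resulting name starts with the template key, which is what lets the CLI discover agents by prefix.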

Key inputs (see variables.tf):

Variable                                          Purpose
------------------------------------------------  -------------------------------------------
region, account_id, terraform_runtime_account_id  Target AWS account & region
bedrock_agent_role_name                           Existing IAM role to be assumed by agents
agents                                            Map of agent templates (description, idle TTL, aliases). One template → N concrete agents (one per foundation model). Requires instructions/<template_key>.txt
redshift_staging_*                                Workgroup / secret / database / KB name. KB is created only if these are set
sns_email_recipient                               Email subscribed to the failure SNS topic
tags                                              Tags propagated to all resources

Terraform/backend.tf and Terraform/values/dev-03.tfvars intentionally ship with "..." placeholders for the S3 backend, assume-role ARN, Redshift names, etc. Fill these in for your environment before running terraform init / apply.

2. Generator CLI — Tool/

Tool/main.py is a single-shot script that produces one PR per invocation:

  1. Discover agents. Lists every Bedrock agent whose name starts with data-tests-generator- in us-west-2 and resolves its first PREPARED alias.
  2. Pick a table. Calls OpenMetadata (list_table_fqns_with_metadata) and chooses a random FQN that has either a table-level description or at least one column description. The redshift_prod_public.prod. prefix is stripped to build the YAML key.
  3. Build the prompt. Fetches the column list for that table and injects it into prompts/test-generation-prompt.txt at the <table_description> tag.
  4. Invoke each agent. Every agent sees the same prompt; the SQL KB grounds the test_query against real schema.
  5. Normalize to JSON. Each free-form agent response is passed through Bedrock (TEST_TO_JSON_MODEL_ID = us.anthropic.claude-sonnet-4-5-...) with JSON-schema structured output into:
 { test_name, test_description, test_query, expected_result ∈ {"empty","not empty"} }

     Responses that self-admit failure (error, cannot, sample in the test name) are dropped.
  6. Deduplicate. All proposed tests plus the existing rows for that table in data-tests.yaml (fetched from GitHub master) are sent to Bedrock with prompts/deduplicate-tests-prompt.txt. test_description is the primary signal; merged rows keep a comma-separated generating_agent_name.
  7. Resolve owner. git_utils.recommend_pr_owner scans the last 500 commits of the target repo, keeps those whose diff mentions table_name_key, and recommends the most-frequent author (ties → most recent).
  8. Open a PR. A new branch data-tests-generator/<table_key>-<timestamp> is created off master, the YAML is rewritten (preserving key order) with new entries stamped generated_at, generating_agent_name, owner, moved_to_pipeline: false, and a PR is opened against master.
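The owner-resolution rule (most-frequent author, ties broken by recency) can be sketched as follows. This is not the code in git_utils.recommend_pr_owner; the Commit dataclass, the recommend_owner function, and the sample commits are hypothetical stand-ins for whatever the GitHub API returns.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Commit:
    author: str
    committed_at: str  # ISO 8601 UTC timestamp, so string order is time order
    diff: str


def recommend_owner(commits: list[Commit], table_name_key: str) -> Optional[str]:
    """Pick the most frequent author among commits that mention the table."""
    relevant = [c for c in commits if table_name_key in c.diff]
    if not relevant:
        return None
    counts = Counter(c.author for c in relevant)
    latest: dict[str, str] = {}
    for c in relevant:
        latest[c.author] = max(latest.get(c.author, ""), c.committed_at)
    # Rank by commit count first, then by most recent commit to break ties.
    return max(counts, key=lambda author: (counts[author], latest[author]))


commits = [
    Commit("Jane Doe", "2026-03-01T10:00:00Z", "+ FROM my_schema.my_table"),
    Commit("John Roe", "2026-03-05T10:00:00Z", "- FROM my_schema.my_table"),
    Commit("John Roe", "2026-03-06T10:00:00Z", "+ FROM other_schema.other_table"),
    Commit("Jane Doe", "2026-02-01T10:00:00Z", "+ JOIN my_schema.my_table"),
]
owner = recommend_owner(commits, "my_schema.my_table")
```

With these sample commits Jane Doe has two relevant commits against John Roe's one, so she is recommended. If only the first two commits existed, the tie would go to John Roe as the more recent author.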

Dependencies: boto3, requests, pyyaml. Python 3.10+ (the code uses built-in generic annotations such as list[dict], which is PEP 585 syntax).

Environment variables:

Variable                         Required  Used for
-------------------------------  --------  ------------------------------------------------
AWS_PROFILE / default AWS creds  yes       Bedrock agent + runtime calls (region us-west-2)
OPEN_METADATA_API_KEY            yes       JWT for the OpenMetadata REST API
OPEN_METADATA_VERIFY_SSL         no        Set 0 to skip TLS verification (internal CAs)
GITHUB_TOKEN                     yes       PR creation and owner lookup

Run:

# From the repo root
python Tool/main.py \
    --yaml-path Airflow/DAGs/dag_data_tests/data-tests.yaml

The --yaml-path flag is the path inside the target GitHub repository (configured via GitUtils.BASE_GITHUB_URL and the repo arg, default "bi") where data-tests.yaml lives.

Two things you will need to change in Tool/git_utils.py before using it against your own org:

  • BASE_GITHUB_URL = "https://api.github.com/repos/<your-organization>"
  • The default repo="bi" parameters, if your Airflow repo has another name.

Likewise, Tool/open_metadata_utils.py defaults OPEN_METADATA_BASE_URL_DEFAULT = "https://open-metadata.my-organization.io". Either edit the constant or pass base_url= when constructing OpenMetadataUtils.
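As a rough illustration of how BASE_GITHUB_URL, the repo argument, and --yaml-path could combine, the sketch below builds a URL for GitHub's "get repository content" REST endpoint. Whether git_utils.py assembles its URLs exactly this way is an assumption; example-org and the contents_url helper are hypothetical.

```python
from urllib.parse import quote, urlencode

# Hypothetical organization; replace with your own.
BASE_GITHUB_URL = "https://api.github.com/repos/example-org"


def contents_url(yaml_path: str, repo: str = "bi", ref: str = "master") -> str:
    """Build the REST URL for a file inside the target repository."""
    path = quote(yaml_path.strip("/"))  # keeps "/" separators, escapes the rest
    return f"{BASE_GITHUB_URL}/{repo}/contents/{path}?{urlencode({'ref': ref})}"


url = contents_url("Airflow/DAGs/dag_data_tests/data-tests.yaml")
```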

3. Airflow DAG — Airflow/DAGs/dag_data_tests/

dag_data_tests.py parses data-tests.yaml at DAG-load time and builds the graph:

  • One TaskGroup per top-level YAML key (table FQN-style name).
  • One PythonOperator per test inside the group, calling helpers.run_data_test, which runs test_query via PostgresHook and raises AirflowException when the result set does not match expected_result.
  • All table groups fan out in parallel from start_dag (EmptyOperator); tasks within a group also run in parallel.
  • On failure, _on_failure_callback reads the SNS topic ARN from SSM (/bi/dev/quality-tests-sns-topic-arn) and publishes a structured message (dag id, task id, run id, first 500 chars of the exception).
  • Tests whose expected_result is missing or not exactly empty / not empty are skipped at parse time with a warning — they won't appear in the graph.
  • Redshift connection id defaults to redshift_<environment> where environment is an Airflow Variable.
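The failure payload described above might be assembled along these lines. JSON as the wire format, the field names, the sample task id, and the build_failure_message helper are all assumptions; only the four pieces of content (dag id, task id, run id, first 500 characters of the exception) come from the description.

```python
import json

MAX_EXCEPTION_CHARS = 500  # the first 500 chars of the exception are published


def build_failure_message(
    dag_id: str, task_id: str, run_id: str, exception: BaseException
) -> str:
    """Serialize the failure context; JSON is an assumed wire format."""
    payload = {
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,
        "exception": str(exception)[:MAX_EXCEPTION_CHARS],
    }
    return json.dumps(payload, sort_keys=True)


message = build_failure_message(
    "dag_data_tests",
    "my_schema.my_table.my_first_test",  # hypothetical task id
    "scheduled__2026-04-06T00:00:00+00:00",
    RuntimeError("x" * 2000),
)
```

In a real callback, a string like this would be passed as Message to the boto3 SNS client's publish call, with TopicArn taken from the SSM parameter.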

This directory also ships its own README.md, which Airflow renders as the DAG's doc_md.

YAML schema (per test entry):

my_schema.my_table:
  - test_name: my_first_test
    test_description: my test description
    test_query: SELECT * FROM my_schema.my_table WHERE xyz IS NULL LIMIT 100
    expected_result: empty            # or "not empty"
    generated_at: '2026-04-06T09:11:35Z'
    generating_agent_name: data-tests-generator-us-amazon-nova-2-lite-v1-0
    owner: Jane Doe
    moved_to_pipeline: false

generated_at, generating_agent_name, owner, moved_to_pipeline are produced by the generator; only test_name, test_query, and expected_result affect DAG execution.
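The parse-time filter on expected_result can be sketched as below. This is an illustration, not the code in dag_data_tests.py: select_runnable_tests is a hypothetical name, and the input is the dictionary you would get from loading data-tests.yaml.

```python
import logging

logger = logging.getLogger(__name__)
VALID_EXPECTATIONS = {"empty", "not empty"}


def select_runnable_tests(tests_by_table: dict[str, list[dict]]) -> dict[str, list[dict]]:
    """Keep only tests whose expected_result is exactly 'empty' or 'not empty'."""
    runnable: dict[str, list[dict]] = {}
    for table_key, tests in tests_by_table.items():
        kept = []
        for test in tests or []:
            expected = test.get("expected_result")
            if not isinstance(expected, str) or expected not in VALID_EXPECTATIONS:
                logger.warning(
                    "Skipping %s.%s: invalid expected_result %r",
                    table_key, test.get("test_name"), expected,
                )
                continue
            kept.append(test)
        runnable[table_key] = kept
    return runnable


# Shape mirrors data-tests.yaml after loading; the entries are illustrative.
tests_by_table = {
    "my_schema.my_table": [
        {"test_name": "my_first_test", "expected_result": "empty",
         "test_query": "SELECT * FROM my_schema.my_table WHERE xyz IS NULL LIMIT 100"},
        {"test_name": "row_count_check", "expected_result": "0",
         "test_query": "SELECT COUNT(*) FROM my_schema.my_table"},
        {"test_name": "no_expectation",
         "test_query": "SELECT * FROM my_schema.my_table LIMIT 1"},
    ]
}
runnable = select_runnable_tests(tests_by_table)
```

Only my_first_test survives; the other two entries are logged and left out of the graph.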


Getting started

Prerequisites

  • AWS account with Bedrock model access (Anthropic / Amazon inference profiles) in your chosen region.
  • Amazon Redshift Serverless (staging) for the Knowledge Base.
  • An OpenMetadata deployment with tables catalogued and a JWT API key.
  • A GitHub repository hosting your Airflow DAGs (the one containing Airflow/DAGs/dag_data_tests/data-tests.yaml) and a personal access token with repo scope.
  • Terraform =1.7.5, AWS provider =6.31.0 (pinned in versions.tf), AWS CLI v2 for list-inference-profiles.
  • Python 3.10+ with boto3, requests, pyyaml.

1. Deploy infrastructure

cd Terraform
# edit backend.tf (S3 backend + assume-role ARN) for your setup
# edit values/dev-03.tfvars with your account_id, Redshift names, etc.
terraform init
terraform apply -var-file=values/dev-03.tfvars

After apply, terraform output bedrock_agents lists every agent that was created (one per foundation model).

2. Configure the generator

Edit Tool/git_utils.py and Tool/open_metadata_utils.py to point at your organization's GitHub and OpenMetadata.

Export credentials:

export AWS_PROFILE=<your-profile>           # must have Bedrock access in us-west-2
export OPEN_METADATA_API_KEY=<jwt>
export GITHUB_TOKEN=<github-pat>

3. Generate tests

python Tool/main.py

Each run selects one table, invokes every agent, dedupes the results, and opens a single PR in your Airflow repo. Typical output is a short list of generated and merged tests and a link to the PR.

4. Merge and let Airflow do the rest

Once the PR is merged into master, dag_data_tests picks up the new rows on its next parse cycle and the new tasks show up under the table's TaskGroup. Failures are published to the SNS topic / emailed to sns_email_recipient.


Design choices

  • One agent per model, not one agent per table. Different models produce different ideas; Bedrock deduplication keeps only the distinct ones. This gives horizontal variety at little extra cost.
  • Violation-row pattern, not aggregates. Both the agent prompt and the JSON-normalization prompt explicitly rewrite COUNT(*) tests into SELECT ... WHERE <failure> patterns so empty / not empty is always a meaningful assertion about a row set, not a scalar.
  • Deduplication compares against the YAML file, not just in-run output. This prevents the generator from endlessly re-proposing the same test on subsequent runs.
  • PR-based, human-in-the-loop. Nothing is auto-merged. owner defaults to the most-frequent recent author for table_name_key so the right person is tagged for review.
  • Parse-time DAG expansion. Adding or removing tests is a YAML edit — no DAG code changes — and a lint-level expected_result check keeps malformed entries out of the schedule.

Limitations & things to tweak for your org

  • GitHub URL and default repo name are hard-coded to a <your-organization> / "bi" scheme in Tool/git_utils.py — edit before use.
  • OpenMetadata base URL defaults to https://open-metadata.my-organization.io in Tool/open_metadata_utils.py.
  • Redshift FQN prefix stripping (PREFIX_FOR_OPEN_METADATA = "redshift_prod_public.prod." in Tool/main.py) matches one specific catalogue convention.
  • Bedrock region is pinned to us-west-2 in Tool/main.py; change REGION if you deploy elsewhere.
  • The overseer agent is created but not yet invoked by the CLI — dedupe currently goes through the Claude Sonnet model directly.
  • Terraform/backend.tf ships with "..." placeholders; terraform init won't succeed until you fill them in.
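For the FQN prefix item above, adapting to a different catalogue convention amounts to changing one constant. A minimal sketch, assuming the stripping is a plain prefix removal (the to_yaml_key helper is hypothetical, and the actual code in Tool/main.py may differ):

```python
PREFIX_FOR_OPEN_METADATA = "redshift_prod_public.prod."


def to_yaml_key(fqn: str, prefix: str = PREFIX_FOR_OPEN_METADATA) -> str:
    """Turn an OpenMetadata FQN into the key used in data-tests.yaml."""
    # str.removeprefix leaves the string unchanged when the prefix is absent.
    return fqn.removeprefix(prefix)


yaml_key = to_yaml_key("redshift_prod_public.prod.my_schema.my_table")
```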
