CDC (insert/update/delete) flows could not use Data Quality Expectations (DQE) #265

@rsleedbx

Problem

When a table was configured for CDC (insert/update/delete via bronze_cdc_apply_changes or silver_cdc_apply_changes), it could not also use Data Quality Expectations (DQE). The pipeline treated the two as mutually exclusive:

  • If dataQualityExpectations was set, the pipeline used the DQE path and never ran cdc_apply_changes.
  • If cdcApplyChanges was set (and DQE was omitted so the CDC path ran), the table had no expectations applied.

So for CDC tables (e.g., the Lakeflow Connect intpk table with inserts, updates, and deletes), we had to omit bronze_data_quality_expectations_json_prod and silver_data_quality_expectations_json_prod, which meant no expectations could run on those flows.

Root cause

In src/dataflow_pipeline.py, write_layer_table() branched as follows:

  • When dataQualityExpectations was set → it called write_layer_with_dqe() and returned, so the CDC branch was never reached.
  • When cdcApplyChanges was set and DQE was not → it ran cdc_apply_changes().

There was no path that ran both DQE and CDC for the same table.
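A minimal, runnable model of that branching makes the dead end visible. It is a sketch, not the code in src/dataflow_pipeline.py: TableSpec and OldPipeline are hypothetical stand-ins, and the method bodies only record which path ran.

```python
# Hypothetical model of the pre-fix branching in write_layer_table().
# Only the control flow mirrors the issue; bodies just record the path taken.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TableSpec:
    """Hypothetical stand-in for one table's dataflow spec."""
    name: str
    dataQualityExpectations: Optional[dict] = None
    cdcApplyChanges: Optional[dict] = None


@dataclass
class OldPipeline:
    calls: List[str] = field(default_factory=list)

    def write_layer_with_dqe(self, spec: TableSpec) -> None:
        self.calls.append(f"dqe:{spec.name}")

    def cdc_apply_changes(self, spec: TableSpec) -> None:
        self.calls.append(f"cdc:{spec.name}")

    def write_layer_table(self, spec: TableSpec) -> None:
        if spec.dataQualityExpectations is not None:
            # DQE path returns early, so the CDC branch below is unreachable
            self.write_layer_with_dqe(spec)
            return
        if spec.cdcApplyChanges is not None:
            self.cdc_apply_changes(spec)


p = OldPipeline()
p.write_layer_table(TableSpec("intpk", {"expect_or_drop": {}}, {"keys": ["id"]}))
print(p.calls)  # ['dqe:intpk']  (CDC silently skipped)

q = OldPipeline()
q.write_layer_table(TableSpec("orders", cdcApplyChanges={"keys": ["id"]}))
print(q.calls)  # ['cdc:orders']
```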

Fix

Support a DQE-then-CDC path when both dataQualityExpectations and cdcApplyChanges are set for a table:

  1. New combined path
    When both are set, write_layer_table() calls the new method write_layer_with_dqe_then_cdc() instead of returning after DQE.

  2. write_layer_with_dqe_then_cdc()

    • Creates a DQE table named <table>_dq (e.g. intpk_dq) by calling write_layer_with_dqe(dqe_only=True, suffix="_dq"). Only rows that pass expectations are written to this table.
    • Then runs cdc_apply_changes(source_table=dq_table_name), so that create_auto_cdc_flow uses the _dq table as its stream source and merges into the final table.

  3. _get_target_table_info(suffix=None)
    Extended to accept an optional suffix so the DQE table can be targeted at <table>_dq (e.g. intpk_dq).

  4. write_layer_with_dqe(dqe_only=False, suffix=None)

    • With dqe_only=True and suffix="_dq", only the DQE table is created (under the suffixed name); cdc_apply_changes is not invoked inside this method.
    • Otherwise, behavior is unchanged (either DQE-only or CDC-only when only one is set).

  5. cdc_apply_changes(source_table=None)
    Takes an optional source_table argument. When it is provided (e.g. "intpk_dq"), create_auto_cdc_flow(..., source=source_table, ...) uses that table as the stream source instead of the raw view.
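The five changes can be modeled together in plain Python. This is a hypothetical sketch under stated assumptions: method names and parameters follow the issue, but _get_target_table_info() here returns only a table name (the real return value is not described), the raw view name <table>_view is invented, and the bodies only record what would be created.

```python
# Hypothetical model of the DQE-then-CDC dispatch; no Spark required.
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class TableSpec:
    name: str
    dataQualityExpectations: Optional[dict] = None
    cdcApplyChanges: Optional[dict] = None


@dataclass
class Pipeline:
    spec: TableSpec
    calls: List[Tuple[str, str]] = field(default_factory=list)

    def _get_target_table_info(self, suffix: Optional[str] = None) -> str:
        # Optional suffix lets the DQE table be targeted at <table>_dq
        return self.spec.name + (suffix or "")

    def write_layer_with_dqe(self, dqe_only: bool = False,
                             suffix: Optional[str] = None) -> None:
        # dqe_only is kept for signature fidelity; this model never calls CDC here
        self.calls.append(("dqe", self._get_target_table_info(suffix)))

    def cdc_apply_changes(self, source_table: Optional[str] = None) -> None:
        # source_table, when given, replaces the raw view as the stream source
        source = source_table or f"{self.spec.name}_view"  # "_view" is assumed
        self.calls.append(("cdc", f"{source}->{self._get_target_table_info()}"))

    def write_layer_with_dqe_then_cdc(self) -> None:
        self.write_layer_with_dqe(dqe_only=True, suffix="_dq")
        self.cdc_apply_changes(source_table=self._get_target_table_info("_dq"))

    def write_layer_table(self) -> None:
        has_dqe = self.spec.dataQualityExpectations is not None
        has_cdc = self.spec.cdcApplyChanges is not None
        if has_dqe and has_cdc:
            self.write_layer_with_dqe_then_cdc()
        elif has_dqe:
            self.write_layer_with_dqe()
        elif has_cdc:
            self.cdc_apply_changes()


p = Pipeline(TableSpec("intpk", {"expect_or_drop": {"valid_id": "id IS NOT NULL"}},
                       {"keys": ["id"]}))
p.write_layer_table()
print(p.calls)  # [('dqe', 'intpk_dq'), ('cdc', 'intpk_dq->intpk')]

q = Pipeline(TableSpec("orders", cdcApplyChanges={"keys": ["id"]}))
q.write_layer_table()
print(q.calls)  # [('cdc', 'orders_view->orders')]
```

The single-setting branches are unchanged in this model, which matches the issue's claim that tables with only DQE or only CDC keep their existing behavior.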

Resulting flow when both DQE and CDC are set:

View (raw) → DQE table <table>_dq (rows passing expectations) → create_auto_cdc_flow from <table>_dq → final table <table>.
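To see what each stage holds, here is a Spark-free simulation of that flow. Assumptions, for illustration only: each change row carries an op column (INSERT/UPDATE/DELETE), id is the key, seq orders the changes, failing rows are dropped (expect_or_drop-style), and the CDC merge behaves like SCD type 1. None of these column names come from the intpk demo.

```python
# Plain-Python simulation: raw view -> <table>_dq -> CDC merge -> <table>.
from typing import Callable, Dict, List

Row = Dict[str, object]


def apply_expectations(rows: List[Row],
                       rules: Dict[str, Callable[[Row], bool]]) -> List[Row]:
    """The <table>_dq stage: keep only rows that pass every rule."""
    return [r for r in rows if all(rule(r) for rule in rules.values())]


def auto_cdc_scd1(rows: List[Row], key: str, sequence_by: str) -> Dict[object, Row]:
    """SCD type 1 merge: latest change per key wins; DELETE removes the key."""
    target: Dict[object, Row] = {}
    for r in sorted(rows, key=lambda row: row[sequence_by]):
        if r["op"] == "DELETE":
            target.pop(r[key], None)
        else:
            # Drop CDC bookkeeping columns from the stored row
            target[r[key]] = {k: v for k, v in r.items() if k not in ("op", sequence_by)}
    return target


raw_view = [
    {"id": 1, "val": "a", "op": "INSERT", "seq": 1},
    {"id": 2, "val": "b", "op": "INSERT", "seq": 2},
    {"id": None, "val": "x", "op": "INSERT", "seq": 3},  # fails valid_id
    {"id": 1, "val": "a2", "op": "UPDATE", "seq": 4},
    {"id": 2, "val": None, "op": "DELETE", "seq": 5},
]
rules = {"valid_id": lambda r: r["id"] is not None}

intpk_dq = apply_expectations(raw_view, rules)
intpk = auto_cdc_scd1(intpk_dq, key="id", sequence_by="seq")
print(len(intpk_dq))  # 4
print(intpk)          # {1: {'id': 1, 'val': 'a2'}}
```

One consequence worth checking in a real pipeline: because the CDC flow only sees <table>_dq, a DELETE event that fails an expectation never reaches the merge. In the simulation, adding a rule such as `r["val"] is not None` would drop the seq 5 delete and leave id 2 in the final table.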

Usage

For a table that should have both CDC and DQE, set the following in the onboarding file (or equivalent config):

  • bronze_cdc_apply_changes / silver_cdc_apply_changes (as before), and
  • bronze_data_quality_expectations_json_prod / silver_data_quality_expectations_json_prod (as before).

The pipeline detects both and uses the DQE-then-CDC path. An intermediate table <table>_dq appears in the same schema; the final table is the CDC target.
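A sketch of what such an entry might look like, written as a Python dict so it can be checked. Only the two key pairs named above come from the issue; the fields inside the CDC block, the expect_or_drop file shape, the column names, and the <path> placeholder are assumptions to verify against the project's onboarding docs, and other required onboarding fields are omitted. uses_dqe_then_cdc() is a hypothetical pre-flight helper, not part of the pipeline.

```python
import json
from typing import Any, Dict

# Assumed contents of the file referenced by *_data_quality_expectations_json_prod
dqe_file = {"expect_or_drop": {"valid_pk": "pk IS NOT NULL"}}

onboarding_entry: Dict[str, Any] = {
    "bronze_cdc_apply_changes": {
        "keys": ["pk"],                              # placeholder key column
        "sequence_by": "_commit_timestamp",          # placeholder ordering column
        "scd_type": "1",                             # assumed field name
        "apply_as_deletes": "operation = 'DELETE'",  # placeholder delete condition
    },
    "bronze_data_quality_expectations_json_prod": "<path>/intpk/bronze_dqe.json",
}


def uses_dqe_then_cdc(entry: Dict[str, Any], layer: str) -> bool:
    """Hypothetical check: both settings present for the same layer."""
    return (f"{layer}_cdc_apply_changes" in entry
            and f"{layer}_data_quality_expectations_json_prod" in entry)


print(uses_dqe_then_cdc(onboarding_entry, "bronze"))  # True
print(uses_dqe_then_cdc(onboarding_entry, "silver"))  # False
print(json.dumps(onboarding_entry, indent=2))
```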

References

  • Demo: demo/launch_lfc_demo.py, demo/lfcdemo-database.ipynb (intpk now has both DQE and CDC).
  • Docs: docs/content/demo/LakeflowConnectDemo.md — section CDC and DQE together.

Metadata

Labels: enhancement (New feature or request)