
Add row_filter support to BronzeDataflowSpec and SilverDataflowSpec #303

@ravi-databricks


Summary

Add support for Unity Catalog row-level security (RLS) via the row_filter parameter
of dp.create_streaming_table() and dp.table(), configurable through the onboarding
YAML/JSON spec.

Motivation

The Spark Declarative Pipelines (SDP) Python API exposes a row_filter parameter on
create_streaming_table() and @dp.table() that attaches a UC row filter function to the
published table. There is currently no way to set this through the DataflowSpec metadata,
so users who need RLS on their bronze or silver tables must hand-code the pipeline, which
defeats the purpose of the metadata-driven framework.

Note: row_filter is different from silver_where_clause.
silver_where_clause filters rows at ingest time (ETL), so excluded rows are never written.
row_filter restricts row visibility at query time (governance); the rows stay stored.
Both can coexist.
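
The distinction can be illustrated with a toy model in plain Python. Nothing here touches Spark or Unity Catalog: the sample rows, the `viewer_region` argument, and both helper names are made up for illustration. The where clause decides which rows are ever written, while the row filter is evaluated per reader against rows that are already stored.

```python
# Toy model only: plain Python lists stand in for tables.
source_rows = [
    {"id": 1, "status": "ACTIVE", "region": "EU"},
    {"id": 2, "status": "INACTIVE", "region": "EU"},
    {"id": 3, "status": "ACTIVE", "region": "US"},
]


def ingest(rows):
    """Analogue of silver_where_clause: rows failing the predicate are never stored."""
    return [r for r in rows if r["status"] == "ACTIVE"]


def query(stored_rows, viewer_region):
    """Analogue of row_filter: rows stay stored; visibility depends on the reader."""
    return [r for r in stored_rows if r["region"] == viewer_region]


silver = ingest(source_rows)      # id 2 is dropped for every reader
eu_view = query(silver, "EU")     # what a reader mapped to EU would see
us_view = query(silver, "US")     # what a reader mapped to US would see
```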

Proposed Changes

1. Onboarding YAML/JSON — new optional fields

- data_flow_id: '100'
  ...
  bronze_table: customers
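  # NEW: a ROW FILTER clause naming a UC function (function names below are placeholders)
  bronze_row_filter: "ROW FILTER main.security.region_filter ON (region)"

  silver_table: customers
  silver_where_clause:
    - "status = 'ACTIVE'"
  silver_row_filter: "ROW FILTER main.security.dept_filter ON (department)"   # NEW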

2. dataflow_spec.py — add field to dataclasses

@dataclass
class BronzeDataflowSpec:
    ...
    rowFilter: Optional[str] = None  # NEW

@dataclass
class SilverDataflowSpec:
    ...
    rowFilter: Optional[str] = None  # NEW
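
A minimal sketch of why a defaulted field is backward compatible, assuming two stand-in fields (`dataFlowId`, `targetDetails`) in place of the elided `...`, and a placeholder filter string. Because `rowFilter` has a default, it has to come after any fields without defaults, and constructors that never mention it keep working.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BronzeDataflowSpec:
    # Stand-ins for the existing fields elided as "..." in the proposal.
    dataFlowId: str
    targetDetails: dict
    # NEW: defaulted, so it must follow the fields that have no default.
    rowFilter: Optional[str] = None


# Existing call sites that do not know about rowFilter are unaffected.
legacy = BronzeDataflowSpec(dataFlowId="100", targetDetails={"table": "customers"})

# New call sites opt in explicitly (the filter string is a placeholder).
secured = BronzeDataflowSpec(
    dataFlowId="100",
    targetDetails={"table": "customers"},
    rowFilter="ROW FILTER main.security.region_filter ON (region)",
)
```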

3. dataflow_spec.py — parse from onboarding row

bronze_spec = BronzeDataflowSpec(
    ...,
    rowFilter=onboarding_row.get("bronze_row_filter", None),  # NEW
)

silver_spec = SilverDataflowSpec(
    ...,
    rowFilter=onboarding_row.get("silver_row_filter", None),  # NEW
)
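
One possible way to read the new keys defensively is sketched below. The helper name `read_row_filter` is hypothetical, and the sketch assumes `onboarding_row` behaves like a mapping; a PySpark `Row` has no `.get()` method, so it would need `onboarding_row.asDict()` first. Blank strings are treated as "no filter" so that an empty YAML value does not reach the table definition.

```python
from typing import Any, Mapping, Optional


def read_row_filter(onboarding_row: Mapping[str, Any], layer: str) -> Optional[str]:
    """Return the '<layer>_row_filter' value, or None when it is missing or blank."""
    value = onboarding_row.get(f"{layer}_row_filter")
    if value is None:
        return None
    if not isinstance(value, str):
        raise TypeError(
            f"{layer}_row_filter must be a string, got {type(value).__name__}"
        )
    # Normalise whitespace-only values to None.
    return value.strip() or None


# Hypothetical onboarding entry, already converted to a dict.
row = {
    "data_flow_id": "100",
    "bronze_table": "customers",
    "bronze_row_filter": "  ROW FILTER main.security.region_filter ON (region)  ",
    "silver_row_filter": "",
}
bronze_filter = read_row_filter(row, "bronze")
silver_filter = read_row_filter(row, "silver")
```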

4. dataflow_spec.py — Delta table schema

# Add to StructType for bronze_dataflowspec and silver_dataflowspec tables
StructField("rowFilter", StringType(), True),  # NEW

5. dataflow_pipeline.py — pass through at all write call sites

def _get_row_filter(self):
    return getattr(self.dataflowSpec, 'rowFilter', None) if self.uc_enabled else None

Pass row_filter=self._get_row_filter() to:

  • dp.create_streaming_table() in cdc_apply_changes()
  • dp.create_streaming_table() in write_bronze_with_dqe()
  • dp.table() in write_bronze() / write_silver()
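
The call sites above could share one small helper. The sketch below is an alternative to passing `row_filter=self._get_row_filter()` unconditionally: it adds the keyword only when a filter is configured, so pipelines without RLS issue exactly the same call as today. `table_kwargs` and `fake_create_streaming_table` are hypothetical names; the latter is a stand-in so the example runs without a pipeline runtime.

```python
from typing import Any, Dict, Optional


def table_kwargs(name: str, row_filter: Optional[str]) -> Dict[str, Any]:
    """Build keyword arguments for a table-creating call, adding row_filter only when set."""
    kwargs: Dict[str, Any] = {"name": name}
    if row_filter is not None:
        kwargs["row_filter"] = row_filter
    return kwargs


def fake_create_streaming_table(**kwargs: Any) -> Dict[str, Any]:
    """Stand-in for dp.create_streaming_table(); it just echoes what it received."""
    return kwargs


without_rls = fake_create_streaming_table(**table_kwargs("customers", None))
with_rls = fake_create_streaming_table(
    **table_kwargs("customers", "ROW FILTER main.security.region_filter ON (region)")
)
```

Whether to omit the keyword or always pass `None` is a design choice; omitting it may be safer if some runtimes in use predate the parameter.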

Labels

enhancement: New feature or request