Skip to content

feat: validate terminal steps with unmaterialized stream output at build time#37

Merged
mvallebr merged 1 commit into
mainfrom
feat/validate-terminal-stream-output
Jun 18, 2026
Merged

feat: validate terminal steps with unmaterialized stream output at build time#37
mvallebr merged 1 commit into
mainfrom
feat/validate-terminal-stream-output

Conversation

@mvallebr

Copy link
Copy Markdown
Contributor

Summary

Reject at build time terminal steps whose output is a stream that will never be consumed. This is the root-cause fix for the SyncFanout._put_terminal data-loss/deadlock issue identified in #34.

The problem

A terminal step (no consumers, not hidden) whose output type is Iterator/Generator (sync) or AsyncIterator/AsyncGenerator (async) and whose output is not materialized (needs_materialize is False) produces a stream that nobody drains. At runtime this causes either:

  • deadlock — the bounded handoff pump blocks forever trying to deliver the EOF marker, or
  • silent data loss — the pump discards items via get_nowait() to force the EOF through

The validation

validate_no_unmaterialized_terminal_streams in dag_steps.py, called from build_dag after materialized_deps are computed. Uses the existing dag.needs_materialize() flag — no new materialization logic.

The exports step of a sub-pipeline is skipped because it will have consumers in the parent pipeline.

Bug fix: force_materialize propagation

force_materialize was not propagated through sub-pipeline expansion (dag_expansion.py). Fixed by adding force_materialize=sub_step.force_materialize to the expanded Step.

Test fixes

Existing tests with latent terminal Iterator-returning steps (the output was silently lost):

  • corpus complex_parallel_mixed: step3 changed from -> Generator to -> list[int]
  • observer runtime: lazy_consumer/passthrough changed from -> Iterator to -> None (drain input in body)
  • dag_materializer: added force_materialize=True (tests target UNRUNNABLE error, not terminal stream)
  • dag_expansion: added force_materialize=True (tests target DAG structure, not execution)
  • materializers_ergonomics: added force_materialize=True on sub-pipeline gen step
  • async_runner_basic: added force_materialize=True (test targets RuntimeError, not terminal stream)

New tests

6 new tests in test_dag_builder_validation.py covering:

  • terminal Iterator output → raises
  • terminal AsyncIterator output → raises
  • terminal Iterator + force_materialize=True → builds
  • terminal None output (consumer drains) → builds
  • non-terminal Iterator output → builds
  • exported Iterator in child pipeline → builds

Documentation

Updated DESIGN_PHILOSOPHY.md observer contract: when output is an Iterator/AsyncIterator, the observer receives it directly (via tee) and must consume it fully. An unconsumed iterator causes tee buffer growth and silently lost data. This is application responsibility.

Verification

  • ruff format / ruff check --select F401: clean
  • pytest tests/: 451 passed in 2.23s
  • Patch coverage: 100% (14/14 trackable lines)

…ild time

A terminal step (no consumers) whose output is Iterator/Generator or
AsyncIterator/AsyncGenerator and whose output is not materialized
(needs_materialize is False) produces a stream that nobody drains. At
runtime this causes either a deadlock (bounded handoff pump blocks
forever) or silent data loss (the pump discards items to deliver the
EOF marker).

Add validate_no_unmaterialized_terminal_streams in dag_steps.py, called
from build_dag after materialized_deps are computed. Uses the existing
needs_materialize flag -- no new materialization logic. Steps exported
via sub-pipeline exports are skipped because they will have consumers
in the parent. Also propagate force_materialize through sub-pipeline
expansion (was missing).

Fix existing tests with latent terminal Iterator-returning steps:
- corpus complex_parallel_mixed: step3 -> list[int]
- observer runtime: lazy_consumer/passthrough -> None (drain input)
- dag_materializer: add force_materialize=True (test targets UNRUNNABLE)
- dag_expansion: add force_materialize=True (test targets DAG structure)
- materializers_ergonomics: add force_materialize=True on sub-pipeline gen
- async_runner_basic: add force_materialize=True (test targets RuntimeError)

Document in DESIGN_PHILOSOPHY that observers receiving an Iterator must
consume it fully (application responsibility, causes tee buffer growth
otherwise).
@mvallebr mvallebr merged commit 43e5787 into main Jun 18, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant