diff --git a/docs/DESIGN_PHILOSOPHY.md b/docs/DESIGN_PHILOSOPHY.md index 8f2c1d5..e492121 100644 --- a/docs/DESIGN_PHILOSOPHY.md +++ b/docs/DESIGN_PHILOSOPHY.md @@ -237,6 +237,7 @@ For uneven multi-stream each-mode, exhaustion is modeled with `None` padding rat - in mixed lazy/eager fan-out, observing the producer must not force all consumers eager - when a stream fails under `OnError.CONTINUE`, observers see the valid prefix that was already produced - observer behavior is a public contract and is covered by corpus/spec tests, not only unit tests +- when the output is an `Iterator`/`AsyncIterator`, the observer receives the iterator directly (via `tee`) and **must consume it fully**; an unconsumed iterator causes memory growth (the `tee` buffer retains all items) and the observed data is silently lost. This is application responsibility, not framework responsibility. ### 3.15. PipelineStopException with Context **Decision:** `PipelineStopException` carries `step_name` and `cause` (the original exception). It uses `raise ... from` to preserve the full stack trace. diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index 456b67e..4ed689b 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -29,6 +29,7 @@ from synaflow.core.dag_steps import ( validate_and_compile_step, validate_no_duplicate_base_datasets, + validate_no_unmaterialized_terminal_streams, validate_step_is_callable, validate_sync_async_consistency, validate_unique_step_name, @@ -302,6 +303,7 @@ def build_dag( is_default_factory: bool = False, error_materializer_factory: Any = None, pipeline_observers: list[Observer] | None = None, + exports: str | None = None, ) -> Dag: if error_materializer_factory is None: error_materializer_factory = log_error_materializer_factory @@ -331,6 +333,8 @@ def build_dag( check_circular_dependencies(dag_obj, pipeline_name) + validate_no_unmaterialized_terminal_streams(dag_obj, pipeline_name, exports) + validate_sync_async_consistency( dag_obj, pipeline_name, diff --git a/synaflow/core/dag_expansion.py b/synaflow/core/dag_expansion.py index c7d79cf..5ed63d8 100644 --- a/synaflow/core/dag_expansion.py +++ b/synaflow/core/dag_expansion.py @@ -146,6 +146,7 @@ def _expand_sub_pipeline_steps( params=sub_step.params, materializer=materializer, error_materializer=error_materializer, + force_materialize=sub_step.force_materialize, description=sub_step.description, pipeline=sub_pipeline.name, parent_pipeline=new_parent_chain, diff --git a/synaflow/core/dag_steps.py b/synaflow/core/dag_steps.py index a79450c..c6ee508 100644 --- a/synaflow/core/dag_steps.py +++ b/synaflow/core/dag_steps.py @@ -227,3 +227,41 @@ def _validate_max_in_flight(step: Step, pipeline_name: str) -> None: f"Pipeline '{pipeline_name}': step '{step.name}' " f"max_in_flight must be >= 1, got {value}" ) + + +def validate_no_unmaterialized_terminal_streams( + dag: Dag, pipeline_name: str, exports: str | None = None +) -> None: + """Reject terminal steps whose output is a stream that will never be consumed. + + A terminal step (no consumers, not hidden) whose output type is + ``Iterator``/``Generator`` (sync) or ``AsyncIterator``/``AsyncGenerator`` + (async) and whose output is not materialized (``needs_materialize`` is + False) produces a stream that nobody drains. At runtime this causes + either a deadlock (bounded handoff pump blocks forever) or silent data + loss (the pump discards items to deliver the EOF marker). + + The fix is to set ``force_materialize=True`` on the step, return a + materialized type (``list``, ``None``, etc.), or add a downstream + consumer. + + The ``exports`` step is skipped because it will have a consumer when the + pipeline is included in a parent. + """ + for step_name, node in dag.steps.items(): + if not dag.is_terminal_step(step_name): + continue + if step_name == exports: + continue + if node.output is None: + continue + if not (is_sync_stream_type(node.output) or is_async_stream_type(node.output)): + continue + if dag.needs_materialize(step_name): + continue + raise ValueError( + f"Pipeline '{pipeline_name}': terminal step '{step_name}' " + f"returns a stream type ({node.output}) but its output is never " + f"materialized. Use force_materialize=True, return a materialized " + f"type (e.g. list), or add a downstream consumer." + ) diff --git a/synaflow/core/definition.py b/synaflow/core/definition.py index c9bb0a4..63606a7 100644 --- a/synaflow/core/definition.py +++ b/synaflow/core/definition.py @@ -62,6 +62,7 @@ def __post_init__(self) -> None: is_default_factory=(self.materializer is None), error_materializer_factory=self.error_materializer, pipeline_observers=self.observers, + exports=self.exports, ) self.requires_sync_runner = self.dag.requires_sync_runner self.requires_async_runner = self.dag.requires_async_runner diff --git a/tests/core/test_dag_builder_validation.py b/tests/core/test_dag_builder_validation.py index 3b0d0c7..a2748e4 100644 --- a/tests/core/test_dag_builder_validation.py +++ b/tests/core/test_dag_builder_validation.py @@ -188,3 +188,129 @@ def transform(item: int) -> int: assert "items" in p.dag.steps["transform"].deps assert p.dag.steps["transform"].dataset_param_names == {"items": "item"} assert p.dag.consumers_of("items") == ["transform"] + + +# --------------------------------------------------------------------------- +# Terminal stream validation +# --------------------------------------------------------------------------- + + +def test_given_terminal_step_returning_iterator_when_not_materialized_then_raises(): + from collections.abc import Iterator + + class P(NamedTuple): + items: list[int] + + def gen(items: list[int]) -> Iterator[int]: + yield from items + + with pytest.raises(ValueError, match="terminal step 'gen' returns a stream type"): + pipeline( + name="test", + params=P, + steps=[step("gen", fn=gen)], + ) + + +def test_given_terminal_step_returning_async_iterator_when_not_materialized_then_raises(): + from collections.abc import AsyncIterator + + class P(NamedTuple): + items: list[int] + + async def gen(items: list[int]) -> AsyncIterator[int]: + for item in items: + yield item + + with pytest.raises(ValueError, match="terminal step 'gen' returns a stream type"): + pipeline( + name="test", + params=P, + steps=[step("gen", fn=gen)], + ) + + +def test_given_terminal_step_returning_iterator_when_force_materialize_then_builds(): + from collections.abc import Iterator + + class P(NamedTuple): + items: list[int] + + def gen(items: list[int]) -> Iterator[int]: + yield from items + + p = pipeline( + name="test", + params=P, + steps=[step("gen", fn=gen, force_materialize=True)], + ) + + assert "gen" in p.dag.steps + + +def test_given_terminal_step_returning_none_when_not_materialized_then_builds(): + from collections.abc import Iterator + + class P(NamedTuple): + items: list[int] + + def gen(items: list[int]) -> Iterator[int]: + yield from items + + def consume(gen: Iterator[int]) -> None: + for _item in gen: + pass + + p = pipeline( + name="test", + params=P, + steps=[ + step("gen", fn=gen), + step("consume", fn=consume), + ], + ) + + assert "consume" in p.dag.steps + + +def test_given_non_terminal_step_returning_iterator_when_not_materialized_then_builds(): + from collections.abc import Iterator + + class P(NamedTuple): + items: list[int] + + def gen(items: list[int]) -> Iterator[int]: + yield from items + + def transform(gen: Iterator[int]) -> list[int]: + return list(gen) + + p = pipeline( + name="test", + params=P, + steps=[ + step("gen", fn=gen), + step("transform", fn=transform), + ], + ) + + assert "gen" in p.dag.steps + + +def test_given_exported_step_returning_iterator_when_in_child_pipeline_then_builds(): + from collections.abc import Iterator + + class ChildParams(NamedTuple): + items: list[int] + + def emit(items: list[int]) -> Iterator[int]: + yield from items + + child = pipeline( + name="Child", + params=ChildParams, + exports="emit", + steps=[step("emit", fn=emit)], + ) + + assert "emit" in child.dag.steps diff --git a/tests/core/test_dag_expansion.py b/tests/core/test_dag_expansion.py index cfbd37e..ef73c79 100644 --- a/tests/core/test_dag_expansion.py +++ b/tests/core/test_dag_expansion.py @@ -219,7 +219,13 @@ def emit(items: list[int]) -> Iterator[int]: exports="emit", materializer=pipeline_mat, steps=[ - step("emit", fn=emit, materializer=step_mat, error_materializer=step_err) + step( + "emit", + fn=emit, + materializer=step_mat, + error_materializer=step_err, + force_materialize=True, + ) ], ) @@ -296,7 +302,7 @@ def emit(items: list[int]) -> Iterator[int]: name="Child", params=ChildParams, exports="emit", - steps=[step("emit", fn=emit, max_in_flight=30)], + steps=[step("emit", fn=emit, max_in_flight=30, force_materialize=True)], ) class ParentParams(NamedTuple): @@ -325,7 +331,7 @@ def emit(items: list[int]) -> Iterator[int]: name="Child", params=ChildParams, exports="emit", - steps=[step("emit", fn=emit)], + steps=[step("emit", fn=emit, force_materialize=True)], ) class ParentParams(NamedTuple): diff --git a/tests/core/test_dag_materializer.py b/tests/core/test_dag_materializer.py index c627181..bc4eeac 100644 --- a/tests/core/test_dag_materializer.py +++ b/tests/core/test_dag_materializer.py @@ -24,7 +24,7 @@ def gen() -> Generator[int, None, None]: name="test", params=P, materializer=factory, - steps=[step("items", fn=gen)], + steps=[step("items", fn=gen, force_materialize=True)], ) @@ -43,7 +43,7 @@ async def async_gen() -> AsyncGenerator[int, None]: name="test", params=P, materializer=factory, - steps=[step("items", fn=async_gen)], + steps=[step("items", fn=async_gen, force_materialize=True)], ) @@ -58,5 +58,12 @@ async def async_gen() -> AsyncGenerator[int, None]: pipeline( name="test", params=P, - steps=[step("items", fn=async_gen, materializer=to_materializer(sync_mat))], + steps=[ + step( + "items", + fn=async_gen, + materializer=to_materializer(sync_mat), + force_materialize=True, + ) + ], ) diff --git a/tests/execution/async_engine/corpus/complex_parallel_mixed.py b/tests/execution/async_engine/corpus/complex_parallel_mixed.py index 54e6c97..84753b7 100644 --- a/tests/execution/async_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/async_engine/corpus/complex_parallel_mixed.py @@ -18,9 +18,8 @@ async def step2(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]: yield x * 10 -async def step3(step2: AsyncIterator[int]) -> AsyncGenerator[int, None, None]: - async for x in step2: - yield x + 1 +async def step3(step2: AsyncIterator[int]) -> list[int]: + return [x + 1 async for x in step2] async def step4(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]: @@ -86,7 +85,7 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: }, "step3": { "deps": {"step2": "Stream[int]"}, - "output": "Stream[int, None, None]", + "output": "list[int]", "fn": "step3", "on_error": "continue", "mode": "all", diff --git a/tests/execution/async_engine/test_async_runner_basic.py b/tests/execution/async_engine/test_async_runner_basic.py index 3f03355..7c9540f 100644 --- a/tests/execution/async_engine/test_async_runner_basic.py +++ b/tests/execution/async_engine/test_async_runner_basic.py @@ -103,7 +103,9 @@ def s1(items: list[int]) -> Iterator[int]: for i in items: yield i - my_pipeline = pipeline(name="t", params=P, steps=[step("s1", fn=s1)]) + my_pipeline = pipeline( + name="t", params=P, steps=[step("s1", fn=s1, force_materialize=True)] + ) with pytest.raises(RuntimeError, match="must be executed with run"): await async_run(my_pipeline, params=P()) diff --git a/tests/execution/async_engine/test_observer_runtime.py b/tests/execution/async_engine/test_observer_runtime.py index 979f904..4d5cfb7 100644 --- a/tests/execution/async_engine/test_observer_runtime.py +++ b/tests/execution/async_engine/test_observer_runtime.py @@ -359,7 +359,7 @@ def gen(values: list[int]) -> Iterator[int]: p = pipeline( name="p", params=Params, - steps=[step("gen", fn=gen)], + steps=[step("gen", fn=gen, force_materialize=True)], ) executor = AsyncPipelineExecutor( @@ -594,8 +594,9 @@ async def test_given_lazy_consumer_when_no_materialization_then_no_materializati def gen(values: list[int]) -> Iterator[int]: yield from values - def passthrough(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def passthrough(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -677,8 +678,9 @@ async def test_given_observers_when_lazy_step_then_output_remains_iterator(): def gen(values: list[int]) -> Iter[int]: yield from values - def lazy_consumer(gen: Iter[int]) -> Iter[int]: - yield from gen + def lazy_consumer(gen: Iter[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -698,8 +700,9 @@ async def test_given_materialization_observer_when_lazy_step_then_materializatio def gen(values: list[int]) -> Iterator[int]: yield from values - def lazy_consumer(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def lazy_consumer(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -727,8 +730,9 @@ async def test_given_step_output_observer_and_bounded_lazy_stream_then_observer_ def gen(values: list[int]) -> Iterator[int]: yield from values - def lazy_consumer(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def lazy_consumer(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", diff --git a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py index 8b041e8..733b0bd 100644 --- a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py @@ -18,9 +18,8 @@ def step2(step1: Iterator[int]) -> Generator[int, None, None]: yield x * 10 -def step3(step2: Iterator[int]) -> Generator[int, None, None]: - for x in step2: - yield x + 1 +def step3(step2: Iterator[int]) -> list[int]: + return [x + 1 for x in step2] def step4(step1: Iterator[int]) -> Generator[int, None, None]: @@ -86,7 +85,7 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: }, "step3": { "deps": {"step2": "Stream[int]"}, - "output": "Stream[int, None, None]", + "output": "list[int]", "fn": "step3", "on_error": "continue", "mode": "all", diff --git a/tests/execution/sync_engine/test_observer_runtime.py b/tests/execution/sync_engine/test_observer_runtime.py index f670aef..a1c6291 100644 --- a/tests/execution/sync_engine/test_observer_runtime.py +++ b/tests/execution/sync_engine/test_observer_runtime.py @@ -345,7 +345,7 @@ def gen(values: list[int]) -> Iterator[int]: p = pipeline( name="p", params=Params, - steps=[step("gen", fn=gen)], + steps=[step("gen", fn=gen, force_materialize=True)], ) executor = PipelineExecutor( @@ -405,8 +405,9 @@ def test_given_step_output_observer_and_bounded_lazy_stream_then_observer_does_n def gen(values: list[int]) -> Iterator[int]: yield from values - def lazy_consumer(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def lazy_consumer(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -619,8 +620,9 @@ def test_given_lazy_consumer_when_no_materialization_then_no_materialization_eve def gen(values: list[int]) -> Iterator[int]: yield from values - def passthrough(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def passthrough(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -699,8 +701,9 @@ def test_given_observers_when_lazy_step_then_output_remains_iterator(): def gen(values: list[int]) -> Iter[int]: yield from values - def lazy_consumer(gen: Iter[int]) -> Iter[int]: - yield from gen + def lazy_consumer(gen: Iter[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", @@ -719,8 +722,9 @@ def test_given_materialization_observer_when_lazy_step_then_materialization_not_ def gen(values: list[int]) -> Iterator[int]: yield from values - def lazy_consumer(gen: Iterator[int]) -> Iterator[int]: - yield from gen + def lazy_consumer(gen: Iterator[int]) -> None: + for _item in gen: + pass p = pipeline( name="p", diff --git a/tests/execution/test_materializers_ergonomics.py b/tests/execution/test_materializers_ergonomics.py index cfba5f2..d7f21bb 100644 --- a/tests/execution/test_materializers_ergonomics.py +++ b/tests/execution/test_materializers_ergonomics.py @@ -731,7 +731,7 @@ def my_pipeline_err(ctx): sub_pipe = pipeline( name="sub_pipe", params=P, - steps=[step("gen", fn=sub_gen)], + steps=[step("gen", fn=sub_gen, force_materialize=True)], exports="gen", error_materializer=my_pipeline_err, ) @@ -768,7 +768,14 @@ def my_step_err(ctx): sub_pipe = pipeline( name="sub_pipe", params=P, - steps=[step("gen", fn=sub_gen, error_materializer=my_step_err)], + steps=[ + step( + "gen", + fn=sub_gen, + error_materializer=my_step_err, + force_materialize=True, + ) + ], exports="gen", error_materializer=my_pipeline_err, )