Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/DESIGN_PHILOSOPHY.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ For uneven multi-stream each-mode, exhaustion is modeled with `None` padding rat
- in mixed lazy/eager fan-out, observing the producer must not force all consumers eager
- when a stream fails under `OnError.CONTINUE`, observers see the valid prefix that was already produced
- observer behavior is a public contract and is covered by corpus/spec tests, not only unit tests
- when the output is an `Iterator`/`AsyncIterator`, the observer receives the iterator directly (via `tee`) and **must consume it fully**; an unconsumed iterator causes memory growth (the `tee` buffer retains all items) and the observed data is silently lost. This is application responsibility, not framework responsibility.

### 3.15. PipelineStopException with Context
**Decision:** `PipelineStopException` carries `step_name` and `cause` (the original exception). It uses `raise ... from` to preserve the full stack trace.
Expand Down
4 changes: 4 additions & 0 deletions synaflow/core/dag_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from synaflow.core.dag_steps import (
validate_and_compile_step,
validate_no_duplicate_base_datasets,
validate_no_unmaterialized_terminal_streams,
validate_step_is_callable,
validate_sync_async_consistency,
validate_unique_step_name,
Expand Down Expand Up @@ -302,6 +303,7 @@ def build_dag(
is_default_factory: bool = False,
error_materializer_factory: Any = None,
pipeline_observers: list[Observer] | None = None,
exports: str | None = None,
) -> Dag:
if error_materializer_factory is None:
error_materializer_factory = log_error_materializer_factory
Expand Down Expand Up @@ -331,6 +333,8 @@ def build_dag(

check_circular_dependencies(dag_obj, pipeline_name)

validate_no_unmaterialized_terminal_streams(dag_obj, pipeline_name, exports)

validate_sync_async_consistency(
dag_obj,
pipeline_name,
Expand Down
1 change: 1 addition & 0 deletions synaflow/core/dag_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def _expand_sub_pipeline_steps(
params=sub_step.params,
materializer=materializer,
error_materializer=error_materializer,
force_materialize=sub_step.force_materialize,
description=sub_step.description,
pipeline=sub_pipeline.name,
parent_pipeline=new_parent_chain,
Expand Down
38 changes: 38 additions & 0 deletions synaflow/core/dag_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,41 @@ def _validate_max_in_flight(step: Step, pipeline_name: str) -> None:
f"Pipeline '{pipeline_name}': step '{step.name}' "
f"max_in_flight must be >= 1, got {value}"
)


def validate_no_unmaterialized_terminal_streams(
dag: Dag, pipeline_name: str, exports: str | None = None
) -> None:
"""Reject terminal steps whose output is a stream that will never be consumed.

A terminal step (no consumers, not hidden) whose output type is
``Iterator``/``Generator`` (sync) or ``AsyncIterator``/``AsyncGenerator``
(async) and whose output is not materialized (``needs_materialize`` is
False) produces a stream that nobody drains. At runtime this causes
either a deadlock (bounded handoff pump blocks forever) or silent data
loss (the pump discards items to deliver the EOF marker).

The fix is to set ``force_materialize=True`` on the step, return a
materialized type (``list``, ``None``, etc.), or add a downstream
consumer.

The ``exports`` step is skipped because it will have a consumer when the
pipeline is included in a parent.
"""
for step_name, node in dag.steps.items():
if not dag.is_terminal_step(step_name):
continue
if step_name == exports:
continue
if node.output is None:
continue
if not (is_sync_stream_type(node.output) or is_async_stream_type(node.output)):
continue
if dag.needs_materialize(step_name):
continue
raise ValueError(
f"Pipeline '{pipeline_name}': terminal step '{step_name}' "
f"returns a stream type ({node.output}) but its output is never "
f"materialized. Use force_materialize=True, return a materialized "
f"type (e.g. list), or add a downstream consumer."
)
1 change: 1 addition & 0 deletions synaflow/core/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __post_init__(self) -> None:
is_default_factory=(self.materializer is None),
error_materializer_factory=self.error_materializer,
pipeline_observers=self.observers,
exports=self.exports,
)
self.requires_sync_runner = self.dag.requires_sync_runner
self.requires_async_runner = self.dag.requires_async_runner
Expand Down
126 changes: 126 additions & 0 deletions tests/core/test_dag_builder_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,129 @@ def transform(item: int) -> int:
assert "items" in p.dag.steps["transform"].deps
assert p.dag.steps["transform"].dataset_param_names == {"items": "item"}
assert p.dag.consumers_of("items") == ["transform"]


# ---------------------------------------------------------------------------
# Terminal stream validation
# ---------------------------------------------------------------------------


def test_given_terminal_step_returning_iterator_when_not_materialized_then_raises():
from collections.abc import Iterator

class P(NamedTuple):
items: list[int]

def gen(items: list[int]) -> Iterator[int]:
yield from items

with pytest.raises(ValueError, match="terminal step 'gen' returns a stream type"):
pipeline(
name="test",
params=P,
steps=[step("gen", fn=gen)],
)


def test_given_terminal_step_returning_async_iterator_when_not_materialized_then_raises():
from collections.abc import AsyncIterator

class P(NamedTuple):
items: list[int]

async def gen(items: list[int]) -> AsyncIterator[int]:
for item in items:
yield item

with pytest.raises(ValueError, match="terminal step 'gen' returns a stream type"):
pipeline(
name="test",
params=P,
steps=[step("gen", fn=gen)],
)


def test_given_terminal_step_returning_iterator_when_force_materialize_then_builds():
from collections.abc import Iterator

class P(NamedTuple):
items: list[int]

def gen(items: list[int]) -> Iterator[int]:
yield from items

p = pipeline(
name="test",
params=P,
steps=[step("gen", fn=gen, force_materialize=True)],
)

assert "gen" in p.dag.steps


def test_given_terminal_step_returning_none_when_not_materialized_then_builds():
from collections.abc import Iterator

class P(NamedTuple):
items: list[int]

def gen(items: list[int]) -> Iterator[int]:
yield from items

def consume(gen: Iterator[int]) -> None:
for _item in gen:
pass

p = pipeline(
name="test",
params=P,
steps=[
step("gen", fn=gen),
step("consume", fn=consume),
],
)

assert "consume" in p.dag.steps


def test_given_non_terminal_step_returning_iterator_when_not_materialized_then_builds():
from collections.abc import Iterator

class P(NamedTuple):
items: list[int]

def gen(items: list[int]) -> Iterator[int]:
yield from items

def transform(gen: Iterator[int]) -> list[int]:
return list(gen)

p = pipeline(
name="test",
params=P,
steps=[
step("gen", fn=gen),
step("transform", fn=transform),
],
)

assert "gen" in p.dag.steps


def test_given_exported_step_returning_iterator_when_in_child_pipeline_then_builds():
from collections.abc import Iterator

class ChildParams(NamedTuple):
items: list[int]

def emit(items: list[int]) -> Iterator[int]:
yield from items

child = pipeline(
name="Child",
params=ChildParams,
exports="emit",
steps=[step("emit", fn=emit)],
)

assert "emit" in child.dag.steps
12 changes: 9 additions & 3 deletions tests/core/test_dag_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ def emit(items: list[int]) -> Iterator[int]:
exports="emit",
materializer=pipeline_mat,
steps=[
step("emit", fn=emit, materializer=step_mat, error_materializer=step_err)
step(
"emit",
fn=emit,
materializer=step_mat,
error_materializer=step_err,
force_materialize=True,
)
],
)

Expand Down Expand Up @@ -296,7 +302,7 @@ def emit(items: list[int]) -> Iterator[int]:
name="Child",
params=ChildParams,
exports="emit",
steps=[step("emit", fn=emit, max_in_flight=30)],
steps=[step("emit", fn=emit, max_in_flight=30, force_materialize=True)],
)

class ParentParams(NamedTuple):
Expand Down Expand Up @@ -325,7 +331,7 @@ def emit(items: list[int]) -> Iterator[int]:
name="Child",
params=ChildParams,
exports="emit",
steps=[step("emit", fn=emit)],
steps=[step("emit", fn=emit, force_materialize=True)],
)

class ParentParams(NamedTuple):
Expand Down
13 changes: 10 additions & 3 deletions tests/core/test_dag_materializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def gen() -> Generator[int, None, None]:
name="test",
params=P,
materializer=factory,
steps=[step("items", fn=gen)],
steps=[step("items", fn=gen, force_materialize=True)],
)


Expand All @@ -43,7 +43,7 @@ async def async_gen() -> AsyncGenerator[int, None]:
name="test",
params=P,
materializer=factory,
steps=[step("items", fn=async_gen)],
steps=[step("items", fn=async_gen, force_materialize=True)],
)


Expand All @@ -58,5 +58,12 @@ async def async_gen() -> AsyncGenerator[int, None]:
pipeline(
name="test",
params=P,
steps=[step("items", fn=async_gen, materializer=to_materializer(sync_mat))],
steps=[
step(
"items",
fn=async_gen,
materializer=to_materializer(sync_mat),
force_materialize=True,
)
],
)
7 changes: 3 additions & 4 deletions tests/execution/async_engine/corpus/complex_parallel_mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ async def step2(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
yield x * 10


async def step3(step2: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
async for x in step2:
yield x + 1
async def step3(step2: AsyncIterator[int]) -> list[int]:
return [x + 1 async for x in step2]


async def step4(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
Expand Down Expand Up @@ -86,7 +85,7 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None:
},
"step3": {
"deps": {"step2": "Stream[int]"},
"output": "Stream[int, None, None]",
"output": "list[int]",
"fn": "step3",
"on_error": "continue",
"mode": "all",
Expand Down
4 changes: 3 additions & 1 deletion tests/execution/async_engine/test_async_runner_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ def s1(items: list[int]) -> Iterator[int]:
for i in items:
yield i

my_pipeline = pipeline(name="t", params=P, steps=[step("s1", fn=s1)])
my_pipeline = pipeline(
name="t", params=P, steps=[step("s1", fn=s1, force_materialize=True)]
)

with pytest.raises(RuntimeError, match="must be executed with run"):
await async_run(my_pipeline, params=P())
Expand Down
22 changes: 13 additions & 9 deletions tests/execution/async_engine/test_observer_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def gen(values: list[int]) -> Iterator[int]:
p = pipeline(
name="p",
params=Params,
steps=[step("gen", fn=gen)],
steps=[step("gen", fn=gen, force_materialize=True)],
)

executor = AsyncPipelineExecutor(
Expand Down Expand Up @@ -594,8 +594,9 @@ async def test_given_lazy_consumer_when_no_materialization_then_no_materializati
def gen(values: list[int]) -> Iterator[int]:
yield from values

def passthrough(gen: Iterator[int]) -> Iterator[int]:
yield from gen
def passthrough(gen: Iterator[int]) -> None:
for _item in gen:
pass

p = pipeline(
name="p",
Expand Down Expand Up @@ -677,8 +678,9 @@ async def test_given_observers_when_lazy_step_then_output_remains_iterator():
def gen(values: list[int]) -> Iter[int]:
yield from values

def lazy_consumer(gen: Iter[int]) -> Iter[int]:
yield from gen
def lazy_consumer(gen: Iter[int]) -> None:
for _item in gen:
pass

p = pipeline(
name="p",
Expand All @@ -698,8 +700,9 @@ async def test_given_materialization_observer_when_lazy_step_then_materializatio
def gen(values: list[int]) -> Iterator[int]:
yield from values

def lazy_consumer(gen: Iterator[int]) -> Iterator[int]:
yield from gen
def lazy_consumer(gen: Iterator[int]) -> None:
for _item in gen:
pass

p = pipeline(
name="p",
Expand Down Expand Up @@ -727,8 +730,9 @@ async def test_given_step_output_observer_and_bounded_lazy_stream_then_observer_
def gen(values: list[int]) -> Iterator[int]:
yield from values

def lazy_consumer(gen: Iterator[int]) -> Iterator[int]:
yield from gen
def lazy_consumer(gen: Iterator[int]) -> None:
for _item in gen:
pass

p = pipeline(
name="p",
Expand Down
7 changes: 3 additions & 4 deletions tests/execution/sync_engine/corpus/complex_parallel_mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ def step2(step1: Iterator[int]) -> Generator[int, None, None]:
yield x * 10


def step3(step2: Iterator[int]) -> Generator[int, None, None]:
for x in step2:
yield x + 1
def step3(step2: Iterator[int]) -> list[int]:
return [x + 1 for x in step2]


def step4(step1: Iterator[int]) -> Generator[int, None, None]:
Expand Down Expand Up @@ -86,7 +85,7 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None:
},
"step3": {
"deps": {"step2": "Stream[int]"},
"output": "Stream[int, None, None]",
"output": "list[int]",
"fn": "step3",
"on_error": "continue",
"mode": "all",
Expand Down
Loading
Loading