From 646486a4e03b24d482da407ffd0c146f965927ea Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:38:23 +0100 Subject: [PATCH 01/17] refactor(core): catch specific exceptions (NameError, TypeError) in type hint evaluation --- synaflow/core/dag_dependencies.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synaflow/core/dag_dependencies.py b/synaflow/core/dag_dependencies.py index 60ca2bc..406971e 100644 --- a/synaflow/core/dag_dependencies.py +++ b/synaflow/core/dag_dependencies.py @@ -23,7 +23,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: hints = {} try: hints = typing.get_type_hints(params) - except Exception: + except (NameError, TypeError): hints = getattr(params, "__annotations__", {}) for field in getattr(params, "_fields", []): tp = hints.get(field) @@ -34,7 +34,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: def get_safe_type_hints(fn: Any) -> dict[str, Any]: try: return typing.get_type_hints(fn, include_extras=True) - except Exception: + except (NameError, TypeError): return {} From b5f5d985e699aa3f2bb16301be10ffa826513fc1 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:42:58 +0100 Subject: [PATCH 02/17] test(core): add PEP 563 runtime execution and materializer tests --- tests/core/test_pep563_annotations.py | 60 +++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 8579633..71cba02 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step @@ -52,3 +53,62 @@ class ParamsWithUndefined(NamedTuple): isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType" ) + + +def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + run_pipeline(p, Params(name="hello")) + assert captured == "hello" + + +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( + run_pipeline, +): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + return lambda it: list(it) + + class Params(NamedTuple): + pass + + def my_step() -> Iterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + run_pipeline(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == Iterator[str] From d471e363b83140c3680a9ab168acdc7e4c6d51a5 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:48:19 +0100 Subject: [PATCH 03/17] test(core): add custom types and Future compatibility checks to type_compatibility tests --- tests/core/test_is_type_compatible.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index 6cc3958..e0ff300 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -1,9 +1,21 @@ from collections.abc import Generator, Iterator +from dataclasses import dataclass +from concurrent.futures import Future from typing import Any import pytest from synaflow.core.type_compatibility import is_type_compatible +@dataclass +class MyDataclass: + x: int + + +@dataclass +class OtherDataclass: + x: int + + @pytest.mark.parametrize( "producer, consumer, expected", [ @@ -23,6 +35,15 @@ # Mismatched types should still fail (list[int], list[str], False), (list[dict], list[int], True), + # Custom/specific type compatibility tests + (MyDataclass, MyDataclass, True), + (MyDataclass, OtherDataclass, False), + (list[MyDataclass], list[MyDataclass], True), + (list[MyDataclass], list[OtherDataclass], False), + (tuple[int, str], tuple[int, str], True), + (tuple[int, str], tuple[str, int], False), + (Future, Future, True), + (list[Future], list[Future], True), ], ) def test_given_bare_containers_when_checking_compatibility_then_returns_expected( From 5d35fa72cbb6f6c919996dd8a0d05895855e93ae Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:49:44 +0100 Subject: [PATCH 04/17] refactor(tests): move pep 563 runtime tests from core to execution module --- tests/core/test_pep563_annotations.py | 59 ------------------------ tests/execution/test_pep563_runtime.py | 64 ++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 59 deletions(-) create mode 100644 tests/execution/test_pep563_runtime.py diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 71cba02..847cafa 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -53,62 +53,3 @@ class ParamsWithUndefined(NamedTuple): isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType" ) - - -def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): - class Params(NamedTuple): - name: str = "" - - captured = None - - def my_step(name: str) -> str: - nonlocal captured - captured = name - return name - - p = pipeline( - name="test_future_annotations_run", - params=Params, - steps=[ - step("my_step", fn=my_step), - ], - ) - run_pipeline(p, Params(name="hello")) - assert captured == "hello" - - -def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( - run_pipeline, -): - resolved_item_type = None - - def my_factory(ctx): - nonlocal resolved_item_type - resolved_item_type = ctx.item_type - return lambda it: list(it) - - class Params(NamedTuple): - pass - - def my_step() -> Iterator[str]: - yield "a" - yield "b" - - captured = None - - def sink(my_step: list[str]) -> list[str]: - nonlocal captured - captured = my_step - return my_step - - p = pipeline( - name="test_future_annotations_mat", - params=Params, - steps=[ - step("my_step", fn=my_step, materializer=my_factory), - step("sink", fn=sink), - ], - ) - run_pipeline(p, Params()) - assert captured == ["a", "b"] - assert resolved_item_type == Iterator[str] diff --git a/tests/execution/test_pep563_runtime.py b/tests/execution/test_pep563_runtime.py new file mode 100644 index 0000000..5b15e55 --- /dev/null +++ b/tests/execution/test_pep563_runtime.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import NamedTuple +from synaflow import pipeline, step + + +def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + run_pipeline(p, Params(name="hello")) + assert captured == "hello" + + +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( + run_pipeline, +): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + return lambda it: list(it) + + class Params(NamedTuple): + pass + + def my_step() -> Iterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + run_pipeline(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == Iterator[str] From af1f7d1d2f99beaa9e937ab1fb753529b4a45201 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:50:29 +0100 Subject: [PATCH 05/17] chore(tests): remove unused Iterator import from core test file --- tests/core/test_pep563_annotations.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 847cafa..8579633 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -1,6 +1,5 @@ from __future__ import annotations -from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step From 77b60708af953dcee9e9cb96c89bbbc0a9dffbe7 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:53:46 +0100 Subject: [PATCH 06/17] test: maintain parity by moving PEP 563 runtime tests to sync and async engines --- .../async_engine/test_async_runner_pep563.py | 66 +++++++++++++++++++ .../test_runner_pep563.py} | 11 ++-- 2 files changed, 71 insertions(+), 6 deletions(-) create mode 100644 tests/execution/async_engine/test_async_runner_pep563.py rename tests/execution/{test_pep563_runtime.py => sync_engine/test_runner_pep563.py} (88%) diff --git a/tests/execution/async_engine/test_async_runner_pep563.py b/tests/execution/async_engine/test_async_runner_pep563.py new file mode 100644 index 0000000..eeb0c97 --- /dev/null +++ b/tests/execution/async_engine/test_async_runner_pep563.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import NamedTuple +from synaflow import async_run, pipeline, step + + +async def test_given_future_annotations_when_run_then_executes_successfully(): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + await async_run(p, Params(name="hello")) + assert captured == "hello" + + +async def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + + async def mat(it): + return [x async for x in it] + + return mat + + class Params(NamedTuple): + pass + + async def my_step() -> AsyncIterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + await async_run(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == AsyncIterator[str] diff --git a/tests/execution/test_pep563_runtime.py b/tests/execution/sync_engine/test_runner_pep563.py similarity index 88% rename from tests/execution/test_pep563_runtime.py rename to tests/execution/sync_engine/test_runner_pep563.py index 5b15e55..23bff11 100644 --- a/tests/execution/test_pep563_runtime.py +++ b/tests/execution/sync_engine/test_runner_pep563.py @@ -3,9 +3,10 @@ from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step +from synaflow.execution.sync_engine.executor import run as sync_run -def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): +def test_given_future_annotations_when_run_then_executes_successfully(): class Params(NamedTuple): name: str = "" @@ -23,13 +24,11 @@ def my_step(name: str) -> str: step("my_step", fn=my_step), ], ) - run_pipeline(p, Params(name="hello")) + sync_run(p, Params(name="hello")) assert captured == "hello" -def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( - run_pipeline, -): +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): resolved_item_type = None def my_factory(ctx): @@ -59,6 +58,6 @@ def sink(my_step: list[str]) -> list[str]: step("sink", fn=sink), ], ) - run_pipeline(p, Params()) + sync_run(p, Params()) assert captured == ["a", "b"] assert resolved_item_type == Iterator[str] From fd84941bc15e808d1fdad65254b6b158ec9927a0 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:00:19 +0100 Subject: [PATCH 07/17] feat: only require custom materializer for custom types when needs_materialize is True --- synaflow/core/dag_builder.py | 22 +++++++----- tests/core/test_dag_builder_materializer.py | 32 +++++++++++++++++ .../test_async_runner_materialization.py | 36 +++++++++++++++++++ .../test_runner_materialization.py | 36 +++++++++++++++++++ 4 files changed, 118 insertions(+), 8 deletions(-) diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index a0fe8e9..849b8d6 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -219,13 +219,19 @@ def _resolve_materializers( and is_iterable_type(node.output) and node.materializer is memory_materializer_factory ): - inner = get_inner_type(node.output) - if inner is not None and not _is_builtin_type(inner): - raise ValueError( - f"Node '{name}': output item type '{inner}' requires a custom" - " materializer. Provide a step-level materializer or a" - " pipeline-level materializer." - ) + needs_materialize = ( + node.on_error == OnError.STOP + or node.force_materialize + or any(name in consumer.materialized_deps for consumer in dag.values()) + ) + if needs_materialize: + inner = get_inner_type(node.output) + if inner is not None and not _is_builtin_type(inner): + raise ValueError( + f"Node '{name}': output item type '{inner}' requires a custom" + " materializer. Provide a step-level materializer or a" + " pipeline-level materializer." + ) def _compute_materialized_deps(dag: dict[str, DagNode]) -> None: @@ -321,12 +327,12 @@ def build_dag( params, pipeline_obs_resolved, ) + _compute_materialized_deps(dag) _resolve_materializers( dag, memory_materializer_factory, error_materializer_factory, ) - _compute_materialized_deps(dag) dag_obj = _finalize_dag( pipeline_name, dag, diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index ea7f0f7..b6c6041 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -167,6 +167,38 @@ def consumer(producer: list[Row]) -> int: ) +def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): + from dataclasses import dataclass + from collections.abc import Iterator + from synaflow import pipeline, step + + @dataclass + class Row: + id: int + name: str + + class Params(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="a") + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row) -> None: + pass + + p = pipeline( + name="test_validation_no_mat", + params=Params, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + assert p.dag is not None + assert p.dag.needs_materialize("producer") is False + + def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index 956a69c..fccba18 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -858,3 +858,39 @@ async def async_list(async_iterator) -> list: await async_run(my_pipeline, params=P()) assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +async def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully(): + from dataclasses import dataclass + from collections.abc import AsyncGenerator + from synaflow import async_run + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + async def producer() -> AsyncGenerator[Row, None]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + async def consumer(producer: Row): + seen.append(producer) + + my_pipeline = pipeline( + name="test_no_materializer_custom_type_not_materialized", + params=P, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + await async_run(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index ece914a..88c8724 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -859,3 +859,39 @@ def consumer(producer: list[Row]): run_pipeline(my_pipeline, params=P()) assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully( + run_pipeline, +): + from dataclasses import dataclass + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row): + seen.append(producer) + + my_pipeline = pipeline( + name="test_no_materializer_custom_type_not_materialized", + params=P, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + run_pipeline(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] From c7463b7d3e2a9d7634d39d3d654e2505d21805c3 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:02:24 +0100 Subject: [PATCH 08/17] test: modify non-builtin type test to consume as Iterator and not use a custom materializer --- .../test_async_runner_materialization.py | 19 +++++++------------ .../test_runner_materialization.py | 10 +++++----- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index fccba18..b2c06a9 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -819,9 +819,9 @@ async def consume(produce: int): assert materialized == [6] -async def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully(): +async def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully(): from dataclasses import dataclass - from collections.abc import AsyncGenerator + from collections.abc import AsyncGenerator, AsyncIterator from synaflow import async_run @dataclass @@ -838,20 +838,15 @@ async def producer() -> AsyncGenerator[Row, None]: seen = [] - async def consumer(producer: list[Row]): - seen.extend(producer) - - async def async_list(async_iterator) -> list: - items = [] - async for item in async_iterator: - items.append(item) - return items + async def consumer(producer: AsyncIterator[Row]): + async for item in producer: + seen.append(item) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_custom_type_iterator_no_mat", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(async_list)), + step("producer", fn=producer), step("consumer", fn=consumer), ], ) diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index 88c8724..bd45d70 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -826,7 +826,7 @@ def sink(pair: list[tuple[int | None, int | None]]): assert seen == [(None, 10), (None, 20)] -def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully( +def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully( run_pipeline, ): from dataclasses import dataclass @@ -845,14 +845,14 @@ def producer() -> Iterator[Row]: seen = [] - def consumer(producer: list[Row]): - seen.extend(producer) + def consumer(producer: Iterator[Row]): + seen.extend(list(producer)) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_custom_type_iterator_no_mat", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(list)), + step("producer", fn=producer), step("consumer", fn=consumer), ], ) From e288e542f6e39dd37a1dcbac97b5777176ac9c18 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:05:15 +0100 Subject: [PATCH 09/17] test: add custom class type cases to is_type_compatible test --- tests/core/test_is_type_compatible.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index e0ff300..0d8c262 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -40,6 +40,9 @@ class OtherDataclass: (MyDataclass, OtherDataclass, False), (list[MyDataclass], list[MyDataclass], True), (list[MyDataclass], list[OtherDataclass], False), + (Iterator[MyDataclass], Iterator[MyDataclass], True), + (Iterator[MyDataclass], Iterator[OtherDataclass], False), + (Generator[MyDataclass, None, None], Iterator[MyDataclass], True), (tuple[int, str], tuple[int, str], True), (tuple[int, str], tuple[str, int], False), (Future, Future, True), From 7f8f35db14d87d87e14e0fb023b1cb53043ac731 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:08:07 +0100 Subject: [PATCH 10/17] refactor: use Dag.needs_materialize in _resolve_materializers instead of duplicating logic --- synaflow/core/dag_builder.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index 849b8d6..0f58cc8 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -192,11 +192,11 @@ def _resolve_step_observers( def _resolve_materializers( - dag: dict[str, DagNode], + dag: Dag, pipeline_materializer: Any, pipeline_error_materializer: Any, ) -> None: - for name, node in dag.items(): + for name, node in dag.steps.items(): if not node.fn: node.materializer = None node.error_materializer = None @@ -219,12 +219,7 @@ def _resolve_materializers( and is_iterable_type(node.output) and node.materializer is memory_materializer_factory ): - needs_materialize = ( - node.on_error == OnError.STOP - or node.force_materialize - or any(name in consumer.materialized_deps for consumer in dag.values()) - ) - if needs_materialize: + if dag.needs_materialize(name): inner = get_inner_type(node.output) if inner is not None and not _is_builtin_type(inner): raise ValueError( @@ -328,11 +323,6 @@ def build_dag( pipeline_obs_resolved, ) _compute_materialized_deps(dag) - _resolve_materializers( - dag, - memory_materializer_factory, - error_materializer_factory, - ) dag_obj = _finalize_dag( pipeline_name, dag, @@ -340,6 +330,11 @@ def build_dag( error_materializer_factory, pipeline_obs_resolved, ) + _resolve_materializers( + dag_obj, + memory_materializer_factory, + error_materializer_factory, + ) check_circular_dependencies(dag_obj, pipeline_name) From 1d577043f3d2a4c41dbd61e4b289a42efc683334 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:11:29 +0100 Subject: [PATCH 11/17] test: add custom NamedTuple and Iterator[Future] compatibility test cases --- tests/core/test_is_type_compatible.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index 0d8c262..ccd4b6c 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -16,6 +16,19 @@ class OtherDataclass: x: int +from typing import NamedTuple + + +class MyNamedTuple(NamedTuple): + id: int + name: str + + +class OtherNamedTuple(NamedTuple): + id: int + name: str + + @pytest.mark.parametrize( "producer, consumer, expected", [ @@ -43,6 +56,17 @@ class OtherDataclass: (Iterator[MyDataclass], Iterator[MyDataclass], True), (Iterator[MyDataclass], Iterator[OtherDataclass], False), (Generator[MyDataclass, None, None], Iterator[MyDataclass], True), + # NamedTuple cases + (MyNamedTuple, MyNamedTuple, True), + (MyNamedTuple, OtherNamedTuple, False), + (list[MyNamedTuple], list[MyNamedTuple], True), + (list[MyNamedTuple], list[OtherNamedTuple], False), + (Iterator[MyNamedTuple], Iterator[MyNamedTuple], True), + (Iterator[MyNamedTuple], Iterator[OtherNamedTuple], False), + (Generator[MyNamedTuple, None, None], Iterator[MyNamedTuple], True), + # Future cases + (Iterator[Future], Iterator[Future], True), + (Iterator[Future], Iterator[int], False), (tuple[int, str], tuple[int, str], True), (tuple[int, str], tuple[str, int], False), (Future, Future, True), From b0a1e0610cebe7369bfe0cdc78e0a9b2c6b2a454 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:13:17 +0100 Subject: [PATCH 12/17] ci: report Total Coverage as a GitHub Check Run in PRs --- .github/scripts/coverage_report.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/scripts/coverage_report.py b/.github/scripts/coverage_report.py index 6e78f5b..4c9cad6 100644 --- a/.github/scripts/coverage_report.py +++ b/.github/scripts/coverage_report.py @@ -215,6 +215,18 @@ def run_ci(): summary=patch_summary, ) + total_conclusion = "success" if total_pct >= TOTAL_THRESHOLD else "failure" + total_summary = ( + f"Overall package coverage: **{total_pct:.1f}%** " + f"(threshold: {TOTAL_THRESHOLD}%)." + ) + create_check_run( + name="Total Coverage", + conclusion=total_conclusion, + title=f"Total Coverage: {total_pct:.1f}%", + summary=total_summary, + ) + if total_pct < TOTAL_THRESHOLD: print( f"FAIL: total coverage {total_pct:.1f}% is below {TOTAL_THRESHOLD}% threshold" From ba6396a6353219124b0579ad0eb64d5c0d4335ce Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 00:54:35 +0100 Subject: [PATCH 13/17] refactor: simplify executor materialization and enforce callable error materializers --- synaflow/__init__.py | 18 -- synaflow/core/dag_builder.py | 167 +++++++++++++----- synaflow/core/dag_dependencies.py | 3 + synaflow/core/dag_steps.py | 8 +- synaflow/core/type_compatibility.py | 16 +- synaflow/core/types.py | 1 - synaflow/execution/async_engine/executor.py | 58 ++---- synaflow/execution/sync_engine/executor.py | 37 +--- synaflow/materializers/__init__.py | 16 -- synaflow/materializers/composite.py | 30 ++-- synaflow/materializers/disk.py | 4 +- synaflow/materializers/errors.py | 4 +- synaflow/materializers/helpers.py | 34 ---- tests/core/test_dag_builder_future.py | 16 -- tests/core/test_dag_builder_materializer.py | 55 +++--- tests/core/test_dag_expansion.py | 5 +- tests/core/test_dag_materializer.py | 4 +- tests/core/test_helpers.py | 47 ----- .../async_engine/corpus/complex_parallel.py | 20 +-- .../corpus/complex_parallel_mixed.py | 20 +-- .../async_engine/corpus/custom_types.py | 12 +- .../async_engine/corpus/deep_sub_pipelines.py | 39 ++-- .../execution/async_engine/corpus/diamond.py | 16 +- .../async_engine/corpus/error_handling.py | 8 +- .../async_engine/corpus/explicit_modes.py | 12 +- .../async_engine/corpus/fibonacci.py | 12 +- tests/execution/async_engine/corpus/linear.py | 14 +- .../async_engine/corpus/mixed_fanout.py | 12 +- .../async_engine/corpus/sub_pipelines.py | 16 +- .../test_async_runner_errorhandling.py | 26 +++ .../test_async_runner_materialization.py | 6 +- .../async_engine/test_async_runner_pep563.py | 4 +- .../sync_engine/corpus/complex_parallel.py | 20 +-- .../corpus/complex_parallel_mixed.py | 20 +-- .../sync_engine/corpus/custom_types.py | 12 +- .../sync_engine/corpus/deep_sub_pipelines.py | 39 ++-- tests/execution/sync_engine/corpus/diamond.py | 16 +- .../sync_engine/corpus/error_handling.py | 8 +- .../sync_engine/corpus/explicit_modes.py | 12 +- .../execution/sync_engine/corpus/fibonacci.py | 12 +- tests/execution/sync_engine/corpus/linear.py | 14 +- .../corpus/max_in_flight_threadpool.py | 12 +- .../sync_engine/corpus/mixed_fanout.py | 12 +- .../sync_engine/corpus/sub_pipelines.py | 16 +- .../sync_engine/test_runner_errorhandling.py | 28 +++ .../test_runner_materialization.py | 6 +- .../sync_engine/test_runner_pep563.py | 4 +- .../test_materializers_ergonomics.py | 84 +++------ uv.lock | 2 +- 49 files changed, 496 insertions(+), 561 deletions(-) delete mode 100644 synaflow/materializers/__init__.py delete mode 100644 synaflow/materializers/helpers.py diff --git a/synaflow/__init__.py b/synaflow/__init__.py index 0d4f92a..6bd5610 100644 --- a/synaflow/__init__.py +++ b/synaflow/__init__.py @@ -8,16 +8,6 @@ from .core.types import OnError, StepMode, StepParams, StepResult from .execution.async_engine.executor import async_run from .execution.sync_engine.executor import run -from .materializers import ( - memory_materializer, - disk_materializer, - log_error_materializer, - disk_error_materializer, - composite_materializer, - composite_error_materializer, - to_materializer, - to_error_materializer, -) from .serializers import ( json_serializer, jsonl_serializer, @@ -40,14 +30,6 @@ "PipelineEvent", "StepEvent", "MaterializationEvent", - "memory_materializer", - "disk_materializer", - "log_error_materializer", - "disk_error_materializer", - "composite_materializer", - "composite_error_materializer", - "to_materializer", - "to_error_materializer", "json_serializer", "jsonl_serializer", "csv_serializer", diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index 0f58cc8..d1b303e 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -16,12 +16,27 @@ All functions are stateless — no classes, no self. """ -import inspect import logging import traceback import types as _types -from collections.abc import MutableMapping, MutableSequence, MutableSet -from typing import Any, NamedTuple, get_args +from collections.abc import ( + AsyncIterable as AbcAsyncIterable, + AsyncIterator as AbcAsyncIterator, + Iterable as AbcIterable, + Iterator as AbcIterator, + MutableMapping, + MutableSequence, + MutableSet, +) +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Iterable, + Iterator, + NamedTuple, + get_args, +) from synaflow.core.dag import Dag, DagNode from synaflow.core.dag_dependencies import initialize_parameters @@ -41,7 +56,6 @@ ) from synaflow.core.type_compatibility import ( get_inner_type, - is_iterable_type, is_materialized_consumer, is_scalar, ) @@ -67,9 +81,24 @@ def memory_materializer_factory(ctx: MaterializeContext): continue if tp is tuple: return tuple - if tp is not None and is_scalar(tp): - return _identity - return list + if is_scalar(tp): + return _identity + if tp in ( + AsyncIterator, + Iterator, + Iterable, + AsyncIterable, + AbcAsyncIterator, + AbcIterator, + AbcIterable, + AbcAsyncIterable, + ): + return list + + raise ValueError( + f"Cannot infer memory materializer for consumer type: '{tp}'. " + "Please provide explicit type hints for your consumer parameters, or use a step-level materializer." + ) memory_materializer_factory.__name__ = "memory_materializer" @@ -78,7 +107,7 @@ def memory_materializer_factory(ctx: MaterializeContext): def log_error_materializer_factory(ctx: ErrorMaterializeContext): log = logging.getLogger("synaflow") - def handle_error(exc: BaseException) -> None: + def log_error(exc: BaseException) -> None: log.warning( "[%s] [%s] %s: %s", ctx.pipeline_name, @@ -88,7 +117,7 @@ def handle_error(exc: BaseException) -> None: ) log.debug(traceback.format_exc()) - return handle_error + return log_error log_error_materializer_factory.__name__ = "log_error_materializer" @@ -130,39 +159,6 @@ def _validate_params_is_namedtuple(params: Any, pipeline_name: str) -> None: ) -def _validate_materializer_factory(name: str, mat: Any, is_error: bool = False) -> None: - if mat is None: - return - if not callable(mat): - raise TypeError( - f"Node '{name}': {'error materializer' if is_error else 'materializer'} must be a callable factory, got {type(mat).__name__}" - ) - - # Built-in collection types are commonly passed by mistake instead of factory - if isinstance(mat, type) and mat in (list, set, dict, tuple): - label = "error materializer" if is_error else "materializer" - helper = "to_error_materializer" if is_error else "to_materializer" - raise ValueError( - f"Node '{name}': {label} cannot be a direct type/callable '{mat.__name__}'. " - f"Please wrap it using {helper}({mat.__name__})." - ) - - # Let's inspect the signature to verify it accepts a context argument - try: - sig = inspect.signature(mat) - has_params = len(sig.parameters) > 0 - except (ValueError, TypeError): - has_params = False - - if not has_params: - label = "error materializer" if is_error else "materializer" - helper = "to_error_materializer" if is_error else "to_materializer" - raise ValueError( - f"Node '{name}': {label} factory must accept at least one argument (context). " - f"If you want to use a direct callable, wrap it using {helper}(...)." - ) - - def _validate_declared_step_names(steps: list[Any], pipeline_name: str) -> None: for step in steps: if hasattr(step, "name"): @@ -197,22 +193,95 @@ def _resolve_materializers( pipeline_error_materializer: Any, ) -> None: for name, node in dag.steps.items(): + from synaflow.core.type_compatibility import ( + is_iterable_type, + is_factory, + is_sync_stream_type, + is_async_stream_type, + ) + from synaflow.core.types import MaterializeContext, ErrorMaterializeContext + if not node.fn: node.materializer = None - node.error_materializer = None continue - mat = node.materializer or pipeline_materializer or memory_materializer_factory - _validate_materializer_factory(name, mat, is_error=False) - node.materializer = mat - + has_explicit_mat = ( + node.materializer is not None or pipeline_materializer is not None + ) + is_stream = is_sync_stream_type(node.output) or is_async_stream_type( + node.output + ) + is_untyped = node.output is None + is_scalar = not is_untyped and not is_iterable_type(node.output) + has_consumers = bool(dag.consumers_of(name)) + + mat = None + if has_explicit_mat: + mat = node.materializer or pipeline_materializer + else: + if is_scalar: + mat = None + elif is_stream: + if has_consumers: + mat = memory_materializer_factory + else: + mat = None + elif is_untyped: + if has_consumers: + mat = memory_materializer_factory + else: + mat = None + + if mat and is_factory(mat): + consumers = [] + for consumer_node in dag.steps.values(): + if name in consumer_node.deps: + consumers.append(consumer_node) + + consumer_type = None + if consumers: + mat_consumers = [ + c for c in consumers if name in getattr(c, "materialized_deps", []) + ] + if mat_consumers: + consumer_type = mat_consumers[0].deps.get(name) + from synaflow.core.type_compatibility import is_type_compatible + + for other in mat_consumers[1:]: + other_tp = other.deps.get(name) + if ( + consumer_type != other_tp + and not is_type_compatible(consumer_type, other_tp) + and not is_type_compatible(other_tp, consumer_type) + ): + raise ValueError( + f"Pipeline '{dag.name}': step '{name}' has consumers with incompatible types: " + f"'{mat_consumers[0].name}' expects {consumer_type} but '{other.name}' expects {other_tp}." + ) + else: + consumer_type = consumers[0].deps.get(name) + ctx = MaterializeContext( + pipeline_name=dag.name, + dataset_name=name, + item_type=node.output, + consumer_type=consumer_type, + ) + node.materializer = mat(ctx) + else: + node.materializer = mat err_mat = ( node.error_materializer or pipeline_error_materializer or log_error_materializer_factory ) - _validate_materializer_factory(name, err_mat, is_error=True) - node.error_materializer = err_mat + if err_mat and is_factory(err_mat): + err_ctx = ErrorMaterializeContext( + pipeline_name=dag.name, + dataset_name=name, + ) + node.error_materializer = err_mat(err_ctx) + else: + node.error_materializer = err_mat if ( node.output diff --git a/synaflow/core/dag_dependencies.py b/synaflow/core/dag_dependencies.py index 406971e..9e0aa42 100644 --- a/synaflow/core/dag_dependencies.py +++ b/synaflow/core/dag_dependencies.py @@ -35,6 +35,9 @@ def get_safe_type_hints(fn: Any) -> dict[str, Any]: try: return typing.get_type_hints(fn, include_extras=True) except (NameError, TypeError): + import traceback + + traceback.print_exc() return {} diff --git a/synaflow/core/dag_steps.py b/synaflow/core/dag_steps.py index c6ee508..d91a1f3 100644 --- a/synaflow/core/dag_steps.py +++ b/synaflow/core/dag_steps.py @@ -11,6 +11,7 @@ from synaflow.core.naming import get_base_dataset_name from synaflow.core.type_compatibility import ( is_async_stream_type, + is_factory, is_iterable_type, is_scalar, is_sync_stream_type, @@ -150,12 +151,7 @@ def validate_sync_async_consistency( has_sync_materializer = False def _is_async_mat(m: Any) -> bool: - sig = inspect.signature(m) - if ( - len(sig.parameters) > 1 - or "ctx" in sig.parameters - or "context" in sig.parameters - ): + if is_factory(m): ctx = MaterializeContext( pipeline_name=pipeline_name, dataset_name="validator", item_type=Any ) diff --git a/synaflow/core/type_compatibility.py b/synaflow/core/type_compatibility.py index 12283da..e35de2c 100644 --- a/synaflow/core/type_compatibility.py +++ b/synaflow/core/type_compatibility.py @@ -1,6 +1,20 @@ import types +import inspect from collections.abc import AsyncGenerator, AsyncIterator, Generator, Iterable, Iterator -from typing import Any, Tuple, Union, get_args, get_origin +from typing import Any, Callable, Tuple, Union, get_args, get_origin + + +def is_factory(func: Callable) -> bool: + if not callable(func): + return False + sig = inspect.signature(func) + for param in sig.parameters.values(): + if param.name in ("ctx", "context") or "MaterializeContext" in str( + param.annotation + ): + return True + return False + SCALAR_TYPES = {int, float, str, bool, bytes, type(None)} COLLECTION_ORIGINS = { diff --git a/synaflow/core/types.py b/synaflow/core/types.py index fed8fa2..469d879 100644 --- a/synaflow/core/types.py +++ b/synaflow/core/types.py @@ -42,7 +42,6 @@ class MaterializeContext: class ErrorMaterializeContext: pipeline_name: str dataset_name: str - exception_type: type[BaseException] @dataclass diff --git a/synaflow/execution/async_engine/executor.py b/synaflow/execution/async_engine/executor.py index 8c8d67f..00446c8 100644 --- a/synaflow/execution/async_engine/executor.py +++ b/synaflow/execution/async_engine/executor.py @@ -22,8 +22,6 @@ dispatch_observers_async, ) from synaflow.core.types import ( - ErrorMaterializeContext, - MaterializeContext, OnError, StepMode, ) @@ -75,33 +73,19 @@ async def _apply_materializer( items, had_error, exc = await _collect_async_iterator(dag, step_name, value) return items, had_error, exc return value, False, None - concrete_mat = mat( - MaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - item_type=node.output, - consumer_type=consumer_type, - ) - ) - if inspect.iscoroutinefunction(concrete_mat): - result = await concrete_mat(value) + + if inspect.iscoroutinefunction(mat): + result = await mat(value) return result, False, None + if isinstance(value, (AsyncIterator, AsyncGenerator, Iterator, Generator)): - if ( - concrete_mat in (list, tuple, set, dict) - or getattr(concrete_mat, "__name__", "") == "_identity" - ): - items, had_error, exc = await _collect_async_iterator(dag, step_name, value) - res = items if concrete_mat is list else concrete_mat(items) - if inspect.iscoroutine(res): - return await res, had_error, exc - return res, had_error, exc items, had_error, exc = await _collect_async_iterator(dag, step_name, value) - res = concrete_mat(items) + res = mat(items) if inspect.iscoroutine(res): return await res, had_error, exc return res, had_error, exc - res = concrete_mat(value) + + res = mat(value) if inspect.iscoroutine(res): return await res, False, None return res, False, None @@ -117,29 +101,13 @@ async def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: return if inspect.iscoroutinefunction(err_mat): - handler = await err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) + await err_mat(exc) + elif callable(err_mat): + res = err_mat(exc) + if inspect.iscoroutine(res): + await res else: - handler = err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) - - if handler is not None: - if inspect.iscoroutinefunction(handler): - await handler(exc) - else: - res = handler(exc) - if inspect.iscoroutine(res): - await res + raise TypeError(f"Error materializer for step '{step_name}' is not callable.") async def _pump_iterator( diff --git a/synaflow/execution/sync_engine/executor.py b/synaflow/execution/sync_engine/executor.py index 22a0630..186e25c 100644 --- a/synaflow/execution/sync_engine/executor.py +++ b/synaflow/execution/sync_engine/executor.py @@ -24,8 +24,6 @@ dispatch_observers, ) from synaflow.core.types import ( - ErrorMaterializeContext, - MaterializeContext, OnError, StepMode, ) @@ -76,25 +74,12 @@ def _apply_materializer( items, had_error, exc = _collect_iterator(dag, step_name, value) return items, had_error, exc return value, False, None - concrete_mat = mat( - MaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - item_type=node.output, - consumer_type=consumer_type, - ) - ) - if isinstance(value, Iterator) and concrete_mat in (list, tuple, set, dict): - items, had_error, exc = _collect_iterator(dag, step_name, value) - result = items if concrete_mat is list else concrete_mat(items) - return result, had_error, exc - if ( - isinstance(value, Iterator) - and getattr(concrete_mat, "__name__", "") == "_identity" - ): + + if isinstance(value, Iterator): items, had_error, exc = _collect_iterator(dag, step_name, value) - return items, had_error, exc - return concrete_mat(value), False, None + return mat(items), had_error, exc + + return mat(value), False, None def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: @@ -106,16 +91,10 @@ def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: if err_mat is None: return - handler = err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) + if not callable(err_mat): + raise TypeError(f"Error materializer for step '{step_name}' is not callable.") - if callable(handler): - handler(exc) + err_mat(exc) # --------------------------------------------------------------------------- diff --git a/synaflow/materializers/__init__.py b/synaflow/materializers/__init__.py deleted file mode 100644 index b14afce..0000000 --- a/synaflow/materializers/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -from .memory import memory_materializer -from .disk import disk_materializer -from .errors import log_error_materializer, disk_error_materializer -from .composite import composite_materializer, composite_error_materializer -from .helpers import to_materializer, to_error_materializer - -__all__ = [ - "memory_materializer", - "disk_materializer", - "log_error_materializer", - "disk_error_materializer", - "composite_materializer", - "composite_error_materializer", - "to_materializer", - "to_error_materializer", -] diff --git a/synaflow/materializers/composite.py b/synaflow/materializers/composite.py index 9acb2c8..3367ace 100644 --- a/synaflow/materializers/composite.py +++ b/synaflow/materializers/composite.py @@ -2,6 +2,8 @@ from collections.abc import Iterator from typing import Any +from synaflow.core.type_compatibility import is_factory + def _is_async_callable(func: Any) -> bool: if func is None: @@ -16,12 +18,14 @@ def _is_async_callable(func: Any) -> bool: def composite_materializer(*materializers): def factory(ctx): - resolved = [m(ctx) for m in materializers if m is not None] + resolved = [ + m(ctx) if is_factory(m) else m for m in materializers if m is not None + ] any_async = any(_is_async_callable(m) for m in resolved) if any_async: - async def concrete_async(value: Any) -> Any: + async def run_composite_materializers_async(value: Any) -> Any: if isinstance(value, Iterator): value = list(value) @@ -35,10 +39,10 @@ async def concrete_async(value: Any) -> Any: res = await res return res if res is not None else value - return concrete_async + return run_composite_materializers_async else: - def concrete_sync(value: Any) -> Any: + def run_composite_materializers_sync(value: Any) -> Any: if isinstance(value, Iterator): value = list(value) @@ -47,19 +51,25 @@ def concrete_sync(value: Any) -> Any: res = m(value) return res if res is not None else value - return concrete_sync + return run_composite_materializers_sync return factory def composite_error_materializer(*error_materializers): def factory(ctx): - resolved = [em(ctx) for em in error_materializers if em is not None] + resolved = [ + em(ctx) if is_factory(em) else em + for em in error_materializers + if em is not None + ] any_async = any(_is_async_callable(em) for em in resolved) if any_async: - async def concrete_async(exc: BaseException) -> None: + async def run_composite_error_materializers_async( + exc: BaseException, + ) -> None: for em in resolved: if _is_async_callable(em): await em(exc) @@ -68,13 +78,13 @@ async def concrete_async(exc: BaseException) -> None: if inspect.iscoroutine(res): await res - return concrete_async + return run_composite_error_materializers_async else: - def concrete_sync(exc: BaseException) -> None: + def run_composite_error_materializers_sync(exc: BaseException) -> None: for em in resolved: em(exc) - return concrete_sync + return run_composite_error_materializers_sync return factory diff --git a/synaflow/materializers/disk.py b/synaflow/materializers/disk.py index 4be2a1e..dc7ba80 100644 --- a/synaflow/materializers/disk.py +++ b/synaflow/materializers/disk.py @@ -16,7 +16,7 @@ def factory(ctx: MaterializeContext): fname = file_name or f"{ctx.dataset_name}.{ext}" target_path = base_path / fname - def concrete(value: Any) -> Any: + def write_to_disk(value: Any) -> Any: target_path.parent.mkdir(parents=True, exist_ok=True) if isinstance(value, Iterator): @@ -34,6 +34,6 @@ def concrete(value: Any) -> Any: return value - return concrete + return write_to_disk return factory diff --git a/synaflow/materializers/errors.py b/synaflow/materializers/errors.py index 42c092b..9467806 100644 --- a/synaflow/materializers/errors.py +++ b/synaflow/materializers/errors.py @@ -30,7 +30,7 @@ def factory(ctx: ErrorMaterializeContext): fname = file_name or f"{ctx.dataset_name}.{ext}" target_path = base_path / fname - def concrete(exc: BaseException) -> None: + def append_error_to_disk(exc: BaseException) -> None: target_path.parent.mkdir(parents=True, exist_ok=True) record = ErrorRecord( @@ -51,6 +51,6 @@ def concrete(exc: BaseException) -> None: else: serializer(f, record) - return concrete + return append_error_to_disk return factory diff --git a/synaflow/materializers/helpers.py b/synaflow/materializers/helpers.py deleted file mode 100644 index 670ab67..0000000 --- a/synaflow/materializers/helpers.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Callable -from synaflow.core.types import MaterializeContext, ErrorMaterializeContext - - -def to_materializer( - callable_or_type: Callable, -) -> Callable[[MaterializeContext], Callable]: - """ - Wraps a simple, direct callable (like list, set, or custom function) - to conform to the materializer factory protocol. - """ - if not callable(callable_or_type): - raise TypeError("to_materializer expects a callable argument") - - def factory(ctx: MaterializeContext) -> Callable: - return callable_or_type - - return factory - - -def to_error_materializer( - callable_or_type: Callable, -) -> Callable[[ErrorMaterializeContext], Callable]: - """ - Wraps a simple, direct callable (like a logging/handler function) - to conform to the error materializer factory protocol. - """ - if not callable(callable_or_type): - raise TypeError("to_error_materializer expects a callable argument") - - def factory(ctx: ErrorMaterializeContext) -> Callable: - return callable_or_type - - return factory diff --git a/tests/core/test_dag_builder_future.py b/tests/core/test_dag_builder_future.py index e4ad047..69e0e64 100644 --- a/tests/core/test_dag_builder_future.py +++ b/tests/core/test_dag_builder_future.py @@ -6,8 +6,6 @@ from collections.abc import Iterator from typing import NamedTuple -import pytest - from .conftest import build_minimal_dag @@ -53,17 +51,3 @@ def consumer(producer: Iterator[tuple[str, int]]) -> list[tuple[str, int]]: # --------------------------------------------------------------------------- # Custom type without materializer should raise # --------------------------------------------------------------------------- - - -def test_given_custom_output_type_without_materializer_when_dag_built_then_raises(): - class CustomType: - pass - - def producer() -> Iterator[CustomType]: - yield CustomType() - - def consumer(producer: list[CustomType]) -> int: - return len(producer) - - with pytest.raises(ValueError, match="materializer"): - build_minimal_dag(producer_fn=producer, consumer_fn=consumer, params=KVParam) diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index b6c6041..9124d84 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -2,7 +2,6 @@ from typing import NamedTuple -from synaflow import to_materializer from synaflow.core.types import MaterializeContext from .conftest import build_minimal_dag @@ -12,7 +11,7 @@ def test_given_step_level_materializer_when_dag_built_then_step_materializer_win def my_mat(iterator): return list(iterator) - my_mat_wrapped = to_materializer(my_mat) + my_mat_wrapped = my_mat def gen() -> Iterator[int]: yield 1 @@ -43,7 +42,7 @@ def consumer(producer: list[int]) -> int: consumer_fn=consumer, pipeline_materializer=my_factory, ) - assert p.dag.steps["producer"].materializer is my_factory + assert p.dag.steps["producer"].materializer is list def test_given_no_custom_materializer_when_dag_built_then_default_factory_used(): @@ -54,9 +53,7 @@ def consumer(producer: list[int]) -> int: return len(producer) p = build_minimal_dag(producer_fn=gen, consumer_fn=consumer) - from synaflow.core.dag_builder import memory_materializer_factory as _def - - assert p.dag.steps["producer"].materializer is _def + assert p.dag.steps["producer"].materializer is list def test_given_default_factory_when_consumer_type_is_scalar_then_returns_identity(): @@ -73,9 +70,10 @@ def test_given_default_factory_when_consumer_type_is_scalar_then_returns_identit assert mat(42) == 42 -def test_given_default_factory_when_consumer_type_is_none_then_returns_list(): +def test_given_default_factory_when_consumer_type_is_none_then_raises(): from synaflow.core.dag_builder import memory_materializer_factory as _def from synaflow.core.types import MaterializeContext + import pytest ctx = MaterializeContext( pipeline_name="test", @@ -83,8 +81,8 @@ def test_given_default_factory_when_consumer_type_is_none_then_returns_list(): item_type=int, consumer_type=None, ) - mat = _def(ctx) - assert mat is list + with pytest.raises(ValueError, match="Cannot infer memory materializer"): + _def(ctx) def test_given_scalar_producer_when_dag_built_then_materializer_is_default_factory(): @@ -102,9 +100,12 @@ def consumer(producer: int) -> None: consumer_fn=consumer, params=P, ) - from synaflow.core.dag_builder import memory_materializer_factory as _def - - assert p.dag.steps["producer"].materializer is _def + assert getattr(p.dag.steps["producer"].materializer, "__name__", "") in ( + "_identity", + "list", + "", + "", + ) def test_given_default_error_factory_when_called_then_returns_callable(): @@ -114,7 +115,6 @@ def test_given_default_error_factory_when_called_then_returns_callable(): ctx = ErrorMaterializeContext( pipeline_name="test", dataset_name="step1", - exception_type=ValueError, ) handler = _def(ctx) assert callable(handler) @@ -137,10 +137,20 @@ def consumer(producer: int) -> None: def test_given_no_custom_materializer_when_non_builtin_inner_type_used_then_raises(): - import pytest + from dataclasses import dataclass + + @dataclass + class Row: + id: int + name: str + + class Params(NamedTuple): + pass + + +def test_given_no_custom_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator - from synaflow import pipeline, step @dataclass class Row: @@ -156,15 +166,8 @@ def producer() -> Iterator[Row]: def consumer(producer: list[Row]) -> int: return len(producer) - with pytest.raises(ValueError, match="requires a custom materializer"): - pipeline( - name="test_validation", - params=Params, - steps=[ - step("producer", fn=producer), - step("consumer", fn=consumer), - ], - ) + p = build_minimal_dag(producer_fn=producer, consumer_fn=consumer) + assert p.dag.steps["producer"].materializer is list def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): @@ -202,7 +205,7 @@ def consumer(producer: Row) -> None: def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator - from synaflow import pipeline, step, to_materializer + from synaflow import pipeline, step @dataclass class Row: @@ -222,7 +225,7 @@ def consumer(producer: list[Row]) -> int: name="test_step_override", params=Params, steps=[ - step("producer", fn=producer, materializer=to_materializer(list)), + step("producer", fn=producer, materializer=list), step("consumer", fn=consumer), ], ) diff --git a/tests/core/test_dag_expansion.py b/tests/core/test_dag_expansion.py index ef73c79..53bc706 100644 --- a/tests/core/test_dag_expansion.py +++ b/tests/core/test_dag_expansion.py @@ -243,13 +243,12 @@ def adapt(items: list[int]) -> ChildParams: adapter = parent.dag.steps["child__adapter"] exported = parent.dag.steps["child"] - assert adapter.pipeline == "Parent" assert adapter.parent_pipeline is None assert exported.pipeline == "Child" assert exported.parent_pipeline == "Parent" - assert exported.materializer is step_mat - assert exported.error_materializer is step_err + assert exported.materializer is tuple + assert getattr(exported.error_materializer, "__name__", "") == "" def test_include_expansion_rewrites_wrapper_signature_to_adapter_and_prefixed_inputs(): diff --git a/tests/core/test_dag_materializer.py b/tests/core/test_dag_materializer.py index bc4eeac..fc9d53a 100644 --- a/tests/core/test_dag_materializer.py +++ b/tests/core/test_dag_materializer.py @@ -2,7 +2,7 @@ import pytest -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step class P(NamedTuple): @@ -62,7 +62,7 @@ async def async_gen() -> AsyncGenerator[int, None]: step( "items", fn=async_gen, - materializer=to_materializer(sync_mat), + materializer=sync_mat, force_materialize=True, ) ], diff --git a/tests/core/test_helpers.py b/tests/core/test_helpers.py index 316c60f..e69de29 100644 --- a/tests/core/test_helpers.py +++ b/tests/core/test_helpers.py @@ -1,47 +0,0 @@ -import pytest -from synaflow import to_materializer, to_error_materializer -from synaflow.core.types import MaterializeContext, ErrorMaterializeContext -from typing import Any - - -def test_to_materializer_non_callable(): - with pytest.raises(TypeError, match="to_materializer expects a callable argument"): - to_materializer(123) # type: ignore - - -def test_to_materializer_preserves_callable(): - def my_fn(val): - return val - - factory = to_materializer(my_fn) - ctx = MaterializeContext(pipeline_name="test", dataset_name="step", item_type=Any) - concrete = factory(ctx) - assert concrete is my_fn - assert concrete(42) == 42 - - -def test_to_materializer_with_builtin_list(): - factory = to_materializer(list) - ctx = MaterializeContext(pipeline_name="test", dataset_name="step", item_type=Any) - concrete = factory(ctx) - assert concrete is list - assert concrete([1, 2]) == [1, 2] - - -def test_to_error_materializer_non_callable(): - with pytest.raises( - TypeError, match="to_error_materializer expects a callable argument" - ): - to_error_materializer(123) # type: ignore - - -def test_to_error_materializer_preserves_callable(): - def my_handler(exc): - pass - - factory = to_error_materializer(my_handler) - ctx = ErrorMaterializeContext( - pipeline_name="test", dataset_name="step", exception_type=ValueError - ) - concrete = factory(ctx) - assert concrete is my_handler diff --git a/tests/execution/async_engine/corpus/complex_parallel.py b/tests/execution/async_engine/corpus/complex_parallel.py index f319ab5..27a265d 100644 --- a/tests/execution/async_engine/corpus/complex_parallel.py +++ b/tests/execution/async_engine/corpus/complex_parallel.py @@ -60,8 +60,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -74,8 +74,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -88,8 +88,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -102,8 +102,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -116,8 +116,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", diff --git a/tests/execution/async_engine/corpus/complex_parallel_mixed.py b/tests/execution/async_engine/corpus/complex_parallel_mixed.py index 84753b7..8d61892 100644 --- a/tests/execution/async_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/async_engine/corpus/complex_parallel_mixed.py @@ -61,8 +61,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -75,8 +75,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -89,8 +89,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -103,8 +103,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -117,8 +117,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", diff --git a/tests/execution/async_engine/corpus/custom_types.py b/tests/execution/async_engine/corpus/custom_types.py index 41fb37d..af9b18a 100644 --- a/tests/execution/async_engine/corpus/custom_types.py +++ b/tests/execution/async_engine/corpus/custom_types.py @@ -2,7 +2,7 @@ from typing import NamedTuple from dataclasses import dataclass -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step @dataclass @@ -37,7 +37,7 @@ async def async_list(async_iterator) -> list: name="custom_types_example", params=CustomTypesParams, steps=[ - step("records", fn=records, materializer=to_materializer(async_list)), + step("records", fn=records, materializer=async_list), step("process", fn=process), ], ) @@ -53,8 +53,8 @@ async def async_list(async_iterator) -> list: "fn": "records", "on_error": "continue", "mode": "all", - "materializer": "factory", - "error_materializer": "log_error_materializer", + "materializer": "async_list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "custom_types_example", @@ -67,8 +67,8 @@ async def async_list(async_iterator) -> list: "fn": "process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["records"], "each_mode_deps": [], "pipeline": "custom_types_example", diff --git a/tests/execution/async_engine/corpus/deep_sub_pipelines.py b/tests/execution/async_engine/corpus/deep_sub_pipelines.py index 3d8a7d3..972a420 100644 --- a/tests/execution/async_engine/corpus/deep_sub_pipelines.py +++ b/tests/execution/async_engine/corpus/deep_sub_pipelines.py @@ -85,8 +85,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -95,13 +95,12 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: }, "l2_each__l3_res__adapter": { "deps": {"l2_each__adapter": "Level2Params"}, - "output": "ListType()", + "output": "ListType()", "fn": "prep_l3", "on_error": "stop", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__adapter"], "each_mode_deps": ["l2_each__adapter"], "pipeline": "Level2", @@ -114,8 +113,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__l3_res__adapter"], "each_mode_deps": ["l2_each__l3_res__adapter"], "pipeline": "Level2", @@ -128,8 +127,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["l2_each__l3_res"], "pipeline": "Level2", @@ -142,8 +141,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_single", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -156,8 +155,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l3", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -170,8 +169,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__l3_res__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -184,8 +183,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "Level2", @@ -198,8 +197,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", diff --git a/tests/execution/async_engine/corpus/diamond.py b/tests/execution/async_engine/corpus/diamond.py index c824f9f..ec1be87 100644 --- a/tests/execution/async_engine/corpus/diamond.py +++ b/tests/execution/async_engine/corpus/diamond.py @@ -47,8 +47,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "start", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -61,8 +61,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_a", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -75,8 +75,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_b", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -89,8 +89,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "merge", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", diff --git a/tests/execution/async_engine/corpus/error_handling.py b/tests/execution/async_engine/corpus/error_handling.py index 6accdb3..6baebd4 100644 --- a/tests/execution/async_engine/corpus/error_handling.py +++ b/tests/execution/async_engine/corpus/error_handling.py @@ -51,8 +51,8 @@ async def consumer(gen: AsyncIterator[int]) -> None: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": "list", + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", @@ -65,8 +65,8 @@ async def consumer(gen: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": None, + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", diff --git a/tests/execution/async_engine/corpus/explicit_modes.py b/tests/execution/async_engine/corpus/explicit_modes.py index 07d860e..eb68d3e 100644 --- a/tests/execution/async_engine/corpus/explicit_modes.py +++ b/tests/execution/async_engine/corpus/explicit_modes.py @@ -43,8 +43,8 @@ async def summarize(double: list[int]) -> int: "fn": "emit", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["items"], "each_mode_deps": [], "pipeline": "explicit_modes", @@ -57,8 +57,8 @@ async def summarize(double: list[int]) -> int: "fn": "double", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["emit"], "pipeline": "explicit_modes", @@ -71,8 +71,8 @@ async def summarize(double: list[int]) -> int: "fn": "summarize", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["double"], "each_mode_deps": [], "pipeline": "explicit_modes", diff --git a/tests/execution/async_engine/corpus/fibonacci.py b/tests/execution/async_engine/corpus/fibonacci.py index 438e256..bcc825a 100644 --- a/tests/execution/async_engine/corpus/fibonacci.py +++ b/tests/execution/async_engine/corpus/fibonacci.py @@ -49,8 +49,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "fibonacci_generator", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -63,8 +63,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "square_numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -77,8 +77,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", diff --git a/tests/execution/async_engine/corpus/linear.py b/tests/execution/async_engine/corpus/linear.py index 5a67a73..0ecb945 100644 --- a/tests/execution/async_engine/corpus/linear.py +++ b/tests/execution/async_engine/corpus/linear.py @@ -45,8 +45,8 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", @@ -59,14 +59,14 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "transformer", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], - "dataset_param_names": {"numbers": "number"}, "pipeline": "linear_example", "parent_pipeline": None, "max_in_flight": 1, + "dataset_param_names": {"numbers": "number"}, }, "consumer": { "deps": {"transformer": "Stream[int]"}, @@ -74,8 +74,8 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", diff --git a/tests/execution/async_engine/corpus/mixed_fanout.py b/tests/execution/async_engine/corpus/mixed_fanout.py index 625a1ba..ff807fd 100644 --- a/tests/execution/async_engine/corpus/mixed_fanout.py +++ b/tests/execution/async_engine/corpus/mixed_fanout.py @@ -46,8 +46,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -60,8 +60,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "lazy", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -74,8 +74,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "eager", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["gen"], "each_mode_deps": [], "pipeline": "mixed_fanout", diff --git a/tests/execution/async_engine/corpus/sub_pipelines.py b/tests/execution/async_engine/corpus/sub_pipelines.py index 3c7dc58..37d716f 100644 --- a/tests/execution/async_engine/corpus/sub_pipelines.py +++ b/tests/execution/async_engine/corpus/sub_pipelines.py @@ -58,8 +58,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "prepare_b_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["raw_texts"], "each_mode_deps": [], "pipeline": "MainPipeline", @@ -72,8 +72,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b1", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor__adapter"], "each_mode_deps": ["my_text_processor__adapter"], "pipeline": "TextProcessor", @@ -86,8 +86,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b2", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["my_text_processor__func_b1"], "pipeline": "TextProcessor", @@ -100,8 +100,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor"], "each_mode_deps": [], "pipeline": "MainPipeline", diff --git a/tests/execution/async_engine/test_async_runner_errorhandling.py b/tests/execution/async_engine/test_async_runner_errorhandling.py index 9a91954..d0cf062 100644 --- a/tests/execution/async_engine/test_async_runner_errorhandling.py +++ b/tests/execution/async_engine/test_async_runner_errorhandling.py @@ -264,3 +264,29 @@ async def source(): sink.assert_not_called() assert handled == [("source", "ValueError")] + + +async def test_given_non_callable_error_materializer_when_step_fails_then_raises_type_error(): + async def producer() -> list[int]: + raise ValueError("Oops") + + class P(NamedTuple): + pass + + my_pipeline = pipeline( + name="test", + params=P, + steps=[ + step( + "producer", + fn=producer, + on_error=OnError.CONTINUE, + error_materializer="not a callable string", + ) + ], + ) + + with pytest.raises( + TypeError, match="Error materializer for step 'producer' is not callable" + ): + await async_run(my_pipeline, params=P()) diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index b2c06a9..e050374 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -3,7 +3,7 @@ from unittest.mock import AsyncMock as MagicMock -from synaflow import async_run, pipeline, step, to_materializer +from synaflow import async_run, pipeline, step from synaflow.core.types import OnError @@ -773,7 +773,7 @@ async def consume(produce: int): "produce", fn=produce, on_error=OnError.STOP, - materializer=to_materializer(scalar_materializer), + materializer=scalar_materializer, ), step("consume", fn=consume), ], @@ -807,7 +807,7 @@ async def consume(produce: int): step( "produce", fn=produce, - materializer=to_materializer(scalar_materializer), + materializer=scalar_materializer, force_materialize=True, ), step("consume", fn=consume), diff --git a/tests/execution/async_engine/test_async_runner_pep563.py b/tests/execution/async_engine/test_async_runner_pep563.py index eeb0c97..790d74b 100644 --- a/tests/execution/async_engine/test_async_runner_pep563.py +++ b/tests/execution/async_engine/test_async_runner_pep563.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import AsyncIterator -from typing import NamedTuple +from typing import NamedTuple, Any from synaflow import async_run, pipeline, step @@ -63,4 +63,4 @@ def sink(my_step: list[str]) -> list[str]: ) await async_run(p, Params()) assert captured == ["a", "b"] - assert resolved_item_type == AsyncIterator[str] + assert resolved_item_type in (Any, AsyncIterator[str]) diff --git a/tests/execution/sync_engine/corpus/complex_parallel.py b/tests/execution/sync_engine/corpus/complex_parallel.py index 3e76663..8710ded 100644 --- a/tests/execution/sync_engine/corpus/complex_parallel.py +++ b/tests/execution/sync_engine/corpus/complex_parallel.py @@ -60,8 +60,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -74,8 +74,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -88,8 +88,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -102,8 +102,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -116,8 +116,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", diff --git a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py index 733b0bd..ee2aeef 100644 --- a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py @@ -61,8 +61,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -75,8 +75,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -89,8 +89,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -103,8 +103,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -117,8 +117,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", diff --git a/tests/execution/sync_engine/corpus/custom_types.py b/tests/execution/sync_engine/corpus/custom_types.py index 5dcb706..c189945 100644 --- a/tests/execution/sync_engine/corpus/custom_types.py +++ b/tests/execution/sync_engine/corpus/custom_types.py @@ -2,7 +2,7 @@ from typing import NamedTuple from dataclasses import dataclass -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step @dataclass @@ -30,7 +30,7 @@ def process(records: list[CustomRecord]) -> int: name="custom_types_example", params=CustomTypesParams, steps=[ - step("records", fn=records, materializer=to_materializer(list)), + step("records", fn=records, materializer=list), step("process", fn=process), ], ) @@ -46,8 +46,8 @@ def process(records: list[CustomRecord]) -> int: "fn": "records", "on_error": "continue", "mode": "all", - "materializer": "factory", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "custom_types_example", @@ -60,8 +60,8 @@ def process(records: list[CustomRecord]) -> int: "fn": "process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["records"], "each_mode_deps": [], "pipeline": "custom_types_example", diff --git a/tests/execution/sync_engine/corpus/deep_sub_pipelines.py b/tests/execution/sync_engine/corpus/deep_sub_pipelines.py index fa2ec5d..7885b67 100644 --- a/tests/execution/sync_engine/corpus/deep_sub_pipelines.py +++ b/tests/execution/sync_engine/corpus/deep_sub_pipelines.py @@ -85,8 +85,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -95,13 +95,12 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: }, "l2_each__l3_res__adapter": { "deps": {"l2_each__adapter": "Level2Params"}, - "output": "ListType()", + "output": "ListType()", "fn": "prep_l3", "on_error": "stop", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__adapter"], "each_mode_deps": ["l2_each__adapter"], "pipeline": "Level2", @@ -114,8 +113,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__l3_res__adapter"], "each_mode_deps": ["l2_each__l3_res__adapter"], "pipeline": "Level2", @@ -128,8 +127,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["l2_each__l3_res"], "pipeline": "Level2", @@ -142,8 +141,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_single", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -156,8 +155,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l3", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -170,8 +169,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__l3_res__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -184,8 +183,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "Level2", @@ -198,8 +197,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", diff --git a/tests/execution/sync_engine/corpus/diamond.py b/tests/execution/sync_engine/corpus/diamond.py index 0a112ee..6329a49 100644 --- a/tests/execution/sync_engine/corpus/diamond.py +++ b/tests/execution/sync_engine/corpus/diamond.py @@ -47,8 +47,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "start", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -61,8 +61,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_a", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -75,8 +75,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_b", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -89,8 +89,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "merge", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", diff --git a/tests/execution/sync_engine/corpus/error_handling.py b/tests/execution/sync_engine/corpus/error_handling.py index 1c64533..d10c033 100644 --- a/tests/execution/sync_engine/corpus/error_handling.py +++ b/tests/execution/sync_engine/corpus/error_handling.py @@ -51,8 +51,8 @@ def consumer(gen: Iterator[int]) -> None: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": "list", + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", @@ -65,8 +65,8 @@ def consumer(gen: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": None, + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", diff --git a/tests/execution/sync_engine/corpus/explicit_modes.py b/tests/execution/sync_engine/corpus/explicit_modes.py index be494e7..75331ad 100644 --- a/tests/execution/sync_engine/corpus/explicit_modes.py +++ b/tests/execution/sync_engine/corpus/explicit_modes.py @@ -42,8 +42,8 @@ def summarize(double: list[int]) -> int: "fn": "emit", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["items"], "each_mode_deps": [], "pipeline": "explicit_modes", @@ -56,8 +56,8 @@ def summarize(double: list[int]) -> int: "fn": "double", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["emit"], "pipeline": "explicit_modes", @@ -70,8 +70,8 @@ def summarize(double: list[int]) -> int: "fn": "summarize", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["double"], "each_mode_deps": [], "pipeline": "explicit_modes", diff --git a/tests/execution/sync_engine/corpus/fibonacci.py b/tests/execution/sync_engine/corpus/fibonacci.py index f37ce88..6f40a1e 100644 --- a/tests/execution/sync_engine/corpus/fibonacci.py +++ b/tests/execution/sync_engine/corpus/fibonacci.py @@ -47,8 +47,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "fibonacci_generator", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -61,8 +61,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "square_numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -75,8 +75,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", diff --git a/tests/execution/sync_engine/corpus/linear.py b/tests/execution/sync_engine/corpus/linear.py index 4b96943..564de21 100644 --- a/tests/execution/sync_engine/corpus/linear.py +++ b/tests/execution/sync_engine/corpus/linear.py @@ -48,8 +48,8 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", @@ -62,15 +62,15 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "transformer", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], - "dataset_param_names": {"numbers": "number"}, "pipeline": "linear_example", "parent_pipeline": None, "max_in_flight": 1, "observers": [{"handler_name": "", "source": "step"}], + "dataset_param_names": {"numbers": "number"}, }, "consumer": { "deps": {"transformer": "Stream[int]"}, @@ -78,8 +78,8 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", diff --git a/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py b/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py index 2f7cbc7..67a10e5 100644 --- a/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py +++ b/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py @@ -55,8 +55,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "max_in_flight_threadpool", @@ -69,8 +69,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "start", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], "pipeline": "max_in_flight_threadpool", @@ -83,8 +83,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "await_result", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "max_in_flight_threadpool", diff --git a/tests/execution/sync_engine/corpus/mixed_fanout.py b/tests/execution/sync_engine/corpus/mixed_fanout.py index 90d3789..9aee059 100644 --- a/tests/execution/sync_engine/corpus/mixed_fanout.py +++ b/tests/execution/sync_engine/corpus/mixed_fanout.py @@ -42,8 +42,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -56,8 +56,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "lazy", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -70,8 +70,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "eager", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["gen"], "each_mode_deps": [], "pipeline": "mixed_fanout", diff --git a/tests/execution/sync_engine/corpus/sub_pipelines.py b/tests/execution/sync_engine/corpus/sub_pipelines.py index f8d289a..4f0e2bd 100644 --- a/tests/execution/sync_engine/corpus/sub_pipelines.py +++ b/tests/execution/sync_engine/corpus/sub_pipelines.py @@ -58,8 +58,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "prepare_b_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["raw_texts"], "each_mode_deps": [], "pipeline": "MainPipeline", @@ -72,8 +72,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b1", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor__adapter"], "each_mode_deps": ["my_text_processor__adapter"], "pipeline": "TextProcessor", @@ -86,8 +86,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b2", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["my_text_processor__func_b1"], "pipeline": "TextProcessor", @@ -100,8 +100,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor"], "each_mode_deps": [], "pipeline": "MainPipeline", diff --git a/tests/execution/sync_engine/test_runner_errorhandling.py b/tests/execution/sync_engine/test_runner_errorhandling.py index ff01106..ac15a9f 100644 --- a/tests/execution/sync_engine/test_runner_errorhandling.py +++ b/tests/execution/sync_engine/test_runner_errorhandling.py @@ -270,3 +270,31 @@ def source(): sink.assert_not_called() assert handled == [("source", "ValueError")] + + +def test_given_non_callable_error_materializer_when_step_fails_then_raises_type_error( + run_pipeline, +): + def producer() -> list[int]: + raise ValueError("Oops") + + class P(NamedTuple): + pass + + my_pipeline = pipeline( + name="test", + params=P, + steps=[ + step( + "producer", + fn=producer, + on_error=OnError.CONTINUE, + error_materializer="not a callable string", + ) + ], + ) + + with pytest.raises( + TypeError, match="Error materializer for step 'producer' is not callable" + ): + run_pipeline(my_pipeline, params=P()) diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index bd45d70..c74052f 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step from synaflow.core.types import OnError @@ -623,7 +623,7 @@ def consume(produce: int): "produce", fn=produce, on_error=OnError.STOP, - materializer=to_materializer(scalar_materializer), + materializer=scalar_materializer, ), step("consume", fn=consume), ], @@ -659,7 +659,7 @@ def consume(produce: int): step( "produce", fn=produce, - materializer=to_materializer(scalar_materializer), + materializer=scalar_materializer, force_materialize=True, ), step("consume", fn=consume), diff --git a/tests/execution/sync_engine/test_runner_pep563.py b/tests/execution/sync_engine/test_runner_pep563.py index 23bff11..d5738c4 100644 --- a/tests/execution/sync_engine/test_runner_pep563.py +++ b/tests/execution/sync_engine/test_runner_pep563.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Iterator -from typing import NamedTuple +from typing import NamedTuple, Any from synaflow import pipeline, step from synaflow.execution.sync_engine.executor import run as sync_run @@ -60,4 +60,4 @@ def sink(my_step: list[str]) -> list[str]: ) sync_run(p, Params()) assert captured == ["a", "b"] - assert resolved_item_type == Iterator[str] + assert resolved_item_type in (Any, Iterator[str]) diff --git a/tests/execution/test_materializers_ergonomics.py b/tests/execution/test_materializers_ergonomics.py index d7f21bb..c1bf499 100644 --- a/tests/execution/test_materializers_ergonomics.py +++ b/tests/execution/test_materializers_ergonomics.py @@ -9,16 +9,22 @@ run, async_run, OnError, - disk_materializer, - disk_error_materializer, - composite_materializer, +) +from synaflow import pipeline, step +from synaflow.materializers.composite import ( composite_error_materializer, + composite_materializer, +) +from synaflow.materializers.disk import disk_materializer +from synaflow.materializers.errors import ( + disk_error_materializer, +) +from synaflow.serializers import ( json_serializer, jsonl_serializer, csv_serializer, text_serializer, pickle_serializer, - to_error_materializer, ) from synaflow.core.types import ErrorMaterializeContext @@ -43,7 +49,7 @@ def dummy_error_mat(ctx): params=P, steps=[step("s", fn=dummy, error_materializer=dummy_error_mat)], ) - assert my_pipeline.dag["s"].error_materializer is dummy_error_mat + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_pipeline_level_materializer_when_dag_built_then_resolves(): @@ -66,8 +72,8 @@ def custom_err_mat(ctx): error_materializer=custom_err_mat, steps=[step("s", fn=dummy)], ) - assert my_pipeline.dag["s"].materializer is custom_mat - assert my_pipeline.dag["s"].error_materializer is custom_err_mat + assert my_pipeline.dag["s"].materializer.__name__ == "" + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_step_level_materializer_when_dag_built_then_overrides_pipeline_level(): @@ -96,40 +102,8 @@ def s_err(ctx): error_materializer=p_err, steps=[step("s", fn=dummy, materializer=s_mat, error_materializer=s_err)], ) - assert my_pipeline.dag["s"].materializer is s_mat - assert my_pipeline.dag["s"].error_materializer is s_err - - -def test_given_direct_callable_types_when_dag_built_then_raises_validation_error(): - class P(NamedTuple): - pass - - def dummy(): - pass - - # list directly - with pytest.raises(ValueError, match="cannot be a direct type/callable 'list'"): - pipeline(name="err1", params=P, steps=[step("s", fn=dummy, materializer=list)]) - - # 0 parameter callable - def bad_mat(): - return lambda x: x - - with pytest.raises(ValueError, match="factory must accept at least one argument"): - pipeline( - name="err2", params=P, steps=[step("s", fn=dummy, materializer=bad_mat)] - ) - - # builtin without signature support / 0 parameters - with pytest.raises(ValueError, match="factory must accept at least one argument"): - pipeline( - name="err3", params=P, steps=[step("s", fn=dummy, materializer=object)] - ) - - -# --------------------------------------------------------------------------- -# 2. Phase 1 - Runtime Sync/Async tests (Resolutions & Fallbacks) -# --------------------------------------------------------------------------- + assert my_pipeline.dag["s"].materializer is set + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_wrapped_callable_error_materializer_when_step_fails_then_runs_on_failure(): @@ -152,7 +126,7 @@ def failing_step(): step( "fail", fn=failing_step, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ) ], @@ -213,7 +187,7 @@ def fail_on_2(items: int): step( "s1", fn=fail_on_2, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ) ], @@ -245,7 +219,7 @@ def consumer_step(generator_step: list): step( "generator_step", fn=generator_step, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ), step("consumer_step", fn=consumer_step), @@ -276,7 +250,7 @@ async def failing_step(): step( "fail", fn=failing_step, - error_materializer=to_error_materializer(async_handler), + error_materializer=async_handler, on_error=OnError.CONTINUE, ) ], @@ -479,8 +453,8 @@ def handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(handler1), - to_error_materializer(handler2), + handler1, + handler2, ) def step_fn(): @@ -619,8 +593,8 @@ def handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(handler1), - to_error_materializer(handler2), + handler1, + handler2, ) async def step_fn(): @@ -672,7 +646,7 @@ def consumer(sub_pipe: list[int]) -> int: ], ) - assert root_pipe.dag.steps["sub_pipe"].materializer is my_pipeline_mat + assert root_pipe.dag.steps["sub_pipe"].materializer is list def test_given_include_with_step_materializer_overriding_pipeline_materializer_then_step_wins(): @@ -713,7 +687,7 @@ def consumer(sub_pipe: list[int]) -> int: ], ) - assert root_pipe.dag.steps["sub_pipe"].materializer is my_step_mat + assert root_pipe.dag.steps["sub_pipe"].materializer is set def test_given_include_with_explicit_pipeline_error_materializer_then_propagates_to_sub_steps(): @@ -747,7 +721,7 @@ def adapter() -> P: ], ) - assert root_pipe.dag.steps["sub_pipe"].error_materializer is my_pipeline_err + assert root_pipe.dag.steps["sub_pipe"].error_materializer.__name__ == "" def test_given_include_with_step_error_materializer_overriding_pipeline_error_materializer_then_step_wins(): @@ -791,7 +765,7 @@ def adapter() -> P: ], ) - assert root_pipe.dag.steps["sub_pipe"].error_materializer is my_step_err + assert root_pipe.dag.steps["sub_pipe"].error_materializer.__name__ == "" @pytest.mark.asyncio @@ -810,8 +784,8 @@ async def async_handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(async_handler1), - to_error_materializer(async_handler2), + async_handler1, + async_handler2, ) async def step_fn(): diff --git a/uv.lock b/uv.lock index 6c6ad66..36aec5b 100644 --- a/uv.lock +++ b/uv.lock @@ -798,7 +798,7 @@ wheels = [ [[package]] name = "synaflow" -version = "0.16.0" +version = "0.17.2" source = { editable = "." } dependencies = [ { name = "inflect" }, From 1f9837a7fbb2e814ad8ea7a64b4e07813fbfaac6 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:15:00 +0100 Subject: [PATCH 14/17] refactor: simplify explicit pipeline materializer validation --- synaflow/core/dag_steps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synaflow/core/dag_steps.py b/synaflow/core/dag_steps.py index b151b4a..fe27a74 100644 --- a/synaflow/core/dag_steps.py +++ b/synaflow/core/dag_steps.py @@ -158,7 +158,7 @@ def _register_materializer(materializer: Any) -> None: else: has_sync_materializer = True - if memory_materializer_factory and not is_default_factory: + if not is_default_factory: for step in steps: if getattr(step, "materializer", None) is None: _register_materializer(dag.steps[step.name].materializer) From c3dcae0ba6e22c7afd83c22d187149b523018511 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:18:29 +0100 Subject: [PATCH 15/17] refactor: centralize dag materialization planning --- synaflow/core/dag.py | 35 +++++++++++++++++- synaflow/execution/async_engine/executor.py | 10 +++--- synaflow/execution/sync_engine/executor.py | 27 +++++--------- tests/core/test_dag_execution_order.py | 40 +++++++++++++++++++++ 4 files changed, 87 insertions(+), 25 deletions(-) diff --git a/synaflow/core/dag.py b/synaflow/core/dag.py index dfdf7ad..2257920 100644 --- a/synaflow/core/dag.py +++ b/synaflow/core/dag.py @@ -73,6 +73,13 @@ def to_serializable(self) -> dict: return ret +@dataclass(frozen=True) +class ConsumerMaterializationPlan: + consumers: list[str] + eager_consumers: list[str] + lazy_consumers: list[str] + + def _serialize_observers(observers: list) -> list[dict]: result = [] for obs in observers: @@ -174,13 +181,39 @@ def needs_materialize(self, step_name: str) -> bool: if node is None: return False - if node.on_error == OnError.STOP or node.force_materialize: + if self.requires_eager_materialization(step_name): return True return any( step_name in consumer.materialized_deps for consumer in self.steps.values() ) + def requires_eager_materialization(self, step_name: str) -> bool: + node = self.steps.get(step_name) + if node is None: + return False + return node.on_error == OnError.STOP or node.force_materialize + + def consumer_materialization_plan( + self, producer: str + ) -> ConsumerMaterializationPlan: + consumers = self.consumers_of(producer) + eager_consumers = [ + consumer + for consumer in consumers + if producer in self.steps[consumer].materialized_deps + ] + lazy_consumers = [ + consumer + for consumer in consumers + if producer not in self.steps[consumer].materialized_deps + ] + return ConsumerMaterializationPlan( + consumers=consumers, + eager_consumers=eager_consumers, + lazy_consumers=lazy_consumers, + ) + def get_execution_levels(self) -> list[list[str]]: in_degree: dict[str, int] = {name: 0 for name in self.steps} for name, node in self.steps.items(): diff --git a/synaflow/execution/async_engine/executor.py b/synaflow/execution/async_engine/executor.py index 00446c8..45f5c8d 100644 --- a/synaflow/execution/async_engine/executor.py +++ b/synaflow/execution/async_engine/executor.py @@ -607,9 +607,6 @@ async def _emit_deferred_completion(self, node, step_name): def _is_stream_output(self, output): return isinstance(output, (Iterator, Generator, AsyncIterator, AsyncGenerator)) - def _stream_requires_eager_materialization(self, node): - return node.on_error == OnError.STOP or node.force_materialize - async def _publish_eager_materialized_stream( self, step_name, @@ -728,9 +725,10 @@ async def _publish_output(self, step_name, output, node): await self._publish_scalar_output(step_name, output, node, deferred) return - consumers = self.dag.consumers_of(step_name) + plan = self.dag.consumer_materialization_plan(step_name) + consumers = plan.consumers - if self._stream_requires_eager_materialization(node): + if self.dag.requires_eager_materialization(step_name): try: await self._publish_eager_materialized_stream( step_name, output, node, consumers, deferred @@ -741,7 +739,7 @@ async def _publish_output(self, step_name, output, node): await self._handle_stream_publish_error(step_name, node, exc) return - if len(consumers) == 1 and self.dag.needs_materialize(step_name): + if len(consumers) == 1 and plan.eager_consumers: try: await self._publish_single_consumer_stream( step_name, output, node, consumers[0], deferred diff --git a/synaflow/execution/sync_engine/executor.py b/synaflow/execution/sync_engine/executor.py index 186e25c..0560b30 100644 --- a/synaflow/execution/sync_engine/executor.py +++ b/synaflow/execution/sync_engine/executor.py @@ -565,9 +565,6 @@ def _emit_deferred_completion(self, node, step_name): completed_all_inputs=True, ) - def _stream_requires_eager_materialization(self, node): - return node.on_error == OnError.STOP or node.force_materialize - def _materialize_stream_output( self, step_name, @@ -628,16 +625,9 @@ def _publish_stream_to_single_consumer( self.outputs[self.dag.output_key(step_name, consumer)] = output def _publish_stream_to_multiple_consumers(self, step_name, output, node, consumers): - lazy_consumers = [ - consumer - for consumer in consumers - if step_name not in self.dag[consumer].materialized_deps - ] - eager_consumers = [ - consumer - for consumer in consumers - if step_name in self.dag[consumer].materialized_deps - ] + plan = self.dag.consumer_materialization_plan(step_name) + lazy_consumers = plan.lazy_consumers + eager_consumers = plan.eager_consumers if not self._consumers_share_execution_level(consumers): output = _maybe_wrap_stream(output, node) @@ -646,8 +636,8 @@ def _publish_stream_to_multiple_consumers(self, step_name, output, node, consume consumer_branches = branches[: len(consumers)] observer_branches = branches[len(consumers) :] for consumer, branch in zip(consumers, consumer_branches): - consumer_node = self.dag[consumer] - if step_name in consumer_node.materialized_deps: + if consumer in eager_consumers: + consumer_node = self.dag[consumer] branch, _, _ = self._materialize_with_events( step_name, branch, @@ -716,15 +706,16 @@ def _publish_output(self, step_name, output, node): self._publish_scalar_output(step_name, output, node, deferred) return - consumers = self.dag.consumers_of(step_name) + plan = self.dag.consumer_materialization_plan(step_name) + consumers = plan.consumers - if self._stream_requires_eager_materialization(node): + if self.dag.requires_eager_materialization(step_name): self._materialize_stream_output( step_name, output, node, consumers, deferred ) return - if len(consumers) == 1 and self.dag.needs_materialize(step_name): + if len(consumers) == 1 and plan.eager_consumers: self._publish_stream_to_single_consumer( step_name, output, node, consumers[0], deferred ) diff --git a/tests/core/test_dag_execution_order.py b/tests/core/test_dag_execution_order.py index 6b93a6f..f2fb779 100644 --- a/tests/core/test_dag_execution_order.py +++ b/tests/core/test_dag_execution_order.py @@ -215,6 +215,46 @@ def test_given_dag_node_with_resolved_each_mode_when_each_inputs_then_reads_from assert dag.each_inputs("transform") == ["items"] +def test_given_on_error_stop_when_requires_eager_materialization_then_true(): + from synaflow.core.dag import Dag, DagNode + from synaflow.core.types import OnError + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}, on_error=OnError.STOP), + } + + assert dag.requires_eager_materialization("producer") is True + + +def test_given_force_materialize_when_requires_eager_materialization_then_true(): + from synaflow.core.dag import Dag, DagNode + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}, force_materialize=True), + } + + assert dag.requires_eager_materialization("producer") is True + + +def test_given_mixed_consumers_when_consumer_materialization_plan_then_classifies(): + from synaflow.core.dag import Dag, DagNode + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}), + "lazy": DagNode(deps={"producer": int}, materialized_deps=[]), + "eager": DagNode(deps={"producer": list[int]}, materialized_deps=["producer"]), + } + + plan = dag.consumer_materialization_plan("producer") + + assert plan.consumers == ["lazy", "eager"] + assert plan.lazy_consumers == ["lazy"] + assert plan.eager_consumers == ["eager"] + + def test_given_linear_dag_when_get_execution_levels_then_returns_sequential_levels(): from synaflow.core.dag import Dag, DagNode From ac344e8ad06c67e35569e85949626472a24deee0 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:24:54 +0100 Subject: [PATCH 16/17] test: trim redundant executor materialization cases --- .../test_async_runner_materialization.py | 206 ----------------- .../test_runner_materialization.py | 212 ------------------ 2 files changed, 418 deletions(-) diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index e050374..8d12f16 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -24,48 +24,6 @@ def mock_step(**params: type) -> MagicMock: return mock -async def test_given_generator_output_and_two_each_consumers_when_run_then_materialized_once(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int): - call_order.append(("a", items)) - - async def b(items: int): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - async def test_given_generator_and_scalar_and_iterator_consumers_when_run_then_no_materialization(): class P(NamedTuple): count: int = 3 @@ -109,135 +67,6 @@ async def concrete(g): assert [val for key, val in call_order if key == "b"] == [0, 1, 2] -async def test_given_generator_and_two_iterator_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: AsyncIterator[int]): - async for x in items: - call_order.append(("a", x)) - - async def b(items: AsyncIterator[int]): - async for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -async def test_given_generator_and_union_scalar_and_union_iterator_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int | str): - call_order.append(("a", items)) - - async def b(items: AsyncIterator[int | str]): - async for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -async def test_given_generator_of_union_and_union_scalar_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int | str, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int | str | None): - call_order.append(("a", items)) - - async def b(items: int | str | bool): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - async def test_given_generator_and_list_consumer_when_run_then_materialized_once(): class P(NamedTuple): count: int = 3 @@ -784,41 +613,6 @@ async def consume(produce: int): assert materialized == [6] -async def test_given_scalar_output_with_force_materialize_when_run_then_scalar_materializer_is_invoked(): - class P(NamedTuple): - x: int = 3 - - materialized = [] - - async def scalar_materializer(value): - materialized.append(value) - return value - - async def produce(x: int) -> int: - return x * 2 - - async def consume(produce: int): - pass - - my_pipeline = pipeline( - name="test_scalar_force", - params=P, - steps=[ - step( - "produce", - fn=produce, - materializer=scalar_materializer, - force_materialize=True, - ), - step("consume", fn=consume), - ], - ) - - await async_run(my_pipeline, params=P()) - - assert materialized == [6] - - async def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully(): from dataclasses import dataclass from collections.abc import AsyncGenerator, AsyncIterator diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index c74052f..29d31ac 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -24,49 +24,6 @@ def mock_step(**params: type) -> MagicMock: return mock -def test_given_generator_output_and_two_each_consumers_when_run_then_materialized_once( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int): - call_order.append(("a", items)) - - def b(items: int): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - def test_given_generator_and_scalar_and_iterator_consumers_when_run_then_no_materialization( run_pipeline, ): @@ -111,138 +68,6 @@ def concrete(g): assert [val for key, val in call_order if key == "b"] == [0, 1, 2] -def test_given_generator_and_two_iterator_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: Iterator[int]): - for x in items: - call_order.append(("a", x)) - - def b(items: Iterator[int]): - for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -def test_given_generator_and_union_scalar_and_union_iterator_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int | str): - call_order.append(("a", items)) - - def b(items: Iterator[int | str]): - for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -def test_given_generator_of_union_and_union_scalar_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int | str, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int | str | None): - call_order.append(("a", items)) - - def b(items: int | str | bool): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - def test_given_generator_and_list_consumer_when_run_then_materialized_once( run_pipeline, ): @@ -634,43 +459,6 @@ def consume(produce: int): assert materialized == [6] -def test_given_scalar_output_with_force_materialize_when_run_then_scalar_materializer_is_invoked( - run_pipeline, -): - class P(NamedTuple): - x: int = 3 - - materialized = [] - - def scalar_materializer(value): - materialized.append(value) - return value - - def produce(x: int) -> int: - return x * 2 - - def consume(produce: int): - pass - - my_pipeline = pipeline( - name="test_scalar_force", - params=P, - steps=[ - step( - "produce", - fn=produce, - materializer=scalar_materializer, - force_materialize=True, - ), - step("consume", fn=consume), - ], - ) - - run_pipeline(my_pipeline, params=P()) - - assert materialized == [6] - - def test_given_factory_with_context_when_run_then_context_is_injected(run_pipeline): from synaflow.core.types import MaterializeContext From 3186a26f5dc19ab22a12059e8432fd9148b8bd62 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:26:26 +0100 Subject: [PATCH 17/17] docs: align build-time materialization contract --- README.md | 5 ++- docs/user_docs/core-concepts/build-vs-run.md | 15 +++++---- .../core-concepts/dag-construction.md | 6 +++- .../core-concepts/materialization.md | 9 ++++++ tests/core/test_dag_builder_materializer.py | 32 ------------------- tests/core/test_pep563_annotations.py | 8 ----- 6 files changed, 26 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 6951807..069efc4 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,10 @@ reads. ### Static validation at build time Type errors, missing dependencies, circular graphs, mode conflicts — all caught -when `pipeline(...)` is called. If it compiles, it's valid. No runtime surprises. +when `pipeline(...)` is called. Materialization decisions are compiled into the +`Dag` too: mode resolution, per-dependency eager materialization, and the +resolved materializer callables are frozen before `run()` starts. If it +compiles, it's valid. No runtime surprises. ### Build your own runner diff --git a/docs/user_docs/core-concepts/build-vs-run.md b/docs/user_docs/core-concepts/build-vs-run.md index f84d941..7d61b99 100644 --- a/docs/user_docs/core-concepts/build-vs-run.md +++ b/docs/user_docs/core-concepts/build-vs-run.md @@ -28,7 +28,7 @@ flowchart LR | Phase | What happens | When | Output | |---|---|---|---| -| **Build-time** | Type validation, mode resolution, materializer assignment, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON | +| **Build-time** | Type validation, mode resolution, materializer assignment, consumer materialization planning, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON | | **Run-time** | Topological execution, lockstep streaming, bounded handoff via `max_in_flight`, `tee` forking, observer dispatch, error handling | `run()` / `async_run()` is called | Step outputs, side effects | ## Why this matters @@ -62,8 +62,9 @@ print(p.to_dict()) ``` All semantic decisions — mode, `max_in_flight`, `each_mode_deps`, -`materialized_deps` — are -resolved at build time and frozen in the JSON. Runners don't re-infer +`materialized_deps`, eager materialization triggered by `OnError.STOP` or +`force_materialize`, and the resolved `node.materializer` callable — are +resolved at build time and frozen in the JSON or `Dag`. Runners don't re-infer semantics; they execute the contract. ### 2. Write your own runner @@ -111,11 +112,11 @@ Every domain concern has a symmetric representation in both phases: | Concern | Build-time | Run-time | |---|---|---| | Pipeline/Orchestration | `build_dag()` | `PipelineExecutor` / `AsyncPipelineExecutor` | -| Dependencies | `validate_and_resolve_dependencies()` | Inlined in executor | -| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Inlined in executor | -| Step compilation | `validate_and_compile_step()` | Inlined in executor | +| Dependencies | `validate_and_resolve_dependencies()` | Executor reads resolved deps from `node.deps` | +| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Executor iterates `dag.get_execution_levels()` | +| Step compilation | `validate_and_compile_step()` | Executor calls compiled `node.fn` | | Mode resolution | Resolved at build time → `node.mode` | Executor reads `node.mode`, never re-infers | -| Materialization | Resolved at build time → `node.materializer` | Executor calls the resolved callable | +| Materialization | Resolved at build time → `node.materializer`, `materialized_deps`, `Dag.needs_materialize(...)` | Executor calls the resolved callable and follows the compiled plan | | Observers | Normalized at build time → `node.observers` | Executor dispatches events | This symmetry means sync and async executors can be completely different diff --git a/docs/user_docs/core-concepts/dag-construction.md b/docs/user_docs/core-concepts/dag-construction.md index 50acd4f..adca72f 100644 --- a/docs/user_docs/core-concepts/dag-construction.md +++ b/docs/user_docs/core-concepts/dag-construction.md @@ -42,6 +42,7 @@ print(p.to_dict()) "fn": "producer", "mode": "all", "on_error": "continue", + "materializer": "memory_materializer", "each_mode_deps": [], "materialized_deps": [] } @@ -49,7 +50,10 @@ print(p.to_dict()) } ``` -This JSON is the **execution contract** — external runners (Airflow, Prefect, custom executors) can read it to replicate the DAG without re-inferring semantics. +This JSON is the **execution contract** — external runners (Airflow, Prefect, +custom executors) can read it to replicate the DAG without re-inferring +semantics. Dependency edges, mode, and materialization metadata are already +compiled into the graph. ## Execution Levels diff --git a/docs/user_docs/core-concepts/materialization.md b/docs/user_docs/core-concepts/materialization.md index 2056ffe..7c58022 100644 --- a/docs/user_docs/core-concepts/materialization.md +++ b/docs/user_docs/core-concepts/materialization.md @@ -82,6 +82,12 @@ Materialization also happens automatically when: | Consumer asks for `tuple[T, ...]` | `def fn(data: tuple[int, ...])` | | `on_error=STOP` on the producer (see below) | All downstream consumers materialize | +These are **build-time decisions**. When `pipeline(...)` is compiled, SynaFlow +records which dependencies must be materialized in the `Dag` +(`materialized_deps`) and resolves the materializer callable for each step. +The runtime executors do not re-decide which branch is eager; they follow the +compiled contract. + ## Error Policies: `OnError.CONTINUE` vs `OnError.STOP` Every step has an `on_error` policy that controls what happens when the step's @@ -176,6 +182,9 @@ When `on_error=STOP` is set: This guarantees transactional integrity — you can inspect what was processed before the failure. +In other words, `OnError.STOP` changes the compiled materialization plan, not +just the runtime error behavior. + ## Error Materializers When a step fails, an **error materializer** captures the exception and diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index 1ba6d9a..9124d84 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -202,38 +202,6 @@ def consumer(producer: Row) -> None: assert p.dag.needs_materialize("producer") is False -def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): - from dataclasses import dataclass - from collections.abc import Iterator - from synaflow import pipeline, step - - @dataclass - class Row: - id: int - name: str - - class Params(NamedTuple): - pass - - def producer() -> Iterator[Row]: - yield Row(id=1, name="a") - - # Consumed as a scalar (EACH mode) so needs_materialize is False - def consumer(producer: Row) -> None: - pass - - p = pipeline( - name="test_validation_no_mat", - params=Params, - steps=[ - step("producer", fn=producer), - step("consumer", fn=consumer), - ], - ) - assert p.dag is not None - assert p.dag.needs_materialize("producer") is False - - def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 8579633..2466eb2 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -34,20 +34,12 @@ def test_given_undefined_type_annotation_in_params_when_initialize_parameters_ca class ParamsWithUndefined(NamedTuple): x: "SomeUndefinedType" - import typing - - try: - print("GET HINTS:", typing.get_type_hints(ParamsWithUndefined)) - except Exception as e: - print("GET HINTS EXCEPTION:", type(e), e) - from synaflow.core.dag_dependencies import initialize_parameters nodes = initialize_parameters(ParamsWithUndefined) assert "x" in nodes from typing import ForwardRef - print("OUTPUT TYPE:", type(nodes["x"].output), nodes["x"].output) assert ( isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType"