From 646486a4e03b24d482da407ffd0c146f965927ea Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:38:23 +0100 Subject: [PATCH 01/12] refactor(core): catch specific exceptions (NameError, TypeError) in type hint evaluation --- synaflow/core/dag_dependencies.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synaflow/core/dag_dependencies.py b/synaflow/core/dag_dependencies.py index 60ca2bc..406971e 100644 --- a/synaflow/core/dag_dependencies.py +++ b/synaflow/core/dag_dependencies.py @@ -23,7 +23,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: hints = {} try: hints = typing.get_type_hints(params) - except Exception: + except (NameError, TypeError): hints = getattr(params, "__annotations__", {}) for field in getattr(params, "_fields", []): tp = hints.get(field) @@ -34,7 +34,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: def get_safe_type_hints(fn: Any) -> dict[str, Any]: try: return typing.get_type_hints(fn, include_extras=True) - except Exception: + except (NameError, TypeError): return {} From b5f5d985e699aa3f2bb16301be10ffa826513fc1 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:42:58 +0100 Subject: [PATCH 02/12] test(core): add PEP 563 runtime execution and materializer tests --- tests/core/test_pep563_annotations.py | 60 +++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 8579633..71cba02 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step @@ -52,3 +53,62 @@ class ParamsWithUndefined(NamedTuple): isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType" ) + + +def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + run_pipeline(p, Params(name="hello")) + assert captured == "hello" + + +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( + run_pipeline, +): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + return lambda it: list(it) + + class Params(NamedTuple): + pass + + def my_step() -> Iterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + run_pipeline(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == Iterator[str] From d471e363b83140c3680a9ab168acdc7e4c6d51a5 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:48:19 +0100 Subject: [PATCH 03/12] test(core): add custom types and Future compatibility checks to type_compatibility tests --- tests/core/test_is_type_compatible.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index 6cc3958..e0ff300 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -1,9 +1,21 @@ from collections.abc import Generator, Iterator +from dataclasses import dataclass +from concurrent.futures import Future from typing import Any import pytest from synaflow.core.type_compatibility import is_type_compatible +@dataclass +class MyDataclass: + x: int + + +@dataclass +class OtherDataclass: + x: int + + @pytest.mark.parametrize( "producer, consumer, expected", [ @@ -23,6 +35,15 @@ # Mismatched types should still fail (list[int], list[str], False), (list[dict], list[int], True), + # Custom/specific type compatibility tests + (MyDataclass, MyDataclass, True), + (MyDataclass, OtherDataclass, False), + (list[MyDataclass], list[MyDataclass], True), + (list[MyDataclass], list[OtherDataclass], False), + (tuple[int, str], tuple[int, str], True), + (tuple[int, str], tuple[str, int], False), + (Future, Future, True), + (list[Future], list[Future], True), ], ) def test_given_bare_containers_when_checking_compatibility_then_returns_expected( From 5d35fa72cbb6f6c919996dd8a0d05895855e93ae Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:49:44 +0100 Subject: [PATCH 04/12] refactor(tests): move pep 563 runtime tests from core to execution module --- tests/core/test_pep563_annotations.py | 59 ------------------------ tests/execution/test_pep563_runtime.py | 64 ++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 59 deletions(-) create mode 100644 tests/execution/test_pep563_runtime.py diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 71cba02..847cafa 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -53,62 +53,3 @@ class ParamsWithUndefined(NamedTuple): isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType" ) - - -def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): - class Params(NamedTuple): - name: str = "" - - captured = None - - def my_step(name: str) -> str: - nonlocal captured - captured = name - return name - - p = pipeline( - name="test_future_annotations_run", - params=Params, - steps=[ - step("my_step", fn=my_step), - ], - ) - run_pipeline(p, Params(name="hello")) - assert captured == "hello" - - -def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( - run_pipeline, -): - resolved_item_type = None - - def my_factory(ctx): - nonlocal resolved_item_type - resolved_item_type = ctx.item_type - return lambda it: list(it) - - class Params(NamedTuple): - pass - - def my_step() -> Iterator[str]: - yield "a" - yield "b" - - captured = None - - def sink(my_step: list[str]) -> list[str]: - nonlocal captured - captured = my_step - return my_step - - p = pipeline( - name="test_future_annotations_mat", - params=Params, - steps=[ - step("my_step", fn=my_step, materializer=my_factory), - step("sink", fn=sink), - ], - ) - run_pipeline(p, Params()) - assert captured == ["a", "b"] - assert resolved_item_type == Iterator[str] diff --git a/tests/execution/test_pep563_runtime.py b/tests/execution/test_pep563_runtime.py new file mode 100644 index 0000000..5b15e55 --- /dev/null +++ b/tests/execution/test_pep563_runtime.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import NamedTuple +from synaflow import pipeline, step + + +def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + run_pipeline(p, Params(name="hello")) + assert captured == "hello" + + +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( + run_pipeline, +): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + return lambda it: list(it) + + class Params(NamedTuple): + pass + + def my_step() -> Iterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + run_pipeline(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == Iterator[str] From af1f7d1d2f99beaa9e937ab1fb753529b4a45201 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:50:29 +0100 Subject: [PATCH 05/12] chore(tests): remove unused Iterator import from core test file --- tests/core/test_pep563_annotations.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 847cafa..8579633 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -1,6 +1,5 @@ from __future__ import annotations -from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step From 77b60708af953dcee9e9cb96c89bbbc0a9dffbe7 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 18:53:46 +0100 Subject: [PATCH 06/12] test: maintain parity by moving PEP 563 runtime tests to sync and async engines --- .../async_engine/test_async_runner_pep563.py | 66 +++++++++++++++++++ .../test_runner_pep563.py} | 11 ++-- 2 files changed, 71 insertions(+), 6 deletions(-) create mode 100644 tests/execution/async_engine/test_async_runner_pep563.py rename tests/execution/{test_pep563_runtime.py => sync_engine/test_runner_pep563.py} (88%) diff --git a/tests/execution/async_engine/test_async_runner_pep563.py b/tests/execution/async_engine/test_async_runner_pep563.py new file mode 100644 index 0000000..eeb0c97 --- /dev/null +++ b/tests/execution/async_engine/test_async_runner_pep563.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import NamedTuple +from synaflow import async_run, pipeline, step + + +async def test_given_future_annotations_when_run_then_executes_successfully(): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + await async_run(p, Params(name="hello")) + assert captured == "hello" + + +async def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + + async def mat(it): + return [x async for x in it] + + return mat + + class Params(NamedTuple): + pass + + async def my_step() -> AsyncIterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + await async_run(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == AsyncIterator[str] diff --git a/tests/execution/test_pep563_runtime.py b/tests/execution/sync_engine/test_runner_pep563.py similarity index 88% rename from tests/execution/test_pep563_runtime.py rename to tests/execution/sync_engine/test_runner_pep563.py index 5b15e55..23bff11 100644 --- a/tests/execution/test_pep563_runtime.py +++ b/tests/execution/sync_engine/test_runner_pep563.py @@ -3,9 +3,10 @@ from collections.abc import Iterator from typing import NamedTuple from synaflow import pipeline, step +from synaflow.execution.sync_engine.executor import run as sync_run -def test_given_future_annotations_when_run_then_executes_successfully(run_pipeline): +def test_given_future_annotations_when_run_then_executes_successfully(): class Params(NamedTuple): name: str = "" @@ -23,13 +24,11 @@ def my_step(name: str) -> str: step("my_step", fn=my_step), ], ) - run_pipeline(p, Params(name="hello")) + sync_run(p, Params(name="hello")) assert captured == "hello" -def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object( - run_pipeline, -): +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): resolved_item_type = None def my_factory(ctx): @@ -59,6 +58,6 @@ def sink(my_step: list[str]) -> list[str]: step("sink", fn=sink), ], ) - run_pipeline(p, Params()) + sync_run(p, Params()) assert captured == ["a", "b"] assert resolved_item_type == Iterator[str] From fd84941bc15e808d1fdad65254b6b158ec9927a0 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:00:19 +0100 Subject: [PATCH 07/12] feat: only require custom materializer for custom types when needs_materialize is True --- synaflow/core/dag_builder.py | 22 +++++++----- tests/core/test_dag_builder_materializer.py | 32 +++++++++++++++++ .../test_async_runner_materialization.py | 36 +++++++++++++++++++ .../test_runner_materialization.py | 36 +++++++++++++++++++ 4 files changed, 118 insertions(+), 8 deletions(-) diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index a0fe8e9..849b8d6 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -219,13 +219,19 @@ def _resolve_materializers( and is_iterable_type(node.output) and node.materializer is memory_materializer_factory ): - inner = get_inner_type(node.output) - if inner is not None and not _is_builtin_type(inner): - raise ValueError( - f"Node '{name}': output item type '{inner}' requires a custom" - " materializer. Provide a step-level materializer or a" - " pipeline-level materializer." - ) + needs_materialize = ( + node.on_error == OnError.STOP + or node.force_materialize + or any(name in consumer.materialized_deps for consumer in dag.values()) + ) + if needs_materialize: + inner = get_inner_type(node.output) + if inner is not None and not _is_builtin_type(inner): + raise ValueError( + f"Node '{name}': output item type '{inner}' requires a custom" + " materializer. Provide a step-level materializer or a" + " pipeline-level materializer." + ) def _compute_materialized_deps(dag: dict[str, DagNode]) -> None: @@ -321,12 +327,12 @@ def build_dag( params, pipeline_obs_resolved, ) + _compute_materialized_deps(dag) _resolve_materializers( dag, memory_materializer_factory, error_materializer_factory, ) - _compute_materialized_deps(dag) dag_obj = _finalize_dag( pipeline_name, dag, diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index ea7f0f7..b6c6041 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -167,6 +167,38 @@ def consumer(producer: list[Row]) -> int: ) +def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): + from dataclasses import dataclass + from collections.abc import Iterator + from synaflow import pipeline, step + + @dataclass + class Row: + id: int + name: str + + class Params(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="a") + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row) -> None: + pass + + p = pipeline( + name="test_validation_no_mat", + params=Params, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + assert p.dag is not None + assert p.dag.needs_materialize("producer") is False + + def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index 956a69c..fccba18 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -858,3 +858,39 @@ async def async_list(async_iterator) -> list: await async_run(my_pipeline, params=P()) assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +async def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully(): + from dataclasses import dataclass + from collections.abc import AsyncGenerator + from synaflow import async_run + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + async def producer() -> AsyncGenerator[Row, None]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + async def consumer(producer: Row): + seen.append(producer) + + my_pipeline = pipeline( + name="test_no_materializer_custom_type_not_materialized", + params=P, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + await async_run(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index ece914a..88c8724 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -859,3 +859,39 @@ def consumer(producer: list[Row]): run_pipeline(my_pipeline, params=P()) assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully( + run_pipeline, +): + from dataclasses import dataclass + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row): + seen.append(producer) + + my_pipeline = pipeline( + name="test_no_materializer_custom_type_not_materialized", + params=P, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + run_pipeline(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] From c7463b7d3e2a9d7634d39d3d654e2505d21805c3 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:02:24 +0100 Subject: [PATCH 08/12] test: modify non-builtin type test to consume as Iterator and not use a custom materializer --- .../test_async_runner_materialization.py | 19 +++++++------------ .../test_runner_materialization.py | 10 +++++----- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index fccba18..b2c06a9 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -819,9 +819,9 @@ async def consume(produce: int): assert materialized == [6] -async def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully(): +async def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully(): from dataclasses import dataclass - from collections.abc import AsyncGenerator + from collections.abc import AsyncGenerator, AsyncIterator from synaflow import async_run @dataclass @@ -838,20 +838,15 @@ async def producer() -> AsyncGenerator[Row, None]: seen = [] - async def consumer(producer: list[Row]): - seen.extend(producer) - - async def async_list(async_iterator) -> list: - items = [] - async for item in async_iterator: - items.append(item) - return items + async def consumer(producer: AsyncIterator[Row]): + async for item in producer: + seen.append(item) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_custom_type_iterator_no_mat", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(async_list)), + step("producer", fn=producer), step("consumer", fn=consumer), ], ) diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index 88c8724..bd45d70 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -826,7 +826,7 @@ def sink(pair: list[tuple[int | None, int | None]]): assert seen == [(None, 10), (None, 20)] -def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully( +def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully( run_pipeline, ): from dataclasses import dataclass @@ -845,14 +845,14 @@ def producer() -> Iterator[Row]: seen = [] - def consumer(producer: list[Row]): - seen.extend(producer) + def consumer(producer: Iterator[Row]): + seen.extend(list(producer)) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_custom_type_iterator_no_mat", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(list)), + step("producer", fn=producer), step("consumer", fn=consumer), ], ) From e288e542f6e39dd37a1dcbac97b5777176ac9c18 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:05:15 +0100 Subject: [PATCH 09/12] test: add custom class type cases to is_type_compatible test --- tests/core/test_is_type_compatible.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index e0ff300..0d8c262 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -40,6 +40,9 @@ class OtherDataclass: (MyDataclass, OtherDataclass, False), (list[MyDataclass], list[MyDataclass], True), (list[MyDataclass], list[OtherDataclass], False), + (Iterator[MyDataclass], Iterator[MyDataclass], True), + (Iterator[MyDataclass], Iterator[OtherDataclass], False), + (Generator[MyDataclass, None, None], Iterator[MyDataclass], True), (tuple[int, str], tuple[int, str], True), (tuple[int, str], tuple[str, int], False), (Future, Future, True), From 7f8f35db14d87d87e14e0fb023b1cb53043ac731 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:08:07 +0100 Subject: [PATCH 10/12] refactor: use Dag.needs_materialize in _resolve_materializers instead of duplicating logic --- synaflow/core/dag_builder.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index 849b8d6..0f58cc8 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -192,11 +192,11 @@ def _resolve_step_observers( def _resolve_materializers( - dag: dict[str, DagNode], + dag: Dag, pipeline_materializer: Any, pipeline_error_materializer: Any, ) -> None: - for name, node in dag.items(): + for name, node in dag.steps.items(): if not node.fn: node.materializer = None node.error_materializer = None @@ -219,12 +219,7 @@ def _resolve_materializers( and is_iterable_type(node.output) and node.materializer is memory_materializer_factory ): - needs_materialize = ( - node.on_error == OnError.STOP - or node.force_materialize - or any(name in consumer.materialized_deps for consumer in dag.values()) - ) - if needs_materialize: + if dag.needs_materialize(name): inner = get_inner_type(node.output) if inner is not None and not _is_builtin_type(inner): raise ValueError( @@ -328,11 +323,6 @@ def build_dag( pipeline_obs_resolved, ) _compute_materialized_deps(dag) - _resolve_materializers( - dag, - memory_materializer_factory, - error_materializer_factory, - ) dag_obj = _finalize_dag( pipeline_name, dag, @@ -340,6 +330,11 @@ def build_dag( error_materializer_factory, pipeline_obs_resolved, ) + _resolve_materializers( + dag_obj, + memory_materializer_factory, + error_materializer_factory, + ) check_circular_dependencies(dag_obj, pipeline_name) From 1d577043f3d2a4c41dbd61e4b289a42efc683334 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:11:29 +0100 Subject: [PATCH 11/12] test: add custom NamedTuple and Iterator[Future] compatibility test cases --- tests/core/test_is_type_compatible.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index 0d8c262..ccd4b6c 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -16,6 +16,19 @@ class OtherDataclass: x: int +from typing import NamedTuple + + +class MyNamedTuple(NamedTuple): + id: int + name: str + + +class OtherNamedTuple(NamedTuple): + id: int + name: str + + @pytest.mark.parametrize( "producer, consumer, expected", [ @@ -43,6 +56,17 @@ class OtherDataclass: (Iterator[MyDataclass], Iterator[MyDataclass], True), (Iterator[MyDataclass], Iterator[OtherDataclass], False), (Generator[MyDataclass, None, None], Iterator[MyDataclass], True), + # NamedTuple cases + (MyNamedTuple, MyNamedTuple, True), + (MyNamedTuple, OtherNamedTuple, False), + (list[MyNamedTuple], list[MyNamedTuple], True), + (list[MyNamedTuple], list[OtherNamedTuple], False), + (Iterator[MyNamedTuple], Iterator[MyNamedTuple], True), + (Iterator[MyNamedTuple], Iterator[OtherNamedTuple], False), + (Generator[MyNamedTuple, None, None], Iterator[MyNamedTuple], True), + # Future cases + (Iterator[Future], Iterator[Future], True), + (Iterator[Future], Iterator[int], False), (tuple[int, str], tuple[int, str], True), (tuple[int, str], tuple[str, int], False), (Future, Future, True), From b0a1e0610cebe7369bfe0cdc78e0a9b2c6b2a454 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Fri, 19 Jun 2026 19:13:17 +0100 Subject: [PATCH 12/12] ci: report Total Coverage as a GitHub Check Run in PRs --- .github/scripts/coverage_report.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/scripts/coverage_report.py b/.github/scripts/coverage_report.py index 6e78f5b..4c9cad6 100644 --- a/.github/scripts/coverage_report.py +++ b/.github/scripts/coverage_report.py @@ -215,6 +215,18 @@ def run_ci(): summary=patch_summary, ) + total_conclusion = "success" if total_pct >= TOTAL_THRESHOLD else "failure" + total_summary = ( + f"Overall package coverage: **{total_pct:.1f}%** " + f"(threshold: {TOTAL_THRESHOLD}%)." + ) + create_check_run( + name="Total Coverage", + conclusion=total_conclusion, + title=f"Total Coverage: {total_pct:.1f}%", + summary=total_summary, + ) + if total_pct < TOTAL_THRESHOLD: print( f"FAIL: total coverage {total_pct:.1f}% is below {TOTAL_THRESHOLD}% threshold"