diff --git a/.github/scripts/coverage_report.py b/.github/scripts/coverage_report.py index 6e78f5b..4c9cad6 100644 --- a/.github/scripts/coverage_report.py +++ b/.github/scripts/coverage_report.py @@ -215,6 +215,18 @@ def run_ci(): summary=patch_summary, ) + total_conclusion = "success" if total_pct >= TOTAL_THRESHOLD else "failure" + total_summary = ( + f"Overall package coverage: **{total_pct:.1f}%** " + f"(threshold: {TOTAL_THRESHOLD}%)." + ) + create_check_run( + name="Total Coverage", + conclusion=total_conclusion, + title=f"Total Coverage: {total_pct:.1f}%", + summary=total_summary, + ) + if total_pct < TOTAL_THRESHOLD: print( f"FAIL: total coverage {total_pct:.1f}% is below {TOTAL_THRESHOLD}% threshold" diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index a0fe8e9..0f58cc8 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -192,11 +192,11 @@ def _resolve_step_observers( def _resolve_materializers( - dag: dict[str, DagNode], + dag: Dag, pipeline_materializer: Any, pipeline_error_materializer: Any, ) -> None: - for name, node in dag.items(): + for name, node in dag.steps.items(): if not node.fn: node.materializer = None node.error_materializer = None @@ -219,13 +219,14 @@ def _resolve_materializers( and is_iterable_type(node.output) and node.materializer is memory_materializer_factory ): - inner = get_inner_type(node.output) - if inner is not None and not _is_builtin_type(inner): - raise ValueError( - f"Node '{name}': output item type '{inner}' requires a custom" - " materializer. Provide a step-level materializer or a" - " pipeline-level materializer." - ) + if dag.needs_materialize(name): + inner = get_inner_type(node.output) + if inner is not None and not _is_builtin_type(inner): + raise ValueError( + f"Node '{name}': output item type '{inner}' requires a custom" + " materializer. Provide a step-level materializer or a" + " pipeline-level materializer." + ) def _compute_materialized_deps(dag: dict[str, DagNode]) -> None: @@ -321,11 +322,6 @@ def build_dag( params, pipeline_obs_resolved, ) - _resolve_materializers( - dag, - memory_materializer_factory, - error_materializer_factory, - ) _compute_materialized_deps(dag) dag_obj = _finalize_dag( pipeline_name, @@ -334,6 +330,11 @@ def build_dag( error_materializer_factory, pipeline_obs_resolved, ) + _resolve_materializers( + dag_obj, + memory_materializer_factory, + error_materializer_factory, + ) check_circular_dependencies(dag_obj, pipeline_name) diff --git a/synaflow/core/dag_dependencies.py b/synaflow/core/dag_dependencies.py index 60ca2bc..406971e 100644 --- a/synaflow/core/dag_dependencies.py +++ b/synaflow/core/dag_dependencies.py @@ -23,7 +23,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: hints = {} try: hints = typing.get_type_hints(params) - except Exception: + except (NameError, TypeError): hints = getattr(params, "__annotations__", {}) for field in getattr(params, "_fields", []): tp = hints.get(field) @@ -34,7 +34,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]: def get_safe_type_hints(fn: Any) -> dict[str, Any]: try: return typing.get_type_hints(fn, include_extras=True) - except Exception: + except (NameError, TypeError): return {} diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index ea7f0f7..b6c6041 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -167,6 +167,38 @@ def consumer(producer: list[Row]) -> int: ) +def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): + from dataclasses import dataclass + from collections.abc import Iterator + from synaflow import pipeline, step + + @dataclass + class Row: + id: int + name: str + + class Params(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="a") + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row) -> None: + pass + + p = pipeline( + name="test_validation_no_mat", + params=Params, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + assert p.dag is not None + assert p.dag.needs_materialize("producer") is False + + def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator diff --git a/tests/core/test_is_type_compatible.py b/tests/core/test_is_type_compatible.py index 6cc3958..ccd4b6c 100644 --- a/tests/core/test_is_type_compatible.py +++ b/tests/core/test_is_type_compatible.py @@ -1,9 +1,34 @@ from collections.abc import Generator, Iterator +from dataclasses import dataclass +from concurrent.futures import Future from typing import Any import pytest from synaflow.core.type_compatibility import is_type_compatible +@dataclass +class MyDataclass: + x: int + + +@dataclass +class OtherDataclass: + x: int + + +from typing import NamedTuple + + +class MyNamedTuple(NamedTuple): + id: int + name: str + + +class OtherNamedTuple(NamedTuple): + id: int + name: str + + @pytest.mark.parametrize( "producer, consumer, expected", [ @@ -23,6 +48,29 @@ # Mismatched types should still fail (list[int], list[str], False), (list[dict], list[int], True), + # Custom/specific type compatibility tests + (MyDataclass, MyDataclass, True), + (MyDataclass, OtherDataclass, False), + (list[MyDataclass], list[MyDataclass], True), + (list[MyDataclass], list[OtherDataclass], False), + (Iterator[MyDataclass], Iterator[MyDataclass], True), + (Iterator[MyDataclass], Iterator[OtherDataclass], False), + (Generator[MyDataclass, None, None], Iterator[MyDataclass], True), + # NamedTuple cases + (MyNamedTuple, MyNamedTuple, True), + (MyNamedTuple, OtherNamedTuple, False), + (list[MyNamedTuple], list[MyNamedTuple], True), + (list[MyNamedTuple], list[OtherNamedTuple], False), + (Iterator[MyNamedTuple], Iterator[MyNamedTuple], True), + (Iterator[MyNamedTuple], Iterator[OtherNamedTuple], False), + (Generator[MyNamedTuple, None, None], Iterator[MyNamedTuple], True), + # Future cases + (Iterator[Future], Iterator[Future], True), + (Iterator[Future], Iterator[int], False), + (tuple[int, str], tuple[int, str], True), + (tuple[int, str], tuple[str, int], False), + (Future, Future, True), + (list[Future], list[Future], True), ], ) def test_given_bare_containers_when_checking_compatibility_then_returns_expected( diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index 956a69c..b2c06a9 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -819,9 +819,9 @@ async def consume(produce: int): assert materialized == [6] -async def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully(): +async def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully(): from dataclasses import dataclass - from collections.abc import AsyncGenerator + from collections.abc import AsyncGenerator, AsyncIterator from synaflow import async_run @dataclass @@ -838,20 +838,51 @@ async def producer() -> AsyncGenerator[Row, None]: seen = [] - async def consumer(producer: list[Row]): - seen.extend(producer) + async def consumer(producer: AsyncIterator[Row]): + async for item in producer: + seen.append(item) - async def async_list(async_iterator) -> list: - items = [] - async for item in async_iterator: - items.append(item) - return items + my_pipeline = pipeline( + name="test_custom_type_iterator_no_mat", + params=P, + steps=[ + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + await async_run(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +async def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully(): + from dataclasses import dataclass + from collections.abc import AsyncGenerator + from synaflow import async_run + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + async def producer() -> AsyncGenerator[Row, None]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + async def consumer(producer: Row): + seen.append(producer) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_no_materializer_custom_type_not_materialized", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(async_list)), + step("producer", fn=producer), step("consumer", fn=consumer), ], ) diff --git a/tests/execution/async_engine/test_async_runner_pep563.py b/tests/execution/async_engine/test_async_runner_pep563.py new file mode 100644 index 0000000..eeb0c97 --- /dev/null +++ b/tests/execution/async_engine/test_async_runner_pep563.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import NamedTuple +from synaflow import async_run, pipeline, step + + +async def test_given_future_annotations_when_run_then_executes_successfully(): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + await async_run(p, Params(name="hello")) + assert captured == "hello" + + +async def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + + async def mat(it): + return [x async for x in it] + + return mat + + class Params(NamedTuple): + pass + + async def my_step() -> AsyncIterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + await async_run(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == AsyncIterator[str] diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index ece914a..bd45d70 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -826,7 +826,7 @@ def sink(pair: list[tuple[int | None, int | None]]): assert seen == [(None, 10), (None, 20)] -def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully( +def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully( run_pipeline, ): from dataclasses import dataclass @@ -845,14 +845,50 @@ def producer() -> Iterator[Row]: seen = [] - def consumer(producer: list[Row]): - seen.extend(producer) + def consumer(producer: Iterator[Row]): + seen.extend(list(producer)) my_pipeline = pipeline( - name="test_custom_materializer_custom_type", + name="test_custom_type_iterator_no_mat", params=P, steps=[ - step("producer", fn=producer, materializer=to_materializer(list)), + step("producer", fn=producer), + step("consumer", fn=consumer), + ], + ) + + run_pipeline(my_pipeline, params=P()) + assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")] + + +def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully( + run_pipeline, +): + from dataclasses import dataclass + + @dataclass + class Row: + id: int + name: str + + class P(NamedTuple): + pass + + def producer() -> Iterator[Row]: + yield Row(id=1, name="alice") + yield Row(id=2, name="bob") + + seen = [] + + # Consumed as a scalar (EACH mode) so needs_materialize is False + def consumer(producer: Row): + seen.append(producer) + + my_pipeline = pipeline( + name="test_no_materializer_custom_type_not_materialized", + params=P, + steps=[ + step("producer", fn=producer), step("consumer", fn=consumer), ], ) diff --git a/tests/execution/sync_engine/test_runner_pep563.py b/tests/execution/sync_engine/test_runner_pep563.py new file mode 100644 index 0000000..23bff11 --- /dev/null +++ b/tests/execution/sync_engine/test_runner_pep563.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import NamedTuple +from synaflow import pipeline, step +from synaflow.execution.sync_engine.executor import run as sync_run + + +def test_given_future_annotations_when_run_then_executes_successfully(): + class Params(NamedTuple): + name: str = "" + + captured = None + + def my_step(name: str) -> str: + nonlocal captured + captured = name + return name + + p = pipeline( + name="test_future_annotations_run", + params=Params, + steps=[ + step("my_step", fn=my_step), + ], + ) + sync_run(p, Params(name="hello")) + assert captured == "hello" + + +def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object(): + resolved_item_type = None + + def my_factory(ctx): + nonlocal resolved_item_type + resolved_item_type = ctx.item_type + return lambda it: list(it) + + class Params(NamedTuple): + pass + + def my_step() -> Iterator[str]: + yield "a" + yield "b" + + captured = None + + def sink(my_step: list[str]) -> list[str]: + nonlocal captured + captured = my_step + return my_step + + p = pipeline( + name="test_future_annotations_mat", + params=Params, + steps=[ + step("my_step", fn=my_step, materializer=my_factory), + step("sink", fn=sink), + ], + ) + sync_run(p, Params()) + assert captured == ["a", "b"] + assert resolved_item_type == Iterator[str]