Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/scripts/coverage_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ def run_ci():
summary=patch_summary,
)

total_conclusion = "success" if total_pct >= TOTAL_THRESHOLD else "failure"
total_summary = (
f"Overall package coverage: **{total_pct:.1f}%** "
f"(threshold: {TOTAL_THRESHOLD}%)."
)
create_check_run(
name="Total Coverage",
conclusion=total_conclusion,
title=f"Total Coverage: {total_pct:.1f}%",
summary=total_summary,
)

if total_pct < TOTAL_THRESHOLD:
print(
f"FAIL: total coverage {total_pct:.1f}% is below {TOTAL_THRESHOLD}% threshold"
Expand Down
29 changes: 15 additions & 14 deletions synaflow/core/dag_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ def _resolve_step_observers(


def _resolve_materializers(
dag: dict[str, DagNode],
dag: Dag,
pipeline_materializer: Any,
pipeline_error_materializer: Any,
) -> None:
for name, node in dag.items():
for name, node in dag.steps.items():
if not node.fn:
node.materializer = None
node.error_materializer = None
Expand All @@ -219,13 +219,14 @@ def _resolve_materializers(
and is_iterable_type(node.output)
and node.materializer is memory_materializer_factory
):
inner = get_inner_type(node.output)
if inner is not None and not _is_builtin_type(inner):
raise ValueError(
f"Node '{name}': output item type '{inner}' requires a custom"
" materializer. Provide a step-level materializer or a"
" pipeline-level materializer."
)
if dag.needs_materialize(name):
inner = get_inner_type(node.output)
if inner is not None and not _is_builtin_type(inner):
raise ValueError(
f"Node '{name}': output item type '{inner}' requires a custom"
" materializer. Provide a step-level materializer or a"
" pipeline-level materializer."
)


def _compute_materialized_deps(dag: dict[str, DagNode]) -> None:
Expand Down Expand Up @@ -321,11 +322,6 @@ def build_dag(
params,
pipeline_obs_resolved,
)
_resolve_materializers(
dag,
memory_materializer_factory,
error_materializer_factory,
)
_compute_materialized_deps(dag)
dag_obj = _finalize_dag(
pipeline_name,
Expand All @@ -334,6 +330,11 @@ def build_dag(
error_materializer_factory,
pipeline_obs_resolved,
)
_resolve_materializers(
dag_obj,
memory_materializer_factory,
error_materializer_factory,
)

check_circular_dependencies(dag_obj, pipeline_name)

Expand Down
4 changes: 2 additions & 2 deletions synaflow/core/dag_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]:
hints = {}
try:
hints = typing.get_type_hints(params)
except Exception:
except (NameError, TypeError):
hints = getattr(params, "__annotations__", {})
for field in getattr(params, "_fields", []):
tp = hints.get(field)
Expand All @@ -34,7 +34,7 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]:
def get_safe_type_hints(fn: Any) -> dict[str, Any]:
try:
return typing.get_type_hints(fn, include_extras=True)
except Exception:
except (NameError, TypeError):
return {}


Expand Down
32 changes: 32 additions & 0 deletions tests/core/test_dag_builder_materializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,38 @@ def consumer(producer: list[Row]) -> int:
)


def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds():
from dataclasses import dataclass
from collections.abc import Iterator
from synaflow import pipeline, step

@dataclass
class Row:
id: int
name: str

class Params(NamedTuple):
pass

def producer() -> Iterator[Row]:
yield Row(id=1, name="a")

# Consumed as a scalar (EACH mode) so needs_materialize is False
def consumer(producer: Row) -> None:
pass

p = pipeline(
name="test_validation_no_mat",
params=Params,
steps=[
step("producer", fn=producer),
step("consumer", fn=consumer),
],
)
assert p.dag is not None
assert p.dag.needs_materialize("producer") is False


def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds():
from dataclasses import dataclass
from collections.abc import Iterator
Expand Down
48 changes: 48 additions & 0 deletions tests/core/test_is_type_compatible.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
from collections.abc import Generator, Iterator
from dataclasses import dataclass
from concurrent.futures import Future
from typing import Any
import pytest
from synaflow.core.type_compatibility import is_type_compatible


@dataclass
class MyDataclass:
x: int


@dataclass
class OtherDataclass:
x: int


from typing import NamedTuple


class MyNamedTuple(NamedTuple):
id: int
name: str


class OtherNamedTuple(NamedTuple):
id: int
name: str


@pytest.mark.parametrize(
"producer, consumer, expected",
[
Expand All @@ -23,6 +48,29 @@
# Mismatched types should still fail
(list[int], list[str], False),
(list[dict], list[int], True),
# Custom/specific type compatibility tests
(MyDataclass, MyDataclass, True),
(MyDataclass, OtherDataclass, False),
(list[MyDataclass], list[MyDataclass], True),
(list[MyDataclass], list[OtherDataclass], False),
(Iterator[MyDataclass], Iterator[MyDataclass], True),
(Iterator[MyDataclass], Iterator[OtherDataclass], False),
(Generator[MyDataclass, None, None], Iterator[MyDataclass], True),
# NamedTuple cases
(MyNamedTuple, MyNamedTuple, True),
(MyNamedTuple, OtherNamedTuple, False),
(list[MyNamedTuple], list[MyNamedTuple], True),
(list[MyNamedTuple], list[OtherNamedTuple], False),
(Iterator[MyNamedTuple], Iterator[MyNamedTuple], True),
(Iterator[MyNamedTuple], Iterator[OtherNamedTuple], False),
(Generator[MyNamedTuple, None, None], Iterator[MyNamedTuple], True),
# Future cases
(Iterator[Future], Iterator[Future], True),
(Iterator[Future], Iterator[int], False),
(tuple[int, str], tuple[int, str], True),
(tuple[int, str], tuple[str, int], False),
(Future, Future, True),
(list[Future], list[Future], True),
],
)
def test_given_bare_containers_when_checking_compatibility_then_returns_expected(
Expand Down
53 changes: 42 additions & 11 deletions tests/execution/async_engine/test_async_runner_materialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,9 +819,9 @@ async def consume(produce: int):
assert materialized == [6]


async def test_given_step_custom_materializer_and_non_builtin_type_when_run_then_executes_successfully():
async def test_given_step_non_builtin_type_and_iterator_consumer_when_run_then_executes_successfully():
from dataclasses import dataclass
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, AsyncIterator
from synaflow import async_run

@dataclass
Expand All @@ -838,20 +838,51 @@ async def producer() -> AsyncGenerator[Row, None]:

seen = []

async def consumer(producer: list[Row]):
seen.extend(producer)
async def consumer(producer: AsyncIterator[Row]):
async for item in producer:
seen.append(item)

async def async_list(async_iterator) -> list:
items = []
async for item in async_iterator:
items.append(item)
return items
my_pipeline = pipeline(
name="test_custom_type_iterator_no_mat",
params=P,
steps=[
step("producer", fn=producer),
step("consumer", fn=consumer),
],
)

await async_run(my_pipeline, params=P())
assert seen == [Row(id=1, name="alice"), Row(id=2, name="bob")]


async def test_given_no_custom_materializer_and_non_builtin_type_when_not_materialized_then_executes_successfully():
from dataclasses import dataclass
from collections.abc import AsyncGenerator
from synaflow import async_run

@dataclass
class Row:
id: int
name: str

class P(NamedTuple):
pass

async def producer() -> AsyncGenerator[Row, None]:
yield Row(id=1, name="alice")
yield Row(id=2, name="bob")

seen = []

# Consumed as a scalar (EACH mode) so needs_materialize is False
async def consumer(producer: Row):
seen.append(producer)

my_pipeline = pipeline(
name="test_custom_materializer_custom_type",
name="test_no_materializer_custom_type_not_materialized",
params=P,
steps=[
step("producer", fn=producer, materializer=to_materializer(async_list)),
step("producer", fn=producer),
step("consumer", fn=consumer),
],
)
Expand Down
66 changes: 66 additions & 0 deletions tests/execution/async_engine/test_async_runner_pep563.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from __future__ import annotations

from collections.abc import AsyncIterator
from typing import NamedTuple
from synaflow import async_run, pipeline, step


async def test_given_future_annotations_when_run_then_executes_successfully():
class Params(NamedTuple):
name: str = ""

captured = None

def my_step(name: str) -> str:
nonlocal captured
captured = name
return name

p = pipeline(
name="test_future_annotations_run",
params=Params,
steps=[
step("my_step", fn=my_step),
],
)
await async_run(p, Params(name="hello"))
assert captured == "hello"


async def test_given_future_annotations_when_custom_materializer_executed_then_receives_type_object():
resolved_item_type = None

def my_factory(ctx):
nonlocal resolved_item_type
resolved_item_type = ctx.item_type

async def mat(it):
return [x async for x in it]

return mat

class Params(NamedTuple):
pass

async def my_step() -> AsyncIterator[str]:
yield "a"
yield "b"

captured = None

def sink(my_step: list[str]) -> list[str]:
nonlocal captured
captured = my_step
return my_step

p = pipeline(
name="test_future_annotations_mat",
params=Params,
steps=[
step("my_step", fn=my_step, materializer=my_factory),
step("sink", fn=sink),
],
)
await async_run(p, Params())
assert captured == ["a", "b"]
assert resolved_item_type == AsyncIterator[str]
Loading
Loading