Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
646486a
refactor(core): catch specific exceptions (NameError, TypeError) in t…
mvallebr Jun 19, 2026
b5f5d98
test(core): add PEP 563 runtime execution and materializer tests
mvallebr Jun 19, 2026
d471e36
test(core): add custom types and Future compatibility checks to type_…
mvallebr Jun 19, 2026
5d35fa7
refactor(tests): move pep 563 runtime tests from core to execution mo…
mvallebr Jun 19, 2026
af1f7d1
chore(tests): remove unused Iterator import from core test file
mvallebr Jun 19, 2026
77b6070
test: maintain parity by moving PEP 563 runtime tests to sync and asy…
mvallebr Jun 19, 2026
fd84941
feat: only require custom materializer for custom types when needs_ma…
mvallebr Jun 19, 2026
c7463b7
test: modify non-builtin type test to consume as Iterator and not use…
mvallebr Jun 19, 2026
e288e54
test: add custom class type cases to is_type_compatible test
mvallebr Jun 19, 2026
7f8f35d
refactor: use Dag.needs_materialize in _resolve_materializers instead…
mvallebr Jun 19, 2026
1d57704
test: add custom NamedTuple and Iterator[Future] compatibility test c…
mvallebr Jun 19, 2026
b0a1e06
ci: report Total Coverage as a GitHub Check Run in PRs
mvallebr Jun 19, 2026
ba6396a
refactor: simplify executor materialization and enforce callable erro…
mvallebr Jun 19, 2026
ed0ff7e
Merge origin/main into fix-memory-materializer-fallback
mvallebr Jun 20, 2026
1f9837a
refactor: simplify explicit pipeline materializer validation
mvallebr Jun 20, 2026
c3dcae0
refactor: centralize dag materialization planning
mvallebr Jun 20, 2026
ac344e8
test: trim redundant executor materialization cases
mvallebr Jun 20, 2026
3186a26
docs: align build-time materialization contract
mvallebr Jun 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ reads.
### Static validation at build time

Type errors, missing dependencies, circular graphs, mode conflicts — all caught
when `pipeline(...)` is called. If it compiles, it's valid. No runtime surprises.
when `pipeline(...)` is called. Materialization decisions are compiled into the
`Dag` too: mode resolution, per-dependency eager materialization, and the
resolved materializer callables are frozen before `run()` starts. If it
compiles, it's valid. No runtime surprises.

### Build your own runner

Expand Down
15 changes: 8 additions & 7 deletions docs/user_docs/core-concepts/build-vs-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ flowchart LR

| Phase | What happens | When | Output |
|---|---|---|---|
| **Build-time** | Type validation, mode resolution, materializer assignment, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON |
| **Build-time** | Type validation, mode resolution, materializer assignment, consumer materialization planning, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON |
| **Run-time** | Topological execution, lockstep streaming, bounded handoff via `max_in_flight`, `tee` forking, observer dispatch, error handling | `run()` / `async_run()` is called | Step outputs, side effects |

## Why this matters
Expand Down Expand Up @@ -62,8 +62,9 @@ print(p.to_dict())
```

All semantic decisions — mode, `max_in_flight`, `each_mode_deps`,
`materialized_deps` — are
resolved at build time and frozen in the JSON. Runners don't re-infer
`materialized_deps`, eager materialization triggered by `OnError.STOP` or
`force_materialize`, and the resolved `node.materializer` callable — are
resolved at build time and frozen in the JSON or `Dag`. Runners don't re-infer
semantics; they execute the contract.

### 2. Write your own runner
Expand Down Expand Up @@ -111,11 +112,11 @@ Every domain concern has a symmetric representation in both phases:
| Concern | Build-time | Run-time |
|---|---|---|
| Pipeline/Orchestration | `build_dag()` | `PipelineExecutor` / `AsyncPipelineExecutor` |
| Dependencies | `validate_and_resolve_dependencies()` | Inlined in executor |
| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Inlined in executor |
| Step compilation | `validate_and_compile_step()` | Inlined in executor |
| Dependencies | `validate_and_resolve_dependencies()` | Executor reads resolved deps from `node.deps` |
| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Executor iterates `dag.get_execution_levels()` |
| Step compilation | `validate_and_compile_step()` | Executor calls compiled `node.fn` |
| Mode resolution | Resolved at build time → `node.mode` | Executor reads `node.mode`, never re-infers |
| Materialization | Resolved at build time → `node.materializer` | Executor calls the resolved callable |
| Materialization | Resolved at build time → `node.materializer`, `materialized_deps`, `Dag.needs_materialize(...)` | Executor calls the resolved callable and follows the compiled plan |
| Observers | Normalized at build time → `node.observers` | Executor dispatches events |

This symmetry means sync and async executors can be completely different
Expand Down
6 changes: 5 additions & 1 deletion docs/user_docs/core-concepts/dag-construction.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ print(p.to_dict())
"fn": "producer",
"mode": "all",
"on_error": "continue",
"materializer": "memory_materializer",
"each_mode_deps": [],
"materialized_deps": []
}
}
}
```

This JSON is the **execution contract** — external runners (Airflow, Prefect, custom executors) can read it to replicate the DAG without re-inferring semantics.
This JSON is the **execution contract** — external runners (Airflow, Prefect,
custom executors) can read it to replicate the DAG without re-inferring
semantics. Dependency edges, mode, and materialization metadata are already
compiled into the graph.

## Execution Levels

Expand Down
9 changes: 9 additions & 0 deletions docs/user_docs/core-concepts/materialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ Materialization also happens automatically when:
| Consumer asks for `tuple[T, ...]` | `def fn(data: tuple[int, ...])` |
| `on_error=STOP` on the producer (see below) | All downstream consumers materialize |

These are **build-time decisions**. When `pipeline(...)` is compiled, SynaFlow
records which dependencies must be materialized in the `Dag`
(`materialized_deps`) and resolves the materializer callable for each step.
The runtime executors do not re-decide which branch is eager; they follow the
compiled contract.

## Error Policies: `OnError.CONTINUE` vs `OnError.STOP`

Every step has an `on_error` policy that controls what happens when the step's
Expand Down Expand Up @@ -176,6 +182,9 @@ When `on_error=STOP` is set:
This guarantees transactional integrity — you can inspect what was processed
before the failure.

In other words, `OnError.STOP` changes the compiled materialization plan, not
just the runtime error behavior.

## Error Materializers

When a step fails, an **error materializer** captures the exception and
Expand Down
18 changes: 0 additions & 18 deletions synaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@
from .core.types import OnError, StepMode, StepParams, StepResult
from .execution.async_engine.executor import async_run
from .execution.sync_engine.executor import run
from .materializers import (
memory_materializer,
disk_materializer,
log_error_materializer,
disk_error_materializer,
composite_materializer,
composite_error_materializer,
to_materializer,
to_error_materializer,
)
from .serializers import (
json_serializer,
jsonl_serializer,
Expand All @@ -40,14 +30,6 @@
"PipelineEvent",
"StepEvent",
"MaterializationEvent",
"memory_materializer",
"disk_materializer",
"log_error_materializer",
"disk_error_materializer",
"composite_materializer",
"composite_error_materializer",
"to_materializer",
"to_error_materializer",
"json_serializer",
"jsonl_serializer",
"csv_serializer",
Expand Down
35 changes: 34 additions & 1 deletion synaflow/core/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ def to_serializable(self) -> dict:
return ret


@dataclass(frozen=True)
class ConsumerMaterializationPlan:
consumers: list[str]
eager_consumers: list[str]
lazy_consumers: list[str]


def _serialize_observers(observers: list) -> list[dict]:
result = []
for obs in observers:
Expand Down Expand Up @@ -174,13 +181,39 @@ def needs_materialize(self, step_name: str) -> bool:
if node is None:
return False

if node.on_error == OnError.STOP or node.force_materialize:
if self.requires_eager_materialization(step_name):
return True

return any(
step_name in consumer.materialized_deps for consumer in self.steps.values()
)

def requires_eager_materialization(self, step_name: str) -> bool:
node = self.steps.get(step_name)
if node is None:
return False
return node.on_error == OnError.STOP or node.force_materialize

def consumer_materialization_plan(
self, producer: str
) -> ConsumerMaterializationPlan:
consumers = self.consumers_of(producer)
eager_consumers = [
consumer
for consumer in consumers
if producer in self.steps[consumer].materialized_deps
]
lazy_consumers = [
consumer
for consumer in consumers
if producer not in self.steps[consumer].materialized_deps
]
return ConsumerMaterializationPlan(
consumers=consumers,
eager_consumers=eager_consumers,
lazy_consumers=lazy_consumers,
)

def get_execution_levels(self) -> list[list[str]]:
in_degree: dict[str, int] = {name: 0 for name in self.steps}
for name, node in self.steps.items():
Expand Down
160 changes: 113 additions & 47 deletions synaflow/core/dag_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,27 @@
All functions are stateless — no classes, no self.
"""

import inspect
import logging
import traceback
import types as _types
from collections.abc import MutableMapping, MutableSequence, MutableSet
from typing import Any, NamedTuple, get_args
from collections.abc import (
AsyncIterable as AbcAsyncIterable,
AsyncIterator as AbcAsyncIterator,
Iterable as AbcIterable,
Iterator as AbcIterator,
MutableMapping,
MutableSequence,
MutableSet,
)
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Iterable,
Iterator,
NamedTuple,
get_args,
)

from synaflow.core.dag import Dag, DagNode
from synaflow.core.dag_dependencies import initialize_parameters
Expand All @@ -41,9 +56,12 @@
)
from synaflow.core.type_compatibility import (
get_inner_type,
is_async_stream_type,
is_factory,
is_iterable_type,
is_materialized_consumer,
is_scalar,
is_sync_stream_type,
)
from synaflow.core.types import ErrorMaterializeContext, MaterializeContext, OnError

Expand All @@ -67,9 +85,24 @@ def memory_materializer_factory(ctx: MaterializeContext):
continue
if tp is tuple:
return tuple
if tp is not None and is_scalar(tp):
return _identity
return list
if is_scalar(tp):
return _identity
if tp in (
AsyncIterator,
Iterator,
Iterable,
AsyncIterable,
AbcAsyncIterator,
AbcIterator,
AbcIterable,
AbcAsyncIterable,
):
return list

raise ValueError(
f"Cannot infer memory materializer for consumer type: '{tp}'. "
"Please provide explicit type hints for your consumer parameters, or use a step-level materializer."
)


memory_materializer_factory.__name__ = "memory_materializer"
Expand All @@ -78,7 +111,7 @@ def memory_materializer_factory(ctx: MaterializeContext):
def log_error_materializer_factory(ctx: ErrorMaterializeContext):
log = logging.getLogger("synaflow")

def handle_error(exc: BaseException) -> None:
def log_error(exc: BaseException) -> None:
log.warning(
"[%s] [%s] %s: %s",
ctx.pipeline_name,
Expand All @@ -88,7 +121,7 @@ def handle_error(exc: BaseException) -> None:
)
log.debug(traceback.format_exc())

return handle_error
return log_error


log_error_materializer_factory.__name__ = "log_error_materializer"
Expand Down Expand Up @@ -130,39 +163,6 @@ def _validate_params_is_namedtuple(params: Any, pipeline_name: str) -> None:
)


def _validate_materializer_factory(name: str, mat: Any, is_error: bool = False) -> None:
if mat is None:
return
if not callable(mat):
raise TypeError(
f"Node '{name}': {'error materializer' if is_error else 'materializer'} must be a callable factory, got {type(mat).__name__}"
)

# Built-in collection types are commonly passed by mistake instead of factory
if isinstance(mat, type) and mat in (list, set, dict, tuple):
label = "error materializer" if is_error else "materializer"
helper = "to_error_materializer" if is_error else "to_materializer"
raise ValueError(
f"Node '{name}': {label} cannot be a direct type/callable '{mat.__name__}'. "
f"Please wrap it using {helper}({mat.__name__})."
)

# Let's inspect the signature to verify it accepts a context argument
try:
sig = inspect.signature(mat)
has_params = len(sig.parameters) > 0
except (ValueError, TypeError):
has_params = False

if not has_params:
label = "error materializer" if is_error else "materializer"
helper = "to_error_materializer" if is_error else "to_materializer"
raise ValueError(
f"Node '{name}': {label} factory must accept at least one argument (context). "
f"If you want to use a direct callable, wrap it using {helper}(...)."
)


def _validate_declared_step_names(steps: list[Any], pipeline_name: str) -> None:
for step in steps:
if hasattr(step, "name"):
Expand Down Expand Up @@ -202,17 +202,83 @@ def _resolve_materializers(
node.error_materializer = None
continue

mat = node.materializer or pipeline_materializer or memory_materializer_factory
_validate_materializer_factory(name, mat, is_error=False)
node.materializer = mat

has_explicit_mat = (
node.materializer is not None or pipeline_materializer is not None
)
is_stream = is_sync_stream_type(node.output) or is_async_stream_type(
node.output
)
is_untyped = node.output is None
is_scalar = not is_untyped and not is_iterable_type(node.output)
has_consumers = bool(dag.consumers_of(name))

mat = None
if has_explicit_mat:
mat = node.materializer or pipeline_materializer
else:
if is_scalar:
mat = None
elif is_stream:
if has_consumers:
mat = memory_materializer_factory
else:
mat = None
elif is_untyped:
if has_consumers:
mat = memory_materializer_factory
else:
mat = None

if mat and is_factory(mat):
consumers = []
for consumer_node in dag.steps.values():
if name in consumer_node.deps:
consumers.append(consumer_node)

consumer_type = None
if consumers:
mat_consumers = [
c for c in consumers if name in getattr(c, "materialized_deps", [])
]
if mat_consumers:
consumer_type = mat_consumers[0].deps.get(name)
from synaflow.core.type_compatibility import is_type_compatible

for other in mat_consumers[1:]:
other_tp = other.deps.get(name)
if (
consumer_type != other_tp
and not is_type_compatible(consumer_type, other_tp)
and not is_type_compatible(other_tp, consumer_type)
):
raise ValueError(
f"Pipeline '{dag.name}': step '{name}' has consumers with incompatible types: "
f"'{mat_consumers[0].name}' expects {consumer_type} but '{other.name}' expects {other_tp}."
)
else:
consumer_type = consumers[0].deps.get(name)
ctx = MaterializeContext(
pipeline_name=dag.name,
dataset_name=name,
item_type=node.output,
consumer_type=consumer_type,
)
node.materializer = mat(ctx)
else:
node.materializer = mat
err_mat = (
node.error_materializer
or pipeline_error_materializer
or log_error_materializer_factory
)
_validate_materializer_factory(name, err_mat, is_error=True)
node.error_materializer = err_mat
if err_mat and is_factory(err_mat):
err_ctx = ErrorMaterializeContext(
pipeline_name=dag.name,
dataset_name=name,
)
node.error_materializer = err_mat(err_ctx)
else:
node.error_materializer = err_mat

if (
node.output
Expand Down
Loading
Loading