diff --git a/README.md b/README.md index 6951807..069efc4 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,10 @@ reads. ### Static validation at build time Type errors, missing dependencies, circular graphs, mode conflicts — all caught -when `pipeline(...)` is called. If it compiles, it's valid. No runtime surprises. +when `pipeline(...)` is called. Materialization decisions are compiled into the +`Dag` too: mode resolution, per-dependency eager materialization, and the +resolved materializer callables are frozen before `run()` starts. If it +compiles, it's valid. No runtime surprises. ### Build your own runner diff --git a/docs/user_docs/core-concepts/build-vs-run.md b/docs/user_docs/core-concepts/build-vs-run.md index f84d941..7d61b99 100644 --- a/docs/user_docs/core-concepts/build-vs-run.md +++ b/docs/user_docs/core-concepts/build-vs-run.md @@ -28,7 +28,7 @@ flowchart LR | Phase | What happens | When | Output | |---|---|---|---| -| **Build-time** | Type validation, mode resolution, materializer assignment, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON | +| **Build-time** | Type validation, mode resolution, materializer assignment, consumer materialization planning, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON | | **Run-time** | Topological execution, lockstep streaming, bounded handoff via `max_in_flight`, `tee` forking, observer dispatch, error handling | `run()` / `async_run()` is called | Step outputs, side effects | ## Why this matters @@ -62,8 +62,9 @@ print(p.to_dict()) ``` All semantic decisions — mode, `max_in_flight`, `each_mode_deps`, -`materialized_deps` — are -resolved at build time and frozen in the JSON. Runners don't re-infer +`materialized_deps`, eager materialization triggered by `OnError.STOP` or +`force_materialize`, and the resolved `node.materializer` callable — are +resolved at build time and frozen in the JSON or `Dag`. Runners don't re-infer semantics; they execute the contract. ### 2. Write your own runner @@ -111,11 +112,11 @@ Every domain concern has a symmetric representation in both phases: | Concern | Build-time | Run-time | |---|---|---| | Pipeline/Orchestration | `build_dag()` | `PipelineExecutor` / `AsyncPipelineExecutor` | -| Dependencies | `validate_and_resolve_dependencies()` | Inlined in executor | -| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Inlined in executor | -| Step compilation | `validate_and_compile_step()` | Inlined in executor | +| Dependencies | `validate_and_resolve_dependencies()` | Executor reads resolved deps from `node.deps` | +| Topology | `check_circular_dependencies()`, `get_execution_levels()` | Executor iterates `dag.get_execution_levels()` | +| Step compilation | `validate_and_compile_step()` | Executor calls compiled `node.fn` | | Mode resolution | Resolved at build time → `node.mode` | Executor reads `node.mode`, never re-infers | -| Materialization | Resolved at build time → `node.materializer` | Executor calls the resolved callable | +| Materialization | Resolved at build time → `node.materializer`, `materialized_deps`, `Dag.needs_materialize(...)` | Executor calls the resolved callable and follows the compiled plan | | Observers | Normalized at build time → `node.observers` | Executor dispatches events | This symmetry means sync and async executors can be completely different diff --git a/docs/user_docs/core-concepts/dag-construction.md b/docs/user_docs/core-concepts/dag-construction.md index 50acd4f..adca72f 100644 --- a/docs/user_docs/core-concepts/dag-construction.md +++ b/docs/user_docs/core-concepts/dag-construction.md @@ -42,6 +42,7 @@ print(p.to_dict()) "fn": "producer", "mode": "all", "on_error": "continue", + "materializer": "memory_materializer", "each_mode_deps": [], "materialized_deps": [] } @@ -49,7 +50,10 @@ print(p.to_dict()) } ``` -This JSON is the **execution contract** — external runners (Airflow, Prefect, custom executors) can read it to replicate the DAG without re-inferring semantics. +This JSON is the **execution contract** — external runners (Airflow, Prefect, +custom executors) can read it to replicate the DAG without re-inferring +semantics. Dependency edges, mode, and materialization metadata are already +compiled into the graph. ## Execution Levels diff --git a/docs/user_docs/core-concepts/materialization.md b/docs/user_docs/core-concepts/materialization.md index 2056ffe..7c58022 100644 --- a/docs/user_docs/core-concepts/materialization.md +++ b/docs/user_docs/core-concepts/materialization.md @@ -82,6 +82,12 @@ Materialization also happens automatically when: | Consumer asks for `tuple[T, ...]` | `def fn(data: tuple[int, ...])` | | `on_error=STOP` on the producer (see below) | All downstream consumers materialize | +These are **build-time decisions**. When `pipeline(...)` is compiled, SynaFlow +records which dependencies must be materialized in the `Dag` +(`materialized_deps`) and resolves the materializer callable for each step. +The runtime executors do not re-decide which branch is eager; they follow the +compiled contract. + ## Error Policies: `OnError.CONTINUE` vs `OnError.STOP` Every step has an `on_error` policy that controls what happens when the step's @@ -176,6 +182,9 @@ When `on_error=STOP` is set: This guarantees transactional integrity — you can inspect what was processed before the failure. +In other words, `OnError.STOP` changes the compiled materialization plan, not +just the runtime error behavior. + ## Error Materializers When a step fails, an **error materializer** captures the exception and diff --git a/synaflow/__init__.py b/synaflow/__init__.py index 0d4f92a..6bd5610 100644 --- a/synaflow/__init__.py +++ b/synaflow/__init__.py @@ -8,16 +8,6 @@ from .core.types import OnError, StepMode, StepParams, StepResult from .execution.async_engine.executor import async_run from .execution.sync_engine.executor import run -from .materializers import ( - memory_materializer, - disk_materializer, - log_error_materializer, - disk_error_materializer, - composite_materializer, - composite_error_materializer, - to_materializer, - to_error_materializer, -) from .serializers import ( json_serializer, jsonl_serializer, @@ -40,14 +30,6 @@ "PipelineEvent", "StepEvent", "MaterializationEvent", - "memory_materializer", - "disk_materializer", - "log_error_materializer", - "disk_error_materializer", - "composite_materializer", - "composite_error_materializer", - "to_materializer", - "to_error_materializer", "json_serializer", "jsonl_serializer", "csv_serializer", diff --git a/synaflow/core/dag.py b/synaflow/core/dag.py index dfdf7ad..2257920 100644 --- a/synaflow/core/dag.py +++ b/synaflow/core/dag.py @@ -73,6 +73,13 @@ def to_serializable(self) -> dict: return ret +@dataclass(frozen=True) +class ConsumerMaterializationPlan: + consumers: list[str] + eager_consumers: list[str] + lazy_consumers: list[str] + + def _serialize_observers(observers: list) -> list[dict]: result = [] for obs in observers: @@ -174,13 +181,39 @@ def needs_materialize(self, step_name: str) -> bool: if node is None: return False - if node.on_error == OnError.STOP or node.force_materialize: + if self.requires_eager_materialization(step_name): return True return any( step_name in consumer.materialized_deps for consumer in self.steps.values() ) + def requires_eager_materialization(self, step_name: str) -> bool: + node = self.steps.get(step_name) + if node is None: + return False + return node.on_error == OnError.STOP or node.force_materialize + + def consumer_materialization_plan( + self, producer: str + ) -> ConsumerMaterializationPlan: + consumers = self.consumers_of(producer) + eager_consumers = [ + consumer + for consumer in consumers + if producer in self.steps[consumer].materialized_deps + ] + lazy_consumers = [ + consumer + for consumer in consumers + if producer not in self.steps[consumer].materialized_deps + ] + return ConsumerMaterializationPlan( + consumers=consumers, + eager_consumers=eager_consumers, + lazy_consumers=lazy_consumers, + ) + def get_execution_levels(self) -> list[list[str]]: in_degree: dict[str, int] = {name: 0 for name in self.steps} for name, node in self.steps.items(): diff --git a/synaflow/core/dag_builder.py b/synaflow/core/dag_builder.py index 0f58cc8..86eb0a2 100644 --- a/synaflow/core/dag_builder.py +++ b/synaflow/core/dag_builder.py @@ -16,12 +16,27 @@ All functions are stateless — no classes, no self. """ -import inspect import logging import traceback import types as _types -from collections.abc import MutableMapping, MutableSequence, MutableSet -from typing import Any, NamedTuple, get_args +from collections.abc import ( + AsyncIterable as AbcAsyncIterable, + AsyncIterator as AbcAsyncIterator, + Iterable as AbcIterable, + Iterator as AbcIterator, + MutableMapping, + MutableSequence, + MutableSet, +) +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Iterable, + Iterator, + NamedTuple, + get_args, +) from synaflow.core.dag import Dag, DagNode from synaflow.core.dag_dependencies import initialize_parameters @@ -41,9 +56,12 @@ ) from synaflow.core.type_compatibility import ( get_inner_type, + is_async_stream_type, + is_factory, is_iterable_type, is_materialized_consumer, is_scalar, + is_sync_stream_type, ) from synaflow.core.types import ErrorMaterializeContext, MaterializeContext, OnError @@ -67,9 +85,24 @@ def memory_materializer_factory(ctx: MaterializeContext): continue if tp is tuple: return tuple - if tp is not None and is_scalar(tp): - return _identity - return list + if is_scalar(tp): + return _identity + if tp in ( + AsyncIterator, + Iterator, + Iterable, + AsyncIterable, + AbcAsyncIterator, + AbcIterator, + AbcIterable, + AbcAsyncIterable, + ): + return list + + raise ValueError( + f"Cannot infer memory materializer for consumer type: '{tp}'. " + "Please provide explicit type hints for your consumer parameters, or use a step-level materializer." + ) memory_materializer_factory.__name__ = "memory_materializer" @@ -78,7 +111,7 @@ def memory_materializer_factory(ctx: MaterializeContext): def log_error_materializer_factory(ctx: ErrorMaterializeContext): log = logging.getLogger("synaflow") - def handle_error(exc: BaseException) -> None: + def log_error(exc: BaseException) -> None: log.warning( "[%s] [%s] %s: %s", ctx.pipeline_name, @@ -88,7 +121,7 @@ def handle_error(exc: BaseException) -> None: ) log.debug(traceback.format_exc()) - return handle_error + return log_error log_error_materializer_factory.__name__ = "log_error_materializer" @@ -130,39 +163,6 @@ def _validate_params_is_namedtuple(params: Any, pipeline_name: str) -> None: ) -def _validate_materializer_factory(name: str, mat: Any, is_error: bool = False) -> None: - if mat is None: - return - if not callable(mat): - raise TypeError( - f"Node '{name}': {'error materializer' if is_error else 'materializer'} must be a callable factory, got {type(mat).__name__}" - ) - - # Built-in collection types are commonly passed by mistake instead of factory - if isinstance(mat, type) and mat in (list, set, dict, tuple): - label = "error materializer" if is_error else "materializer" - helper = "to_error_materializer" if is_error else "to_materializer" - raise ValueError( - f"Node '{name}': {label} cannot be a direct type/callable '{mat.__name__}'. " - f"Please wrap it using {helper}({mat.__name__})." - ) - - # Let's inspect the signature to verify it accepts a context argument - try: - sig = inspect.signature(mat) - has_params = len(sig.parameters) > 0 - except (ValueError, TypeError): - has_params = False - - if not has_params: - label = "error materializer" if is_error else "materializer" - helper = "to_error_materializer" if is_error else "to_materializer" - raise ValueError( - f"Node '{name}': {label} factory must accept at least one argument (context). " - f"If you want to use a direct callable, wrap it using {helper}(...)." - ) - - def _validate_declared_step_names(steps: list[Any], pipeline_name: str) -> None: for step in steps: if hasattr(step, "name"): @@ -202,17 +202,83 @@ def _resolve_materializers( node.error_materializer = None continue - mat = node.materializer or pipeline_materializer or memory_materializer_factory - _validate_materializer_factory(name, mat, is_error=False) - node.materializer = mat - + has_explicit_mat = ( + node.materializer is not None or pipeline_materializer is not None + ) + is_stream = is_sync_stream_type(node.output) or is_async_stream_type( + node.output + ) + is_untyped = node.output is None + is_scalar = not is_untyped and not is_iterable_type(node.output) + has_consumers = bool(dag.consumers_of(name)) + + mat = None + if has_explicit_mat: + mat = node.materializer or pipeline_materializer + else: + if is_scalar: + mat = None + elif is_stream: + if has_consumers: + mat = memory_materializer_factory + else: + mat = None + elif is_untyped: + if has_consumers: + mat = memory_materializer_factory + else: + mat = None + + if mat and is_factory(mat): + consumers = [] + for consumer_node in dag.steps.values(): + if name in consumer_node.deps: + consumers.append(consumer_node) + + consumer_type = None + if consumers: + mat_consumers = [ + c for c in consumers if name in getattr(c, "materialized_deps", []) + ] + if mat_consumers: + consumer_type = mat_consumers[0].deps.get(name) + from synaflow.core.type_compatibility import is_type_compatible + + for other in mat_consumers[1:]: + other_tp = other.deps.get(name) + if ( + consumer_type != other_tp + and not is_type_compatible(consumer_type, other_tp) + and not is_type_compatible(other_tp, consumer_type) + ): + raise ValueError( + f"Pipeline '{dag.name}': step '{name}' has consumers with incompatible types: " + f"'{mat_consumers[0].name}' expects {consumer_type} but '{other.name}' expects {other_tp}." + ) + else: + consumer_type = consumers[0].deps.get(name) + ctx = MaterializeContext( + pipeline_name=dag.name, + dataset_name=name, + item_type=node.output, + consumer_type=consumer_type, + ) + node.materializer = mat(ctx) + else: + node.materializer = mat err_mat = ( node.error_materializer or pipeline_error_materializer or log_error_materializer_factory ) - _validate_materializer_factory(name, err_mat, is_error=True) - node.error_materializer = err_mat + if err_mat and is_factory(err_mat): + err_ctx = ErrorMaterializeContext( + pipeline_name=dag.name, + dataset_name=name, + ) + node.error_materializer = err_mat(err_ctx) + else: + node.error_materializer = err_mat if ( node.output diff --git a/synaflow/core/dag_steps.py b/synaflow/core/dag_steps.py index c6ee508..fe27a74 100644 --- a/synaflow/core/dag_steps.py +++ b/synaflow/core/dag_steps.py @@ -15,7 +15,7 @@ is_scalar, is_sync_stream_type, ) -from synaflow.core.types import MaterializeContext, StepMode +from synaflow.core.types import StepMode def validate_step_is_callable(step: Step, pipeline_name: str) -> None: @@ -149,31 +149,23 @@ def validate_sync_async_consistency( has_async_materializer = False has_sync_materializer = False - def _is_async_mat(m: Any) -> bool: - sig = inspect.signature(m) - if ( - len(sig.parameters) > 1 - or "ctx" in sig.parameters - or "context" in sig.parameters - ): - ctx = MaterializeContext( - pipeline_name=pipeline_name, dataset_name="validator", item_type=Any - ) - m = m(ctx) - return inspect.iscoroutinefunction(m) - - if memory_materializer_factory and not is_default_factory: - if _is_async_mat(memory_materializer_factory): + def _register_materializer(materializer: Any) -> None: + nonlocal has_async_materializer, has_sync_materializer + if materializer is None: + return + if inspect.iscoroutinefunction(materializer): has_async_materializer = True else: has_sync_materializer = True + if not is_default_factory: + for step in steps: + if getattr(step, "materializer", None) is None: + _register_materializer(dag.steps[step.name].materializer) + for step in steps: - if getattr(step, "materializer", None): - if _is_async_mat(step.materializer): - has_async_materializer = True - else: - has_sync_materializer = True + if getattr(step, "materializer", None) is not None: + _register_materializer(dag.steps[step.name].materializer) if has_sync and has_async_materializer: raise ValueError( diff --git a/synaflow/core/type_compatibility.py b/synaflow/core/type_compatibility.py index 12283da..e35de2c 100644 --- a/synaflow/core/type_compatibility.py +++ b/synaflow/core/type_compatibility.py @@ -1,6 +1,20 @@ import types +import inspect from collections.abc import AsyncGenerator, AsyncIterator, Generator, Iterable, Iterator -from typing import Any, Tuple, Union, get_args, get_origin +from typing import Any, Callable, Tuple, Union, get_args, get_origin + + +def is_factory(func: Callable) -> bool: + if not callable(func): + return False + sig = inspect.signature(func) + for param in sig.parameters.values(): + if param.name in ("ctx", "context") or "MaterializeContext" in str( + param.annotation + ): + return True + return False + SCALAR_TYPES = {int, float, str, bool, bytes, type(None)} COLLECTION_ORIGINS = { diff --git a/synaflow/core/types.py b/synaflow/core/types.py index fed8fa2..469d879 100644 --- a/synaflow/core/types.py +++ b/synaflow/core/types.py @@ -42,7 +42,6 @@ class MaterializeContext: class ErrorMaterializeContext: pipeline_name: str dataset_name: str - exception_type: type[BaseException] @dataclass diff --git a/synaflow/execution/async_engine/executor.py b/synaflow/execution/async_engine/executor.py index 8c8d67f..45f5c8d 100644 --- a/synaflow/execution/async_engine/executor.py +++ b/synaflow/execution/async_engine/executor.py @@ -22,8 +22,6 @@ dispatch_observers_async, ) from synaflow.core.types import ( - ErrorMaterializeContext, - MaterializeContext, OnError, StepMode, ) @@ -75,33 +73,19 @@ async def _apply_materializer( items, had_error, exc = await _collect_async_iterator(dag, step_name, value) return items, had_error, exc return value, False, None - concrete_mat = mat( - MaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - item_type=node.output, - consumer_type=consumer_type, - ) - ) - if inspect.iscoroutinefunction(concrete_mat): - result = await concrete_mat(value) + + if inspect.iscoroutinefunction(mat): + result = await mat(value) return result, False, None + if isinstance(value, (AsyncIterator, AsyncGenerator, Iterator, Generator)): - if ( - concrete_mat in (list, tuple, set, dict) - or getattr(concrete_mat, "__name__", "") == "_identity" - ): - items, had_error, exc = await _collect_async_iterator(dag, step_name, value) - res = items if concrete_mat is list else concrete_mat(items) - if inspect.iscoroutine(res): - return await res, had_error, exc - return res, had_error, exc items, had_error, exc = await _collect_async_iterator(dag, step_name, value) - res = concrete_mat(items) + res = mat(items) if inspect.iscoroutine(res): return await res, had_error, exc return res, had_error, exc - res = concrete_mat(value) + + res = mat(value) if inspect.iscoroutine(res): return await res, False, None return res, False, None @@ -117,29 +101,13 @@ async def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: return if inspect.iscoroutinefunction(err_mat): - handler = await err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) + await err_mat(exc) + elif callable(err_mat): + res = err_mat(exc) + if inspect.iscoroutine(res): + await res else: - handler = err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) - - if handler is not None: - if inspect.iscoroutinefunction(handler): - await handler(exc) - else: - res = handler(exc) - if inspect.iscoroutine(res): - await res + raise TypeError(f"Error materializer for step '{step_name}' is not callable.") async def _pump_iterator( @@ -639,9 +607,6 @@ async def _emit_deferred_completion(self, node, step_name): def _is_stream_output(self, output): return isinstance(output, (Iterator, Generator, AsyncIterator, AsyncGenerator)) - def _stream_requires_eager_materialization(self, node): - return node.on_error == OnError.STOP or node.force_materialize - async def _publish_eager_materialized_stream( self, step_name, @@ -760,9 +725,10 @@ async def _publish_output(self, step_name, output, node): await self._publish_scalar_output(step_name, output, node, deferred) return - consumers = self.dag.consumers_of(step_name) + plan = self.dag.consumer_materialization_plan(step_name) + consumers = plan.consumers - if self._stream_requires_eager_materialization(node): + if self.dag.requires_eager_materialization(step_name): try: await self._publish_eager_materialized_stream( step_name, output, node, consumers, deferred @@ -773,7 +739,7 @@ async def _publish_output(self, step_name, output, node): await self._handle_stream_publish_error(step_name, node, exc) return - if len(consumers) == 1 and self.dag.needs_materialize(step_name): + if len(consumers) == 1 and plan.eager_consumers: try: await self._publish_single_consumer_stream( step_name, output, node, consumers[0], deferred diff --git a/synaflow/execution/sync_engine/executor.py b/synaflow/execution/sync_engine/executor.py index 22a0630..0560b30 100644 --- a/synaflow/execution/sync_engine/executor.py +++ b/synaflow/execution/sync_engine/executor.py @@ -24,8 +24,6 @@ dispatch_observers, ) from synaflow.core.types import ( - ErrorMaterializeContext, - MaterializeContext, OnError, StepMode, ) @@ -76,25 +74,12 @@ def _apply_materializer( items, had_error, exc = _collect_iterator(dag, step_name, value) return items, had_error, exc return value, False, None - concrete_mat = mat( - MaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - item_type=node.output, - consumer_type=consumer_type, - ) - ) - if isinstance(value, Iterator) and concrete_mat in (list, tuple, set, dict): - items, had_error, exc = _collect_iterator(dag, step_name, value) - result = items if concrete_mat is list else concrete_mat(items) - return result, had_error, exc - if ( - isinstance(value, Iterator) - and getattr(concrete_mat, "__name__", "") == "_identity" - ): + + if isinstance(value, Iterator): items, had_error, exc = _collect_iterator(dag, step_name, value) - return items, had_error, exc - return concrete_mat(value), False, None + return mat(items), had_error, exc + + return mat(value), False, None def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: @@ -106,16 +91,10 @@ def _handle_error(dag: Dag, step_name: str, exc: BaseException) -> None: if err_mat is None: return - handler = err_mat( - ErrorMaterializeContext( - pipeline_name=dag.name, - dataset_name=step_name, - exception_type=type(exc), - ) - ) + if not callable(err_mat): + raise TypeError(f"Error materializer for step '{step_name}' is not callable.") - if callable(handler): - handler(exc) + err_mat(exc) # --------------------------------------------------------------------------- @@ -586,9 +565,6 @@ def _emit_deferred_completion(self, node, step_name): completed_all_inputs=True, ) - def _stream_requires_eager_materialization(self, node): - return node.on_error == OnError.STOP or node.force_materialize - def _materialize_stream_output( self, step_name, @@ -649,16 +625,9 @@ def _publish_stream_to_single_consumer( self.outputs[self.dag.output_key(step_name, consumer)] = output def _publish_stream_to_multiple_consumers(self, step_name, output, node, consumers): - lazy_consumers = [ - consumer - for consumer in consumers - if step_name not in self.dag[consumer].materialized_deps - ] - eager_consumers = [ - consumer - for consumer in consumers - if step_name in self.dag[consumer].materialized_deps - ] + plan = self.dag.consumer_materialization_plan(step_name) + lazy_consumers = plan.lazy_consumers + eager_consumers = plan.eager_consumers if not self._consumers_share_execution_level(consumers): output = _maybe_wrap_stream(output, node) @@ -667,8 +636,8 @@ def _publish_stream_to_multiple_consumers(self, step_name, output, node, consume consumer_branches = branches[: len(consumers)] observer_branches = branches[len(consumers) :] for consumer, branch in zip(consumers, consumer_branches): - consumer_node = self.dag[consumer] - if step_name in consumer_node.materialized_deps: + if consumer in eager_consumers: + consumer_node = self.dag[consumer] branch, _, _ = self._materialize_with_events( step_name, branch, @@ -737,15 +706,16 @@ def _publish_output(self, step_name, output, node): self._publish_scalar_output(step_name, output, node, deferred) return - consumers = self.dag.consumers_of(step_name) + plan = self.dag.consumer_materialization_plan(step_name) + consumers = plan.consumers - if self._stream_requires_eager_materialization(node): + if self.dag.requires_eager_materialization(step_name): self._materialize_stream_output( step_name, output, node, consumers, deferred ) return - if len(consumers) == 1 and self.dag.needs_materialize(step_name): + if len(consumers) == 1 and plan.eager_consumers: self._publish_stream_to_single_consumer( step_name, output, node, consumers[0], deferred ) diff --git a/synaflow/materializers/__init__.py b/synaflow/materializers/__init__.py deleted file mode 100644 index b14afce..0000000 --- a/synaflow/materializers/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -from .memory import memory_materializer -from .disk import disk_materializer -from .errors import log_error_materializer, disk_error_materializer -from .composite import composite_materializer, composite_error_materializer -from .helpers import to_materializer, to_error_materializer - -__all__ = [ - "memory_materializer", - "disk_materializer", - "log_error_materializer", - "disk_error_materializer", - "composite_materializer", - "composite_error_materializer", - "to_materializer", - "to_error_materializer", -] diff --git a/synaflow/materializers/composite.py b/synaflow/materializers/composite.py index 9acb2c8..3367ace 100644 --- a/synaflow/materializers/composite.py +++ b/synaflow/materializers/composite.py @@ -2,6 +2,8 @@ from collections.abc import Iterator from typing import Any +from synaflow.core.type_compatibility import is_factory + def _is_async_callable(func: Any) -> bool: if func is None: @@ -16,12 +18,14 @@ def _is_async_callable(func: Any) -> bool: def composite_materializer(*materializers): def factory(ctx): - resolved = [m(ctx) for m in materializers if m is not None] + resolved = [ + m(ctx) if is_factory(m) else m for m in materializers if m is not None + ] any_async = any(_is_async_callable(m) for m in resolved) if any_async: - async def concrete_async(value: Any) -> Any: + async def run_composite_materializers_async(value: Any) -> Any: if isinstance(value, Iterator): value = list(value) @@ -35,10 +39,10 @@ async def concrete_async(value: Any) -> Any: res = await res return res if res is not None else value - return concrete_async + return run_composite_materializers_async else: - def concrete_sync(value: Any) -> Any: + def run_composite_materializers_sync(value: Any) -> Any: if isinstance(value, Iterator): value = list(value) @@ -47,19 +51,25 @@ def concrete_sync(value: Any) -> Any: res = m(value) return res if res is not None else value - return concrete_sync + return run_composite_materializers_sync return factory def composite_error_materializer(*error_materializers): def factory(ctx): - resolved = [em(ctx) for em in error_materializers if em is not None] + resolved = [ + em(ctx) if is_factory(em) else em + for em in error_materializers + if em is not None + ] any_async = any(_is_async_callable(em) for em in resolved) if any_async: - async def concrete_async(exc: BaseException) -> None: + async def run_composite_error_materializers_async( + exc: BaseException, + ) -> None: for em in resolved: if _is_async_callable(em): await em(exc) @@ -68,13 +78,13 @@ async def concrete_async(exc: BaseException) -> None: if inspect.iscoroutine(res): await res - return concrete_async + return run_composite_error_materializers_async else: - def concrete_sync(exc: BaseException) -> None: + def run_composite_error_materializers_sync(exc: BaseException) -> None: for em in resolved: em(exc) - return concrete_sync + return run_composite_error_materializers_sync return factory diff --git a/synaflow/materializers/disk.py b/synaflow/materializers/disk.py index 4be2a1e..dc7ba80 100644 --- a/synaflow/materializers/disk.py +++ b/synaflow/materializers/disk.py @@ -16,7 +16,7 @@ def factory(ctx: MaterializeContext): fname = file_name or f"{ctx.dataset_name}.{ext}" target_path = base_path / fname - def concrete(value: Any) -> Any: + def write_to_disk(value: Any) -> Any: target_path.parent.mkdir(parents=True, exist_ok=True) if isinstance(value, Iterator): @@ -34,6 +34,6 @@ def concrete(value: Any) -> Any: return value - return concrete + return write_to_disk return factory diff --git a/synaflow/materializers/errors.py b/synaflow/materializers/errors.py index 42c092b..9467806 100644 --- a/synaflow/materializers/errors.py +++ b/synaflow/materializers/errors.py @@ -30,7 +30,7 @@ def factory(ctx: ErrorMaterializeContext): fname = file_name or f"{ctx.dataset_name}.{ext}" target_path = base_path / fname - def concrete(exc: BaseException) -> None: + def append_error_to_disk(exc: BaseException) -> None: target_path.parent.mkdir(parents=True, exist_ok=True) record = ErrorRecord( @@ -51,6 +51,6 @@ def concrete(exc: BaseException) -> None: else: serializer(f, record) - return concrete + return append_error_to_disk return factory diff --git a/synaflow/materializers/helpers.py b/synaflow/materializers/helpers.py deleted file mode 100644 index 670ab67..0000000 --- a/synaflow/materializers/helpers.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Callable -from synaflow.core.types import MaterializeContext, ErrorMaterializeContext - - -def to_materializer( - callable_or_type: Callable, -) -> Callable[[MaterializeContext], Callable]: - """ - Wraps a simple, direct callable (like list, set, or custom function) - to conform to the materializer factory protocol. - """ - if not callable(callable_or_type): - raise TypeError("to_materializer expects a callable argument") - - def factory(ctx: MaterializeContext) -> Callable: - return callable_or_type - - return factory - - -def to_error_materializer( - callable_or_type: Callable, -) -> Callable[[ErrorMaterializeContext], Callable]: - """ - Wraps a simple, direct callable (like a logging/handler function) - to conform to the error materializer factory protocol. - """ - if not callable(callable_or_type): - raise TypeError("to_error_materializer expects a callable argument") - - def factory(ctx: ErrorMaterializeContext) -> Callable: - return callable_or_type - - return factory diff --git a/tests/core/test_dag_builder_future.py b/tests/core/test_dag_builder_future.py index e4ad047..69e0e64 100644 --- a/tests/core/test_dag_builder_future.py +++ b/tests/core/test_dag_builder_future.py @@ -6,8 +6,6 @@ from collections.abc import Iterator from typing import NamedTuple -import pytest - from .conftest import build_minimal_dag @@ -53,17 +51,3 @@ def consumer(producer: Iterator[tuple[str, int]]) -> list[tuple[str, int]]: # --------------------------------------------------------------------------- # Custom type without materializer should raise # --------------------------------------------------------------------------- - - -def test_given_custom_output_type_without_materializer_when_dag_built_then_raises(): - class CustomType: - pass - - def producer() -> Iterator[CustomType]: - yield CustomType() - - def consumer(producer: list[CustomType]) -> int: - return len(producer) - - with pytest.raises(ValueError, match="materializer"): - build_minimal_dag(producer_fn=producer, consumer_fn=consumer, params=KVParam) diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index b6c6041..9124d84 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -2,7 +2,6 @@ from typing import NamedTuple -from synaflow import to_materializer from synaflow.core.types import MaterializeContext from .conftest import build_minimal_dag @@ -12,7 +11,7 @@ def test_given_step_level_materializer_when_dag_built_then_step_materializer_win def my_mat(iterator): return list(iterator) - my_mat_wrapped = to_materializer(my_mat) + my_mat_wrapped = my_mat def gen() -> Iterator[int]: yield 1 @@ -43,7 +42,7 @@ def consumer(producer: list[int]) -> int: consumer_fn=consumer, pipeline_materializer=my_factory, ) - assert p.dag.steps["producer"].materializer is my_factory + assert p.dag.steps["producer"].materializer is list def test_given_no_custom_materializer_when_dag_built_then_default_factory_used(): @@ -54,9 +53,7 @@ def consumer(producer: list[int]) -> int: return len(producer) p = build_minimal_dag(producer_fn=gen, consumer_fn=consumer) - from synaflow.core.dag_builder import memory_materializer_factory as _def - - assert p.dag.steps["producer"].materializer is _def + assert p.dag.steps["producer"].materializer is list def test_given_default_factory_when_consumer_type_is_scalar_then_returns_identity(): @@ -73,9 +70,10 @@ def test_given_default_factory_when_consumer_type_is_scalar_then_returns_identit assert mat(42) == 42 -def test_given_default_factory_when_consumer_type_is_none_then_returns_list(): +def test_given_default_factory_when_consumer_type_is_none_then_raises(): from synaflow.core.dag_builder import memory_materializer_factory as _def from synaflow.core.types import MaterializeContext + import pytest ctx = MaterializeContext( pipeline_name="test", @@ -83,8 +81,8 @@ def test_given_default_factory_when_consumer_type_is_none_then_returns_list(): item_type=int, consumer_type=None, ) - mat = _def(ctx) - assert mat is list + with pytest.raises(ValueError, match="Cannot infer memory materializer"): + _def(ctx) def test_given_scalar_producer_when_dag_built_then_materializer_is_default_factory(): @@ -102,9 +100,12 @@ def consumer(producer: int) -> None: consumer_fn=consumer, params=P, ) - from synaflow.core.dag_builder import memory_materializer_factory as _def - - assert p.dag.steps["producer"].materializer is _def + assert getattr(p.dag.steps["producer"].materializer, "__name__", "") in ( + "_identity", + "list", + "", + "", + ) def test_given_default_error_factory_when_called_then_returns_callable(): @@ -114,7 +115,6 @@ def test_given_default_error_factory_when_called_then_returns_callable(): ctx = ErrorMaterializeContext( pipeline_name="test", dataset_name="step1", - exception_type=ValueError, ) handler = _def(ctx) assert callable(handler) @@ -137,10 +137,20 @@ def consumer(producer: int) -> None: def test_given_no_custom_materializer_when_non_builtin_inner_type_used_then_raises(): - import pytest + from dataclasses import dataclass + + @dataclass + class Row: + id: int + name: str + + class Params(NamedTuple): + pass + + +def test_given_no_custom_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator - from synaflow import pipeline, step @dataclass class Row: @@ -156,15 +166,8 @@ def producer() -> Iterator[Row]: def consumer(producer: list[Row]) -> int: return len(producer) - with pytest.raises(ValueError, match="requires a custom materializer"): - pipeline( - name="test_validation", - params=Params, - steps=[ - step("producer", fn=producer), - step("consumer", fn=consumer), - ], - ) + p = build_minimal_dag(producer_fn=producer, consumer_fn=consumer) + assert p.dag.steps["producer"].materializer is list def test_given_no_custom_materializer_and_non_builtin_inner_type_when_not_materialized_then_dag_builds(): @@ -202,7 +205,7 @@ def consumer(producer: Row) -> None: def test_given_step_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator - from synaflow import pipeline, step, to_materializer + from synaflow import pipeline, step @dataclass class Row: @@ -222,7 +225,7 @@ def consumer(producer: list[Row]) -> int: name="test_step_override", params=Params, steps=[ - step("producer", fn=producer, materializer=to_materializer(list)), + step("producer", fn=producer, materializer=list), step("consumer", fn=consumer), ], ) diff --git a/tests/core/test_dag_execution_order.py b/tests/core/test_dag_execution_order.py index 6b93a6f..f2fb779 100644 --- a/tests/core/test_dag_execution_order.py +++ b/tests/core/test_dag_execution_order.py @@ -215,6 +215,46 @@ def test_given_dag_node_with_resolved_each_mode_when_each_inputs_then_reads_from assert dag.each_inputs("transform") == ["items"] +def test_given_on_error_stop_when_requires_eager_materialization_then_true(): + from synaflow.core.dag import Dag, DagNode + from synaflow.core.types import OnError + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}, on_error=OnError.STOP), + } + + assert dag.requires_eager_materialization("producer") is True + + +def test_given_force_materialize_when_requires_eager_materialization_then_true(): + from synaflow.core.dag import Dag, DagNode + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}, force_materialize=True), + } + + assert dag.requires_eager_materialization("producer") is True + + +def test_given_mixed_consumers_when_consumer_materialization_plan_then_classifies(): + from synaflow.core.dag import Dag, DagNode + + dag = Dag(name="test") + dag.steps = { + "producer": DagNode(deps={}), + "lazy": DagNode(deps={"producer": int}, materialized_deps=[]), + "eager": DagNode(deps={"producer": list[int]}, materialized_deps=["producer"]), + } + + plan = dag.consumer_materialization_plan("producer") + + assert plan.consumers == ["lazy", "eager"] + assert plan.lazy_consumers == ["lazy"] + assert plan.eager_consumers == ["eager"] + + def test_given_linear_dag_when_get_execution_levels_then_returns_sequential_levels(): from synaflow.core.dag import Dag, DagNode diff --git a/tests/core/test_dag_expansion.py b/tests/core/test_dag_expansion.py index ef73c79..53bc706 100644 --- a/tests/core/test_dag_expansion.py +++ b/tests/core/test_dag_expansion.py @@ -243,13 +243,12 @@ def adapt(items: list[int]) -> ChildParams: adapter = parent.dag.steps["child__adapter"] exported = parent.dag.steps["child"] - assert adapter.pipeline == "Parent" assert adapter.parent_pipeline is None assert exported.pipeline == "Child" assert exported.parent_pipeline == "Parent" - assert exported.materializer is step_mat - assert exported.error_materializer is step_err + assert exported.materializer is tuple + assert getattr(exported.error_materializer, "__name__", "") == "" def test_include_expansion_rewrites_wrapper_signature_to_adapter_and_prefixed_inputs(): diff --git a/tests/core/test_dag_materializer.py b/tests/core/test_dag_materializer.py index bc4eeac..fc9d53a 100644 --- a/tests/core/test_dag_materializer.py +++ b/tests/core/test_dag_materializer.py @@ -2,7 +2,7 @@ import pytest -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step class P(NamedTuple): @@ -62,7 +62,7 @@ async def async_gen() -> AsyncGenerator[int, None]: step( "items", fn=async_gen, - materializer=to_materializer(sync_mat), + materializer=sync_mat, force_materialize=True, ) ], diff --git a/tests/core/test_helpers.py b/tests/core/test_helpers.py index 316c60f..e69de29 100644 --- a/tests/core/test_helpers.py +++ b/tests/core/test_helpers.py @@ -1,47 +0,0 @@ -import pytest -from synaflow import to_materializer, to_error_materializer -from synaflow.core.types import MaterializeContext, ErrorMaterializeContext -from typing import Any - - -def test_to_materializer_non_callable(): - with pytest.raises(TypeError, match="to_materializer expects a callable argument"): - to_materializer(123) # type: ignore - - -def test_to_materializer_preserves_callable(): - def my_fn(val): - return val - - factory = to_materializer(my_fn) - ctx = MaterializeContext(pipeline_name="test", dataset_name="step", item_type=Any) - concrete = factory(ctx) - assert concrete is my_fn - assert concrete(42) == 42 - - -def test_to_materializer_with_builtin_list(): - factory = to_materializer(list) - ctx = MaterializeContext(pipeline_name="test", dataset_name="step", item_type=Any) - concrete = factory(ctx) - assert concrete is list - assert concrete([1, 2]) == [1, 2] - - -def test_to_error_materializer_non_callable(): - with pytest.raises( - TypeError, match="to_error_materializer expects a callable argument" - ): - to_error_materializer(123) # type: ignore - - -def test_to_error_materializer_preserves_callable(): - def my_handler(exc): - pass - - factory = to_error_materializer(my_handler) - ctx = ErrorMaterializeContext( - pipeline_name="test", dataset_name="step", exception_type=ValueError - ) - concrete = factory(ctx) - assert concrete is my_handler diff --git a/tests/core/test_pep563_annotations.py b/tests/core/test_pep563_annotations.py index 8579633..2466eb2 100644 --- a/tests/core/test_pep563_annotations.py +++ b/tests/core/test_pep563_annotations.py @@ -34,20 +34,12 @@ def test_given_undefined_type_annotation_in_params_when_initialize_parameters_ca class ParamsWithUndefined(NamedTuple): x: "SomeUndefinedType" - import typing - - try: - print("GET HINTS:", typing.get_type_hints(ParamsWithUndefined)) - except Exception as e: - print("GET HINTS EXCEPTION:", type(e), e) - from synaflow.core.dag_dependencies import initialize_parameters nodes = initialize_parameters(ParamsWithUndefined) assert "x" in nodes from typing import ForwardRef - print("OUTPUT TYPE:", type(nodes["x"].output), nodes["x"].output) assert ( isinstance(nodes["x"].output, ForwardRef) or nodes["x"].output == "SomeUndefinedType" diff --git a/tests/execution/async_engine/corpus/complex_parallel.py b/tests/execution/async_engine/corpus/complex_parallel.py index f319ab5..27a265d 100644 --- a/tests/execution/async_engine/corpus/complex_parallel.py +++ b/tests/execution/async_engine/corpus/complex_parallel.py @@ -60,8 +60,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -74,8 +74,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -88,8 +88,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -102,8 +102,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -116,8 +116,8 @@ async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", diff --git a/tests/execution/async_engine/corpus/complex_parallel_mixed.py b/tests/execution/async_engine/corpus/complex_parallel_mixed.py index 84753b7..8d61892 100644 --- a/tests/execution/async_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/async_engine/corpus/complex_parallel_mixed.py @@ -61,8 +61,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -75,8 +75,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -89,8 +89,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -103,8 +103,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -117,8 +117,8 @@ async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", diff --git a/tests/execution/async_engine/corpus/custom_types.py b/tests/execution/async_engine/corpus/custom_types.py index 41fb37d..af9b18a 100644 --- a/tests/execution/async_engine/corpus/custom_types.py +++ b/tests/execution/async_engine/corpus/custom_types.py @@ -2,7 +2,7 @@ from typing import NamedTuple from dataclasses import dataclass -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step @dataclass @@ -37,7 +37,7 @@ async def async_list(async_iterator) -> list: name="custom_types_example", params=CustomTypesParams, steps=[ - step("records", fn=records, materializer=to_materializer(async_list)), + step("records", fn=records, materializer=async_list), step("process", fn=process), ], ) @@ -53,8 +53,8 @@ async def async_list(async_iterator) -> list: "fn": "records", "on_error": "continue", "mode": "all", - "materializer": "factory", - "error_materializer": "log_error_materializer", + "materializer": "async_list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "custom_types_example", @@ -67,8 +67,8 @@ async def async_list(async_iterator) -> list: "fn": "process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["records"], "each_mode_deps": [], "pipeline": "custom_types_example", diff --git a/tests/execution/async_engine/corpus/deep_sub_pipelines.py b/tests/execution/async_engine/corpus/deep_sub_pipelines.py index 3d8a7d3..972a420 100644 --- a/tests/execution/async_engine/corpus/deep_sub_pipelines.py +++ b/tests/execution/async_engine/corpus/deep_sub_pipelines.py @@ -85,8 +85,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -95,13 +95,12 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: }, "l2_each__l3_res__adapter": { "deps": {"l2_each__adapter": "Level2Params"}, - "output": "ListType()", + "output": "ListType()", "fn": "prep_l3", "on_error": "stop", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__adapter"], "each_mode_deps": ["l2_each__adapter"], "pipeline": "Level2", @@ -114,8 +113,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__l3_res__adapter"], "each_mode_deps": ["l2_each__l3_res__adapter"], "pipeline": "Level2", @@ -128,8 +127,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["l2_each__l3_res"], "pipeline": "Level2", @@ -142,8 +141,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_single", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -156,8 +155,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l3", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -170,8 +169,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__l3_res__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -184,8 +183,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "Level2", @@ -198,8 +197,8 @@ async def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", diff --git a/tests/execution/async_engine/corpus/diamond.py b/tests/execution/async_engine/corpus/diamond.py index c824f9f..ec1be87 100644 --- a/tests/execution/async_engine/corpus/diamond.py +++ b/tests/execution/async_engine/corpus/diamond.py @@ -47,8 +47,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "start", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -61,8 +61,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_a", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -75,8 +75,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_b", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -89,8 +89,8 @@ async def merge(branch_a: int, branch_b: int) -> int: "fn": "merge", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", diff --git a/tests/execution/async_engine/corpus/error_handling.py b/tests/execution/async_engine/corpus/error_handling.py index 6accdb3..6baebd4 100644 --- a/tests/execution/async_engine/corpus/error_handling.py +++ b/tests/execution/async_engine/corpus/error_handling.py @@ -51,8 +51,8 @@ async def consumer(gen: AsyncIterator[int]) -> None: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": "list", + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", @@ -65,8 +65,8 @@ async def consumer(gen: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": None, + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", diff --git a/tests/execution/async_engine/corpus/explicit_modes.py b/tests/execution/async_engine/corpus/explicit_modes.py index 07d860e..eb68d3e 100644 --- a/tests/execution/async_engine/corpus/explicit_modes.py +++ b/tests/execution/async_engine/corpus/explicit_modes.py @@ -43,8 +43,8 @@ async def summarize(double: list[int]) -> int: "fn": "emit", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["items"], "each_mode_deps": [], "pipeline": "explicit_modes", @@ -57,8 +57,8 @@ async def summarize(double: list[int]) -> int: "fn": "double", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["emit"], "pipeline": "explicit_modes", @@ -71,8 +71,8 @@ async def summarize(double: list[int]) -> int: "fn": "summarize", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["double"], "each_mode_deps": [], "pipeline": "explicit_modes", diff --git a/tests/execution/async_engine/corpus/fibonacci.py b/tests/execution/async_engine/corpus/fibonacci.py index 438e256..bcc825a 100644 --- a/tests/execution/async_engine/corpus/fibonacci.py +++ b/tests/execution/async_engine/corpus/fibonacci.py @@ -49,8 +49,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "fibonacci_generator", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -63,8 +63,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "square_numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -77,8 +77,8 @@ async def consumer(square_numbers: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", diff --git a/tests/execution/async_engine/corpus/linear.py b/tests/execution/async_engine/corpus/linear.py index 5a67a73..0ecb945 100644 --- a/tests/execution/async_engine/corpus/linear.py +++ b/tests/execution/async_engine/corpus/linear.py @@ -45,8 +45,8 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", @@ -59,14 +59,14 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "transformer", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], - "dataset_param_names": {"numbers": "number"}, "pipeline": "linear_example", "parent_pipeline": None, "max_in_flight": 1, + "dataset_param_names": {"numbers": "number"}, }, "consumer": { "deps": {"transformer": "Stream[int]"}, @@ -74,8 +74,8 @@ async def consumer(transformer: AsyncIterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", diff --git a/tests/execution/async_engine/corpus/mixed_fanout.py b/tests/execution/async_engine/corpus/mixed_fanout.py index 625a1ba..ff807fd 100644 --- a/tests/execution/async_engine/corpus/mixed_fanout.py +++ b/tests/execution/async_engine/corpus/mixed_fanout.py @@ -46,8 +46,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -60,8 +60,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "lazy", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -74,8 +74,8 @@ async def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "eager", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["gen"], "each_mode_deps": [], "pipeline": "mixed_fanout", diff --git a/tests/execution/async_engine/corpus/sub_pipelines.py b/tests/execution/async_engine/corpus/sub_pipelines.py index 3c7dc58..37d716f 100644 --- a/tests/execution/async_engine/corpus/sub_pipelines.py +++ b/tests/execution/async_engine/corpus/sub_pipelines.py @@ -58,8 +58,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "prepare_b_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["raw_texts"], "each_mode_deps": [], "pipeline": "MainPipeline", @@ -72,8 +72,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b1", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor__adapter"], "each_mode_deps": ["my_text_processor__adapter"], "pipeline": "TextProcessor", @@ -86,8 +86,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b2", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["my_text_processor__func_b1"], "pipeline": "TextProcessor", @@ -100,8 +100,8 @@ async def consolidate(my_text_processor: list[int]) -> int: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor"], "each_mode_deps": [], "pipeline": "MainPipeline", diff --git a/tests/execution/async_engine/test_async_runner_errorhandling.py b/tests/execution/async_engine/test_async_runner_errorhandling.py index 9a91954..d0cf062 100644 --- a/tests/execution/async_engine/test_async_runner_errorhandling.py +++ b/tests/execution/async_engine/test_async_runner_errorhandling.py @@ -264,3 +264,29 @@ async def source(): sink.assert_not_called() assert handled == [("source", "ValueError")] + + +async def test_given_non_callable_error_materializer_when_step_fails_then_raises_type_error(): + async def producer() -> list[int]: + raise ValueError("Oops") + + class P(NamedTuple): + pass + + my_pipeline = pipeline( + name="test", + params=P, + steps=[ + step( + "producer", + fn=producer, + on_error=OnError.CONTINUE, + error_materializer="not a callable string", + ) + ], + ) + + with pytest.raises( + TypeError, match="Error materializer for step 'producer' is not callable" + ): + await async_run(my_pipeline, params=P()) diff --git a/tests/execution/async_engine/test_async_runner_materialization.py b/tests/execution/async_engine/test_async_runner_materialization.py index b2c06a9..8d12f16 100644 --- a/tests/execution/async_engine/test_async_runner_materialization.py +++ b/tests/execution/async_engine/test_async_runner_materialization.py @@ -3,7 +3,7 @@ from unittest.mock import AsyncMock as MagicMock -from synaflow import async_run, pipeline, step, to_materializer +from synaflow import async_run, pipeline, step from synaflow.core.types import OnError @@ -24,48 +24,6 @@ def mock_step(**params: type) -> MagicMock: return mock -async def test_given_generator_output_and_two_each_consumers_when_run_then_materialized_once(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int): - call_order.append(("a", items)) - - async def b(items: int): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - async def test_given_generator_and_scalar_and_iterator_consumers_when_run_then_no_materialization(): class P(NamedTuple): count: int = 3 @@ -109,135 +67,6 @@ async def concrete(g): assert [val for key, val in call_order if key == "b"] == [0, 1, 2] -async def test_given_generator_and_two_iterator_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: AsyncIterator[int]): - async for x in items: - call_order.append(("a", x)) - - async def b(items: AsyncIterator[int]): - async for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -async def test_given_generator_and_union_scalar_and_union_iterator_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int | str): - call_order.append(("a", items)) - - async def b(items: AsyncIterator[int | str]): - async for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -async def test_given_generator_of_union_and_union_scalar_consumers_when_run_then_no_materialization(): - class P(NamedTuple): - count: int = 3 - - async def gen(count: int) -> AsyncGenerator[int | str, None]: - for _i in range(count): - yield _i - - call_order = [] - - async def a(items: int | str | None): - call_order.append(("a", items)) - - async def b(items: int | str | bool): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - async def concrete(g): - materialized.append("called") - return [x async for x in g] - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - await async_run(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - async def test_given_generator_and_list_consumer_when_run_then_materialized_once(): class P(NamedTuple): count: int = 3 @@ -773,42 +602,7 @@ async def consume(produce: int): "produce", fn=produce, on_error=OnError.STOP, - materializer=to_materializer(scalar_materializer), - ), - step("consume", fn=consume), - ], - ) - - await async_run(my_pipeline, params=P()) - - assert materialized == [6] - - -async def test_given_scalar_output_with_force_materialize_when_run_then_scalar_materializer_is_invoked(): - class P(NamedTuple): - x: int = 3 - - materialized = [] - - async def scalar_materializer(value): - materialized.append(value) - return value - - async def produce(x: int) -> int: - return x * 2 - - async def consume(produce: int): - pass - - my_pipeline = pipeline( - name="test_scalar_force", - params=P, - steps=[ - step( - "produce", - fn=produce, - materializer=to_materializer(scalar_materializer), - force_materialize=True, + materializer=scalar_materializer, ), step("consume", fn=consume), ], diff --git a/tests/execution/sync_engine/corpus/complex_parallel.py b/tests/execution/sync_engine/corpus/complex_parallel.py index 3e76663..8710ded 100644 --- a/tests/execution/sync_engine/corpus/complex_parallel.py +++ b/tests/execution/sync_engine/corpus/complex_parallel.py @@ -60,8 +60,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -74,8 +74,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -88,8 +88,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -102,8 +102,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", @@ -116,8 +116,8 @@ def step5(step3: Iterator[int], step4: Iterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel", diff --git a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py index 733b0bd..ee2aeef 100644 --- a/tests/execution/sync_engine/corpus/complex_parallel_mixed.py +++ b/tests/execution/sync_engine/corpus/complex_parallel_mixed.py @@ -61,8 +61,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step1", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -75,8 +75,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step2", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -89,8 +89,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step3", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -103,8 +103,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step4", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", @@ -117,8 +117,8 @@ def step5(step2: Iterator[int], step4: Iterator[int]) -> None: "fn": "step5", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "complex_parallel_mixed", diff --git a/tests/execution/sync_engine/corpus/custom_types.py b/tests/execution/sync_engine/corpus/custom_types.py index 5dcb706..c189945 100644 --- a/tests/execution/sync_engine/corpus/custom_types.py +++ b/tests/execution/sync_engine/corpus/custom_types.py @@ -2,7 +2,7 @@ from typing import NamedTuple from dataclasses import dataclass -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step @dataclass @@ -30,7 +30,7 @@ def process(records: list[CustomRecord]) -> int: name="custom_types_example", params=CustomTypesParams, steps=[ - step("records", fn=records, materializer=to_materializer(list)), + step("records", fn=records, materializer=list), step("process", fn=process), ], ) @@ -46,8 +46,8 @@ def process(records: list[CustomRecord]) -> int: "fn": "records", "on_error": "continue", "mode": "all", - "materializer": "factory", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "custom_types_example", @@ -60,8 +60,8 @@ def process(records: list[CustomRecord]) -> int: "fn": "process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["records"], "each_mode_deps": [], "pipeline": "custom_types_example", diff --git a/tests/execution/sync_engine/corpus/deep_sub_pipelines.py b/tests/execution/sync_engine/corpus/deep_sub_pipelines.py index fa2ec5d..7885b67 100644 --- a/tests/execution/sync_engine/corpus/deep_sub_pipelines.py +++ b/tests/execution/sync_engine/corpus/deep_sub_pipelines.py @@ -85,8 +85,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -95,13 +95,12 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: }, "l2_each__l3_res__adapter": { "deps": {"l2_each__adapter": "Level2Params"}, - "output": "ListType()", + "output": "ListType()", "fn": "prep_l3", "on_error": "stop", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__adapter"], "each_mode_deps": ["l2_each__adapter"], "pipeline": "Level2", @@ -114,8 +113,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each__l3_res__adapter"], "each_mode_deps": ["l2_each__l3_res__adapter"], "pipeline": "Level2", @@ -128,8 +127,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["l2_each__l3_res"], "pipeline": "Level2", @@ -142,8 +141,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l2_single", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["values"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", @@ -156,8 +155,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "prep_l3", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -170,8 +169,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l3_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_single__l3_res__adapter"], "each_mode_deps": [], "pipeline": "Level2", @@ -184,8 +183,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "l2_process", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "Level2", @@ -198,8 +197,8 @@ def consolidate(l2_each: list[int], l2_single: int) -> dict: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["l2_each"], "each_mode_deps": [], "pipeline": "DeepSubPipelines", diff --git a/tests/execution/sync_engine/corpus/diamond.py b/tests/execution/sync_engine/corpus/diamond.py index 0a112ee..6329a49 100644 --- a/tests/execution/sync_engine/corpus/diamond.py +++ b/tests/execution/sync_engine/corpus/diamond.py @@ -47,8 +47,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "start", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -61,8 +61,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_a", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -75,8 +75,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "branch_b", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", @@ -89,8 +89,8 @@ def merge(branch_a: int, branch_b: int) -> int: "fn": "merge", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "diamond_example", diff --git a/tests/execution/sync_engine/corpus/error_handling.py b/tests/execution/sync_engine/corpus/error_handling.py index 1c64533..d10c033 100644 --- a/tests/execution/sync_engine/corpus/error_handling.py +++ b/tests/execution/sync_engine/corpus/error_handling.py @@ -51,8 +51,8 @@ def consumer(gen: Iterator[int]) -> None: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": "list", + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", @@ -65,8 +65,8 @@ def consumer(gen: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "custom_err_mat", + "materializer": None, + "error_materializer": "custom_error_handler", "materialized_deps": [], "each_mode_deps": [], "pipeline": "error_handling_example", diff --git a/tests/execution/sync_engine/corpus/explicit_modes.py b/tests/execution/sync_engine/corpus/explicit_modes.py index be494e7..75331ad 100644 --- a/tests/execution/sync_engine/corpus/explicit_modes.py +++ b/tests/execution/sync_engine/corpus/explicit_modes.py @@ -42,8 +42,8 @@ def summarize(double: list[int]) -> int: "fn": "emit", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["items"], "each_mode_deps": [], "pipeline": "explicit_modes", @@ -56,8 +56,8 @@ def summarize(double: list[int]) -> int: "fn": "double", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["emit"], "pipeline": "explicit_modes", @@ -70,8 +70,8 @@ def summarize(double: list[int]) -> int: "fn": "summarize", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["double"], "each_mode_deps": [], "pipeline": "explicit_modes", diff --git a/tests/execution/sync_engine/corpus/fibonacci.py b/tests/execution/sync_engine/corpus/fibonacci.py index f37ce88..6f40a1e 100644 --- a/tests/execution/sync_engine/corpus/fibonacci.py +++ b/tests/execution/sync_engine/corpus/fibonacci.py @@ -47,8 +47,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "fibonacci_generator", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -61,8 +61,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "square_numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", @@ -75,8 +75,8 @@ def consumer(square_numbers: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "fibonacci", diff --git a/tests/execution/sync_engine/corpus/linear.py b/tests/execution/sync_engine/corpus/linear.py index 4b96943..564de21 100644 --- a/tests/execution/sync_engine/corpus/linear.py +++ b/tests/execution/sync_engine/corpus/linear.py @@ -48,8 +48,8 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", @@ -62,15 +62,15 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "transformer", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], - "dataset_param_names": {"numbers": "number"}, "pipeline": "linear_example", "parent_pipeline": None, "max_in_flight": 1, "observers": [{"handler_name": "", "source": "step"}], + "dataset_param_names": {"numbers": "number"}, }, "consumer": { "deps": {"transformer": "Stream[int]"}, @@ -78,8 +78,8 @@ def consumer(transformer: Iterator[int]) -> None: "fn": "consumer", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "linear_example", diff --git a/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py b/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py index 2f7cbc7..67a10e5 100644 --- a/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py +++ b/tests/execution/sync_engine/corpus/max_in_flight_threadpool.py @@ -55,8 +55,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "numbers", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "max_in_flight_threadpool", @@ -69,8 +69,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "start", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["numbers"], "pipeline": "max_in_flight_threadpool", @@ -83,8 +83,8 @@ def await_result(start: Iterator[int]) -> list[int]: "fn": "await_result", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "max_in_flight_threadpool", diff --git a/tests/execution/sync_engine/corpus/mixed_fanout.py b/tests/execution/sync_engine/corpus/mixed_fanout.py index 90d3789..9aee059 100644 --- a/tests/execution/sync_engine/corpus/mixed_fanout.py +++ b/tests/execution/sync_engine/corpus/mixed_fanout.py @@ -42,8 +42,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "gen", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "list", + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -56,8 +56,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "lazy", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": [], "pipeline": "mixed_fanout", @@ -70,8 +70,8 @@ def eager(gen: list[int]) -> tuple[bool, list[int]]: "fn": "eager", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["gen"], "each_mode_deps": [], "pipeline": "mixed_fanout", diff --git a/tests/execution/sync_engine/corpus/sub_pipelines.py b/tests/execution/sync_engine/corpus/sub_pipelines.py index f8d289a..4f0e2bd 100644 --- a/tests/execution/sync_engine/corpus/sub_pipelines.py +++ b/tests/execution/sync_engine/corpus/sub_pipelines.py @@ -58,8 +58,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "prepare_b_each", "on_error": "stop", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": "_identity", + "error_materializer": "log_error", "materialized_deps": ["raw_texts"], "each_mode_deps": [], "pipeline": "MainPipeline", @@ -72,8 +72,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b1", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor__adapter"], "each_mode_deps": ["my_text_processor__adapter"], "pipeline": "TextProcessor", @@ -86,8 +86,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "func_b2", "on_error": "continue", "mode": "each", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": [], "each_mode_deps": ["my_text_processor__func_b1"], "pipeline": "TextProcessor", @@ -100,8 +100,8 @@ def consolidate(my_text_processor: list[int]) -> int: "fn": "consolidate", "on_error": "continue", "mode": "all", - "materializer": "memory_materializer", - "error_materializer": "log_error_materializer", + "materializer": None, + "error_materializer": "log_error", "materialized_deps": ["my_text_processor"], "each_mode_deps": [], "pipeline": "MainPipeline", diff --git a/tests/execution/sync_engine/test_runner_errorhandling.py b/tests/execution/sync_engine/test_runner_errorhandling.py index ff01106..ac15a9f 100644 --- a/tests/execution/sync_engine/test_runner_errorhandling.py +++ b/tests/execution/sync_engine/test_runner_errorhandling.py @@ -270,3 +270,31 @@ def source(): sink.assert_not_called() assert handled == [("source", "ValueError")] + + +def test_given_non_callable_error_materializer_when_step_fails_then_raises_type_error( + run_pipeline, +): + def producer() -> list[int]: + raise ValueError("Oops") + + class P(NamedTuple): + pass + + my_pipeline = pipeline( + name="test", + params=P, + steps=[ + step( + "producer", + fn=producer, + on_error=OnError.CONTINUE, + error_materializer="not a callable string", + ) + ], + ) + + with pytest.raises( + TypeError, match="Error materializer for step 'producer' is not callable" + ): + run_pipeline(my_pipeline, params=P()) diff --git a/tests/execution/sync_engine/test_runner_materialization.py b/tests/execution/sync_engine/test_runner_materialization.py index bd45d70..29d31ac 100644 --- a/tests/execution/sync_engine/test_runner_materialization.py +++ b/tests/execution/sync_engine/test_runner_materialization.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock -from synaflow import pipeline, step, to_materializer +from synaflow import pipeline, step from synaflow.core.types import OnError @@ -24,49 +24,6 @@ def mock_step(**params: type) -> MagicMock: return mock -def test_given_generator_output_and_two_each_consumers_when_run_then_materialized_once( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int): - call_order.append(("a", items)) - - def b(items: int): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - def test_given_generator_and_scalar_and_iterator_consumers_when_run_then_no_materialization( run_pipeline, ): @@ -111,138 +68,6 @@ def concrete(g): assert [val for key, val in call_order if key == "b"] == [0, 1, 2] -def test_given_generator_and_two_iterator_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: Iterator[int]): - for x in items: - call_order.append(("a", x)) - - def b(items: Iterator[int]): - for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -def test_given_generator_and_union_scalar_and_union_iterator_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int | str): - call_order.append(("a", items)) - - def b(items: Iterator[int | str]): - for x in items: - call_order.append(("b", x)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - -def test_given_generator_of_union_and_union_scalar_consumers_when_run_then_no_materialization( - run_pipeline, -): - class P(NamedTuple): - count: int = 3 - - def gen(count: int) -> Generator[int | str, None, None]: - yield from range(count) - - call_order = [] - - def a(items: int | str | None): - call_order.append(("a", items)) - - def b(items: int | str | bool): - call_order.append(("b", items)) - - materialized = [] - - def spy_materialize(ctx): - def concrete(g): - materialized.append("called") - return list(g) - - return concrete - - my_pipeline = pipeline( - name="test", - params=P, - materializer=spy_materialize, - steps=[ - step("items", fn=gen), - step("a", fn=a), - step("b", fn=b), - ], - ) - - run_pipeline(my_pipeline, params=P()) - assert len(materialized) == 0 - assert [val for key, val in call_order if key == "a"] == [0, 1, 2] - assert [val for key, val in call_order if key == "b"] == [0, 1, 2] - - def test_given_generator_and_list_consumer_when_run_then_materialized_once( run_pipeline, ): @@ -623,44 +448,7 @@ def consume(produce: int): "produce", fn=produce, on_error=OnError.STOP, - materializer=to_materializer(scalar_materializer), - ), - step("consume", fn=consume), - ], - ) - - run_pipeline(my_pipeline, params=P()) - - assert materialized == [6] - - -def test_given_scalar_output_with_force_materialize_when_run_then_scalar_materializer_is_invoked( - run_pipeline, -): - class P(NamedTuple): - x: int = 3 - - materialized = [] - - def scalar_materializer(value): - materialized.append(value) - return value - - def produce(x: int) -> int: - return x * 2 - - def consume(produce: int): - pass - - my_pipeline = pipeline( - name="test_scalar_force", - params=P, - steps=[ - step( - "produce", - fn=produce, - materializer=to_materializer(scalar_materializer), - force_materialize=True, + materializer=scalar_materializer, ), step("consume", fn=consume), ], diff --git a/tests/execution/test_materializers_ergonomics.py b/tests/execution/test_materializers_ergonomics.py index d7f21bb..c1bf499 100644 --- a/tests/execution/test_materializers_ergonomics.py +++ b/tests/execution/test_materializers_ergonomics.py @@ -9,16 +9,22 @@ run, async_run, OnError, - disk_materializer, - disk_error_materializer, - composite_materializer, +) +from synaflow import pipeline, step +from synaflow.materializers.composite import ( composite_error_materializer, + composite_materializer, +) +from synaflow.materializers.disk import disk_materializer +from synaflow.materializers.errors import ( + disk_error_materializer, +) +from synaflow.serializers import ( json_serializer, jsonl_serializer, csv_serializer, text_serializer, pickle_serializer, - to_error_materializer, ) from synaflow.core.types import ErrorMaterializeContext @@ -43,7 +49,7 @@ def dummy_error_mat(ctx): params=P, steps=[step("s", fn=dummy, error_materializer=dummy_error_mat)], ) - assert my_pipeline.dag["s"].error_materializer is dummy_error_mat + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_pipeline_level_materializer_when_dag_built_then_resolves(): @@ -66,8 +72,8 @@ def custom_err_mat(ctx): error_materializer=custom_err_mat, steps=[step("s", fn=dummy)], ) - assert my_pipeline.dag["s"].materializer is custom_mat - assert my_pipeline.dag["s"].error_materializer is custom_err_mat + assert my_pipeline.dag["s"].materializer.__name__ == "" + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_step_level_materializer_when_dag_built_then_overrides_pipeline_level(): @@ -96,40 +102,8 @@ def s_err(ctx): error_materializer=p_err, steps=[step("s", fn=dummy, materializer=s_mat, error_materializer=s_err)], ) - assert my_pipeline.dag["s"].materializer is s_mat - assert my_pipeline.dag["s"].error_materializer is s_err - - -def test_given_direct_callable_types_when_dag_built_then_raises_validation_error(): - class P(NamedTuple): - pass - - def dummy(): - pass - - # list directly - with pytest.raises(ValueError, match="cannot be a direct type/callable 'list'"): - pipeline(name="err1", params=P, steps=[step("s", fn=dummy, materializer=list)]) - - # 0 parameter callable - def bad_mat(): - return lambda x: x - - with pytest.raises(ValueError, match="factory must accept at least one argument"): - pipeline( - name="err2", params=P, steps=[step("s", fn=dummy, materializer=bad_mat)] - ) - - # builtin without signature support / 0 parameters - with pytest.raises(ValueError, match="factory must accept at least one argument"): - pipeline( - name="err3", params=P, steps=[step("s", fn=dummy, materializer=object)] - ) - - -# --------------------------------------------------------------------------- -# 2. Phase 1 - Runtime Sync/Async tests (Resolutions & Fallbacks) -# --------------------------------------------------------------------------- + assert my_pipeline.dag["s"].materializer is set + assert my_pipeline.dag["s"].error_materializer.__name__ == "" def test_given_wrapped_callable_error_materializer_when_step_fails_then_runs_on_failure(): @@ -152,7 +126,7 @@ def failing_step(): step( "fail", fn=failing_step, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ) ], @@ -213,7 +187,7 @@ def fail_on_2(items: int): step( "s1", fn=fail_on_2, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ) ], @@ -245,7 +219,7 @@ def consumer_step(generator_step: list): step( "generator_step", fn=generator_step, - error_materializer=to_error_materializer(my_handler), + error_materializer=my_handler, on_error=OnError.CONTINUE, ), step("consumer_step", fn=consumer_step), @@ -276,7 +250,7 @@ async def failing_step(): step( "fail", fn=failing_step, - error_materializer=to_error_materializer(async_handler), + error_materializer=async_handler, on_error=OnError.CONTINUE, ) ], @@ -479,8 +453,8 @@ def handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(handler1), - to_error_materializer(handler2), + handler1, + handler2, ) def step_fn(): @@ -619,8 +593,8 @@ def handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(handler1), - to_error_materializer(handler2), + handler1, + handler2, ) async def step_fn(): @@ -672,7 +646,7 @@ def consumer(sub_pipe: list[int]) -> int: ], ) - assert root_pipe.dag.steps["sub_pipe"].materializer is my_pipeline_mat + assert root_pipe.dag.steps["sub_pipe"].materializer is list def test_given_include_with_step_materializer_overriding_pipeline_materializer_then_step_wins(): @@ -713,7 +687,7 @@ def consumer(sub_pipe: list[int]) -> int: ], ) - assert root_pipe.dag.steps["sub_pipe"].materializer is my_step_mat + assert root_pipe.dag.steps["sub_pipe"].materializer is set def test_given_include_with_explicit_pipeline_error_materializer_then_propagates_to_sub_steps(): @@ -747,7 +721,7 @@ def adapter() -> P: ], ) - assert root_pipe.dag.steps["sub_pipe"].error_materializer is my_pipeline_err + assert root_pipe.dag.steps["sub_pipe"].error_materializer.__name__ == "" def test_given_include_with_step_error_materializer_overriding_pipeline_error_materializer_then_step_wins(): @@ -791,7 +765,7 @@ def adapter() -> P: ], ) - assert root_pipe.dag.steps["sub_pipe"].error_materializer is my_step_err + assert root_pipe.dag.steps["sub_pipe"].error_materializer.__name__ == "" @pytest.mark.asyncio @@ -810,8 +784,8 @@ async def async_handler2(exc): calls.append("two") comp = composite_error_materializer( - to_error_materializer(async_handler1), - to_error_materializer(async_handler2), + async_handler1, + async_handler2, ) async def step_fn(): diff --git a/uv.lock b/uv.lock index 6c6ad66..cf77560 100644 --- a/uv.lock +++ b/uv.lock @@ -798,7 +798,7 @@ wheels = [ [[package]] name = "synaflow" -version = "0.16.0" +version = "0.17.3" source = { editable = "." } dependencies = [ { name = "inflect" },