From 51716c0e225e878a6ec882348cb8a0b07ecbd14e Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:46:47 +0100 Subject: [PATCH 1/5] add observer execution overrides --- README.md | 7 +- docs/user_docs/core-concepts/build-vs-run.md | 5 +- synaflow/__init__.py | 2 + synaflow/core/constants.py | 1 + synaflow/execution/__init__.py | 8 +- synaflow/execution/async_engine/executor.py | 23 ++- synaflow/execution/overrides.py | 79 +++++++++- synaflow/execution/sync_engine/executor.py | 23 ++- tests/execution/test_execution_overrides.py | 157 ++++++++++++++++++- 9 files changed, 288 insertions(+), 17 deletions(-) create mode 100644 synaflow/core/constants.py diff --git a/README.md b/README.md index 1b3723d..488bf3d 100644 --- a/README.md +++ b/README.md @@ -77,9 +77,10 @@ compiles, it's valid. No runtime surprises. ### Runtime overrides on top of the compiled contract When you need test-time swaps without patching module globals, pass -`ExecutionOverrides` to `run()` or `async_run()` and replace only the compiled -materializers you care about. The DAG shape and semantics stay fixed; only the -runtime callable changes. +`ExecutionOverrides` to `run()` or `async_run()` and replace only compiled +runtime dependencies such as materializers or observers. Use +`PIPELINE_SCOPE` for pipeline-level observers. The DAG shape and semantics stay +fixed; only the runtime callable changes. ### Build your own runner diff --git a/docs/user_docs/core-concepts/build-vs-run.md b/docs/user_docs/core-concepts/build-vs-run.md index bbfa494..0870883 100644 --- a/docs/user_docs/core-concepts/build-vs-run.md +++ b/docs/user_docs/core-concepts/build-vs-run.md @@ -68,8 +68,9 @@ resolved at build time and frozen in the JSON or `Dag`. Runners don't re-infer semantics; they execute the contract. `ExecutionOverrides` fits inside that boundary: it can swap the concrete -runtime callable for a compiled key such as a materializer, but it does not -change graph structure, dependency resolution, or eager-vs-lazy planning. +runtime callable for a compiled key such as a materializer or observer scope, +but it does not change graph structure, dependency resolution, or eager-vs-lazy +planning. ### 2. Write your own runner diff --git a/synaflow/__init__.py b/synaflow/__init__.py index 685852b..eb31fad 100644 --- a/synaflow/__init__.py +++ b/synaflow/__init__.py @@ -1,4 +1,5 @@ from .core.definition import include, pipeline, step +from .core.constants import PIPELINE_SCOPE from .core.observers import ( MaterializationEvent, Observer, @@ -21,6 +22,7 @@ "pipeline", "step", "include", + "PIPELINE_SCOPE", "run", "async_run", "ExecutionOverrides", diff --git a/synaflow/core/constants.py b/synaflow/core/constants.py new file mode 100644 index 0000000..825620a --- /dev/null +++ b/synaflow/core/constants.py @@ -0,0 +1 @@ +PIPELINE_SCOPE = "__pipeline__" diff --git a/synaflow/execution/__init__.py b/synaflow/execution/__init__.py index 826ce65..0d70d2c 100644 --- a/synaflow/execution/__init__.py +++ b/synaflow/execution/__init__.py @@ -1,7 +1,13 @@ -from .overrides import ExecutionOverrides, MaterializerRegistry, PipelineRegistry +from .overrides import ( + ExecutionOverrides, + MaterializerRegistry, + ObserverRegistry, + PipelineRegistry, +) __all__ = [ "ExecutionOverrides", "MaterializerRegistry", + "ObserverRegistry", "PipelineRegistry", ] diff --git a/synaflow/execution/async_engine/executor.py b/synaflow/execution/async_engine/executor.py index e85bbc1..a872af8 100644 --- a/synaflow/execution/async_engine/executor.py +++ b/synaflow/execution/async_engine/executor.py @@ -3,6 +3,7 @@ from collections.abc import AsyncGenerator, AsyncIterator, Generator, Iterator from typing import Any +from synaflow.core.constants import PIPELINE_SCOPE from synaflow.core.dag import Dag from synaflow.core.definition import PipelineDef from synaflow.core.exceptions import PipelineStopException, StepExecutionError @@ -227,13 +228,29 @@ def _resolve_materializer(self, step_name: str, node: Any) -> Any: return node.materializer return self._overrides.materializers.resolve(step_name, node.materializer) + def _resolve_pipeline_observers(self) -> list: + if self._overrides is None: + return self.dag.pipeline_observers + return self._overrides.observers.resolve( + PIPELINE_SCOPE, self.dag.pipeline_observers + ) + + def _resolve_step_observers(self, node: Any, step_name: str) -> list: + pipeline_observers = self._resolve_pipeline_observers() + step_observers = [obs for obs in node.observers if obs.source == "step"] + if self._overrides is not None: + step_observers = self._overrides.observers.resolve( + step_name, step_observers + ) + return [*pipeline_observers, *step_observers] + async def _dispatch_pipeline_event( self, event: PipelineEvent, step_name: str | None = None, exception: BaseException | None = None, ) -> None: - registrations = self.dag.pipeline_observers + registrations = self._resolve_pipeline_observers() if not registrations: return ctx: Any @@ -269,7 +286,7 @@ async def _dispatch_step_event( completed_all_inputs: bool = True, exception: BaseException | None = None, ) -> None: - registrations = node.observers + registrations = self._resolve_step_observers(node, step_name) if not registrations: return ctx: Any @@ -317,7 +334,7 @@ async def _dispatch_materialization_event( materializer_name: str | None = None, exception: BaseException | None = None, ) -> None: - registrations = node.observers + registrations = self._resolve_step_observers(node, step_name) if not registrations: return ctx: Any diff --git a/synaflow/execution/overrides.py b/synaflow/execution/overrides.py index 28174b9..9f36673 100644 --- a/synaflow/execution/overrides.py +++ b/synaflow/execution/overrides.py @@ -2,7 +2,9 @@ from dataclasses import dataclass from typing import Any +from synaflow.core.constants import PIPELINE_SCOPE from synaflow.core.definition import PipelineDef +from synaflow.core.observers import Observer, ResolvedObserver class PipelineRegistry(MutableMapping[str, Any]): @@ -26,8 +28,7 @@ def __getitem__(self, key: str) -> Any: def __setitem__(self, key: str, value: Any) -> None: self._validate_key(key) - self._validate_value(key, value) - self._overrides[key] = value + self._overrides[key] = self._normalize_value(key, value) def __delitem__(self, key: str) -> None: self._validate_key(key) @@ -56,6 +57,10 @@ def _validate_key(self, key: str) -> None: def _validate_value(self, key: str, value: Any) -> None: return None + def _normalize_value(self, key: str, value: Any) -> Any: + self._validate_value(key, value) + return value + class MaterializerRegistry(PipelineRegistry): @classmethod @@ -74,17 +79,58 @@ def _validate_value(self, key: str, value: Any) -> None: raise TypeError(f"Materializer override for step '{key}' must be callable.") +class ObserverRegistry(PipelineRegistry): + @classmethod + def empty(cls, pipeline: PipelineDef) -> "ObserverRegistry": + return cls(contract_keys=_observer_contract_keys(pipeline)) + + @classmethod + def from_production(cls, pipeline: PipelineDef) -> "ObserverRegistry": + return cls( + contract_keys=_observer_contract_keys(pipeline), + fallback_values=_observer_fallback_values(pipeline), + ) + + def _normalize_value(self, key: str, value: Any) -> list[ResolvedObserver]: + if not isinstance(value, list): + raise TypeError( + f"Observer override for scope '{key}' must be a list of observers." + ) + + source = "pipeline" if key == PIPELINE_SCOPE else "step" + normalized: list[ResolvedObserver] = [] + for item in value: + if isinstance(item, ResolvedObserver): + normalized.append(item) + elif isinstance(item, Observer): + normalized.append(ResolvedObserver(handler=item.handler, source=source)) + elif callable(item): + normalized.append(ResolvedObserver(handler=item, source=source)) + else: + raise TypeError( + f"Observer override for scope '{key}' must contain only callables or Observer registrations." + ) + return normalized + + @dataclass(frozen=True) class ExecutionOverrides: materializers: MaterializerRegistry + observers: ObserverRegistry @classmethod def empty(cls, pipeline: PipelineDef) -> "ExecutionOverrides": - return cls(materializers=MaterializerRegistry.empty(pipeline)) + return cls( + materializers=MaterializerRegistry.empty(pipeline), + observers=ObserverRegistry.empty(pipeline), + ) @classmethod def from_production(cls, pipeline: PipelineDef) -> "ExecutionOverrides": - return cls(materializers=MaterializerRegistry.from_production(pipeline)) + return cls( + materializers=MaterializerRegistry.from_production(pipeline), + observers=ObserverRegistry.from_production(pipeline), + ) def _materializer_contract_keys(pipeline: PipelineDef) -> set[str]: @@ -101,3 +147,28 @@ def _materializer_fallback_values(pipeline: PipelineDef) -> dict[str, Any]: for step_name, node in pipeline.dag.steps.items() if node.materializer is not None } + + +def _observer_contract_keys(pipeline: PipelineDef) -> set[str]: + keys = set() + if pipeline.dag.pipeline_observers: + keys.add(PIPELINE_SCOPE) + keys.update( + step_name for step_name, node in pipeline.dag.steps.items() if node.observers + ) + return keys + + +def _observer_fallback_values( + pipeline: PipelineDef, +) -> dict[str, list[ResolvedObserver]]: + values: dict[str, list[ResolvedObserver]] = {} + if pipeline.dag.pipeline_observers: + values[PIPELINE_SCOPE] = list(pipeline.dag.pipeline_observers) + for step_name, node in pipeline.dag.steps.items(): + step_local = [ + observer for observer in node.observers if observer.source == "step" + ] + if step_local: + values[step_name] = step_local + return values diff --git a/synaflow/execution/sync_engine/executor.py b/synaflow/execution/sync_engine/executor.py index fd8afcf..b1c9791 100644 --- a/synaflow/execution/sync_engine/executor.py +++ b/synaflow/execution/sync_engine/executor.py @@ -5,6 +5,7 @@ from typing import Any from synaflow.core.dag import Dag +from synaflow.core.constants import PIPELINE_SCOPE from synaflow.execution.bounded_iterator import BoundedIterator from synaflow.core.definition import PipelineDef from synaflow.core.exceptions import PipelineStopException @@ -124,13 +125,29 @@ def __init__( # Lifecycle observer dispatch helpers # ------------------------------------------------------------------ + def _resolve_pipeline_observers(self) -> list: + if self._overrides is None: + return self.dag.pipeline_observers + return self._overrides.observers.resolve( + PIPELINE_SCOPE, self.dag.pipeline_observers + ) + + def _resolve_step_observers(self, node: Any, step_name: str) -> list: + pipeline_observers = self._resolve_pipeline_observers() + step_observers = [obs for obs in node.observers if obs.source == "step"] + if self._overrides is not None: + step_observers = self._overrides.observers.resolve( + step_name, step_observers + ) + return [*pipeline_observers, *step_observers] + def _dispatch_pipeline_event( self, event: PipelineEvent, step_name: str | None = None, exception: BaseException | None = None, ) -> None: - registrations = self.dag.pipeline_observers + registrations = self._resolve_pipeline_observers() if not registrations: return ctx: Any @@ -159,7 +176,7 @@ def _dispatch_step_event( completed_all_inputs: bool = True, exception: BaseException | None = None, ) -> None: - registrations = node.observers + registrations = self._resolve_step_observers(node, step_name) if not registrations: return ctx: Any @@ -207,7 +224,7 @@ def _dispatch_materialization_event( materializer_name: str | None = None, exception: BaseException | None = None, ) -> None: - registrations = node.observers + registrations = self._resolve_step_observers(node, step_name) if not registrations: return ctx: Any diff --git a/tests/execution/test_execution_overrides.py b/tests/execution/test_execution_overrides.py index b350fb4..8647762 100644 --- a/tests/execution/test_execution_overrides.py +++ b/tests/execution/test_execution_overrides.py @@ -2,7 +2,16 @@ import pytest -from synaflow import ExecutionOverrides, async_run, pipeline, step +from synaflow import ( + ExecutionOverrides, + Observer, + PIPELINE_SCOPE, + PipelineEvent, + StepEvent, + async_run, + pipeline, + step, +) def test_given_materializer_override_when_sync_run_then_override_is_used( @@ -133,3 +142,149 @@ def consume(items: list[int]) -> None: with pytest.raises(TypeError, match="must be callable"): overrides.materializers["items"] = 123 + + +def test_given_pipeline_observer_override_when_sync_run_then_pipeline_and_step_events_use_override( + run_pipeline, +): + class Params(NamedTuple): + value: int = 1 + + events = [] + + def record(ctx): + events.append(type(ctx).__name__) + + def emit(value: int) -> int: + return value + + p = pipeline( + name="observer_pipeline_override", + params=Params, + steps=[step("emit", fn=emit)], + observers=[Observer(lambda ctx: None)], + ) + + overrides = ExecutionOverrides.empty(p) + overrides.observers[PIPELINE_SCOPE] = [Observer(record)] + + run_pipeline(p, Params(), overrides=overrides) + + assert "PipelineStartedContext" in events + assert "PipelineCompletedContext" in events + assert "StepStartedContext" in events + assert "StepCompletedContext" in events + + +def test_given_step_observer_override_when_sync_run_then_only_step_events_use_override( + run_pipeline, +): + class Params(NamedTuple): + value: int = 1 + + pipeline_events = [] + step_events = [] + + def record_pipeline(ctx): + if ctx.event in (PipelineEvent.STARTED, PipelineEvent.COMPLETED): + pipeline_events.append(type(ctx).__name__) + + def record_step(ctx): + if ctx.event in (StepEvent.STARTED, StepEvent.COMPLETED): + step_events.append(type(ctx).__name__) + + def emit(value: int) -> int: + return value + + p = pipeline( + name="observer_step_override", + params=Params, + steps=[step("emit", fn=emit, observers=[Observer(lambda ctx: None)])], + observers=[Observer(record_pipeline)], + ) + + overrides = ExecutionOverrides.empty(p) + overrides.observers["emit"] = [Observer(record_step)] + + run_pipeline(p, Params(), overrides=overrides) + + assert pipeline_events == ["PipelineStartedContext", "PipelineCompletedContext"] + assert step_events == ["StepStartedContext", "StepCompletedContext"] + + +async def test_given_pipeline_observer_override_when_async_run_then_pipeline_and_step_events_use_override(): + class Params(NamedTuple): + value: int = 1 + + events = [] + + async def record(ctx): + events.append(type(ctx).__name__) + + async def emit(value: int) -> int: + return value + + p = pipeline( + name="async_observer_pipeline_override", + params=Params, + steps=[step("emit", fn=emit)], + observers=[Observer(lambda ctx: None)], + ) + + overrides = ExecutionOverrides.empty(p) + overrides.observers[PIPELINE_SCOPE] = [Observer(record)] + + await async_run(p, Params(), overrides=overrides) + + assert "PipelineStartedContext" in events + assert "PipelineCompletedContext" in events + assert "StepStartedContext" in events + assert "StepCompletedContext" in events + + +def test_given_invalid_observer_override_key_when_assigned_then_raises(): + class Params(NamedTuple): + value: int = 1 + + def emit(value: int) -> int: + return value + + p = pipeline( + name="invalid_observer_key", + params=Params, + steps=[step("emit", fn=emit)], + observers=[Observer(lambda ctx: None)], + ) + + overrides = ExecutionOverrides.empty(p) + + with pytest.raises(KeyError, match="Unknown override key 'missing'"): + overrides.observers["missing"] = [Observer(lambda ctx: None)] + + +def test_given_invalid_observer_override_value_when_assigned_then_raises(): + class Params(NamedTuple): + value: int = 1 + + def emit(value: int) -> int: + return value + + p = pipeline( + name="invalid_observer_value", + params=Params, + steps=[step("emit", fn=emit)], + observers=[Observer(lambda ctx: None)], + ) + + overrides = ExecutionOverrides.empty(p) + + record = lambda ctx: None + + with pytest.raises(TypeError, match="must be a list of observers"): + overrides.observers[PIPELINE_SCOPE] = record + + with pytest.raises( + TypeError, + match="must contain only callables or Observer registrations", + ): + overrides.observers[PIPELINE_SCOPE] = [123] From 7073630c6e0f4c4d796b0766a3a017779c15f30f Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 01:55:31 +0100 Subject: [PATCH 2/5] add scope helper for override keys --- synaflow/__init__.py | 2 + synaflow/core/naming.py | 28 +++++ synaflow/execution/overrides.py | 54 ++++++---- tests/core/test_naming.py | 15 ++- tests/execution/test_execution_overrides.py | 114 ++++++++++++++++++++ 5 files changed, 190 insertions(+), 23 deletions(-) diff --git a/synaflow/__init__.py b/synaflow/__init__.py index eb31fad..ea4af72 100644 --- a/synaflow/__init__.py +++ b/synaflow/__init__.py @@ -1,5 +1,6 @@ from .core.definition import include, pipeline, step from .core.constants import PIPELINE_SCOPE +from .core.naming import Scope from .core.observers import ( MaterializationEvent, Observer, @@ -23,6 +24,7 @@ "step", "include", "PIPELINE_SCOPE", + "Scope", "run", "async_run", "ExecutionOverrides", diff --git a/synaflow/core/naming.py b/synaflow/core/naming.py index 6a0f117..c1465fe 100644 --- a/synaflow/core/naming.py +++ b/synaflow/core/naming.py @@ -4,6 +4,8 @@ plural, suffixed) without manual wiring. """ +from dataclasses import dataclass + import inflect _engine = inflect.engine() @@ -11,6 +13,32 @@ _SUFFIXES = {"_list", "_set", "_dict", "_tuple"} +@dataclass(frozen=True) +class Scope: + parts: tuple[str, ...] + + def __init__(self, *parts: str): + normalized = tuple(_validate_scope_part(part) for part in parts) + if not normalized: + raise ValueError("Scope requires at least one non-empty part.") + object.__setattr__(self, "parts", normalized) + + def scope(self, part: str) -> "Scope": + return Scope(*self.parts, part) + + def __call__(self, step_name: str) -> str: + return str(self.scope(step_name)) + + def __str__(self) -> str: + return "__".join(self.parts) + + +def _validate_scope_part(part: str) -> str: + if not isinstance(part, str) or not part: + raise ValueError("Scope parts must be non-empty strings.") + return part + + def get_base_dataset_name(name: str) -> str: """Return the absolute plural Base Dataset name. diff --git a/synaflow/execution/overrides.py b/synaflow/execution/overrides.py index 9f36673..d5d6c9d 100644 --- a/synaflow/execution/overrides.py +++ b/synaflow/execution/overrides.py @@ -4,6 +4,7 @@ from synaflow.core.constants import PIPELINE_SCOPE from synaflow.core.definition import PipelineDef +from synaflow.core.naming import Scope from synaflow.core.observers import Observer, ResolvedObserver @@ -18,23 +19,26 @@ def __init__( self._fallback_values = dict(fallback_values or {}) self._overrides: dict[str, Any] = {} - def __getitem__(self, key: str) -> Any: - self._validate_key(key) - if key in self._overrides: - return self._overrides[key] - if key in self._fallback_values: - return self._fallback_values[key] - raise KeyError(key) - - def __setitem__(self, key: str, value: Any) -> None: - self._validate_key(key) - self._overrides[key] = self._normalize_value(key, value) - - def __delitem__(self, key: str) -> None: - self._validate_key(key) - if key not in self._overrides: - raise KeyError(key) - del self._overrides[key] + def __getitem__(self, key: str | Scope) -> Any: + normalized_key = self._normalize_key(key) + self._validate_key(normalized_key) + if normalized_key in self._overrides: + return self._overrides[normalized_key] + if normalized_key in self._fallback_values: + return self._fallback_values[normalized_key] + raise KeyError(normalized_key) + + def __setitem__(self, key: str | Scope, value: Any) -> None: + normalized_key = self._normalize_key(key) + self._validate_key(normalized_key) + self._overrides[normalized_key] = self._normalize_value(normalized_key, value) + + def __delitem__(self, key: str | Scope) -> None: + normalized_key = self._normalize_key(key) + self._validate_key(normalized_key) + if normalized_key not in self._overrides: + raise KeyError(normalized_key) + del self._overrides[normalized_key] def __iter__(self) -> Iterator[str]: return iter(sorted(self._contract_keys)) @@ -42,13 +46,19 @@ def __iter__(self) -> Iterator[str]: def __len__(self) -> int: return len(self._contract_keys) - def resolve(self, key: str, default: Any = None) -> Any: - if key in self._overrides: - return self._overrides[key] - if key in self._fallback_values: - return self._fallback_values[key] + def resolve(self, key: str | Scope, default: Any = None) -> Any: + normalized_key = self._normalize_key(key) + if normalized_key in self._overrides: + return self._overrides[normalized_key] + if normalized_key in self._fallback_values: + return self._fallback_values[normalized_key] return default + def _normalize_key(self, key: str | Scope) -> str: + if isinstance(key, Scope): + return str(key) + return key + def _validate_key(self, key: str) -> None: if key not in self._contract_keys: valid = ", ".join(sorted(self._contract_keys)) or "" diff --git a/tests/core/test_naming.py b/tests/core/test_naming.py index 2649254..b7db85c 100644 --- a/tests/core/test_naming.py +++ b/tests/core/test_naming.py @@ -1,6 +1,6 @@ import pytest -from synaflow.core.naming import get_base_dataset_name +from synaflow.core.naming import Scope, get_base_dataset_name @pytest.mark.parametrize( @@ -26,3 +26,16 @@ ) def test_get_base_dataset_name(name, expected): assert get_base_dataset_name(name) == expected + + +def test_scope_builds_nested_step_keys(): + sub = Scope("incl") + + assert str(sub) == "incl" + assert str(sub.scope("validator")) == "incl__validator" + assert sub("validator") == "incl__validator" + + +def test_scope_rejects_empty_parts(): + with pytest.raises(ValueError, match="non-empty"): + Scope("") diff --git a/tests/execution/test_execution_overrides.py b/tests/execution/test_execution_overrides.py index 8647762..e0339e8 100644 --- a/tests/execution/test_execution_overrides.py +++ b/tests/execution/test_execution_overrides.py @@ -7,8 +7,10 @@ Observer, PIPELINE_SCOPE, PipelineEvent, + Scope, StepEvent, async_run, + include, pipeline, step, ) @@ -288,3 +290,115 @@ def emit(value: int) -> int: match="must contain only callables or Observer registrations", ): overrides.observers[PIPELINE_SCOPE] = [123] + + +def test_given_scope_key_when_materializer_overridden_in_sub_pipeline_then_override_is_used( + run_pipeline, +): + class SubParams(NamedTuple): + value: int + + class Params(NamedTuple): + value: int = 3 + + def prepare(value: int) -> Iterator[int]: + yield value + yield value + 1 + + def finish(prepare: list[int]) -> int: + return sum(prepare) + + sub = pipeline( + name="sub", + params=SubParams, + steps=[ + step("prepare", fn=prepare), + step("finish", fn=finish), + ], + exports="finish", + ) + + def adapt(value: int) -> SubParams: + return SubParams(value=value) + + captured = [] + + def consume(incl: int) -> None: + captured.append(incl) + + p = pipeline( + name="scope_materializer_override", + params=Params, + steps=[ + include("incl", pipeline=sub, fn=adapt), + step("consume", fn=consume), + ], + ) + + overrides = ExecutionOverrides.empty(p) + sub_scope = Scope("incl") + overrides.materializers[sub_scope.scope("prepare")] = lambda items: [ + item + 1 for item in items + ] + + run_pipeline(p, Params(), overrides=overrides) + + assert captured == [9] + + +def test_given_scope_key_when_observer_overridden_in_sub_pipeline_then_override_is_used( + run_pipeline, +): + class SubParams(NamedTuple): + value: int + + class Params(NamedTuple): + value: int = 3 + + events = [] + + def record(ctx): + if ctx.event in (StepEvent.STARTED, StepEvent.COMPLETED): + events.append((ctx.step_name, type(ctx).__name__)) + + def prepare(value: int) -> int: + return value * 2 + + def finish(prepare: int) -> int: + return prepare + + sub = pipeline( + name="sub", + params=SubParams, + steps=[ + step("prepare", fn=prepare, observers=[Observer(lambda ctx: None)]), + step("finish", fn=finish), + ], + exports="finish", + ) + + def adapt(value: int) -> SubParams: + return SubParams(value=value) + + def consume(incl: int) -> None: + return None + + p = pipeline( + name="scope_observer_override", + params=Params, + steps=[ + include("incl", pipeline=sub, fn=adapt), + step("consume", fn=consume), + ], + ) + + overrides = ExecutionOverrides.empty(p) + sub_scope = Scope("incl") + overrides.observers[sub_scope.scope("prepare")] = [Observer(record)] + + run_pipeline(p, Params(), overrides=overrides) + + assert events == [ + ("incl__prepare", "StepStartedContext"), + ("incl__prepare", "StepCompletedContext"), + ] From c3296f7e56ce7ec55a2517f2c7aa96cd0956725e Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 02:01:11 +0100 Subject: [PATCH 3/5] accept builtin concrete materializers --- synaflow/core/type_compatibility.py | 5 +++- tests/core/test_dag_builder_materializer.py | 26 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/synaflow/core/type_compatibility.py b/synaflow/core/type_compatibility.py index e35de2c..ad3343a 100644 --- a/synaflow/core/type_compatibility.py +++ b/synaflow/core/type_compatibility.py @@ -7,7 +7,10 @@ def is_factory(func: Callable) -> bool: if not callable(func): return False - sig = inspect.signature(func) + try: + sig = inspect.signature(func) + except (TypeError, ValueError): + return False for param in sig.parameters.values(): if param.name in ("ctx", "context") or "MaterializeContext" in str( param.annotation diff --git a/tests/core/test_dag_builder_materializer.py b/tests/core/test_dag_builder_materializer.py index 9124d84..c5a7812 100644 --- a/tests/core/test_dag_builder_materializer.py +++ b/tests/core/test_dag_builder_materializer.py @@ -232,6 +232,32 @@ def consumer(producer: list[Row]) -> int: assert p.dag is not None +def test_given_builtin_concrete_materializer_when_signature_not_inspectable_then_dag_builds(): + from collections.abc import Iterator + from synaflow import pipeline, step + + class Params(NamedTuple): + pass + + def producer() -> Iterator[int]: + yield 1 + yield 2 + + def consumer(producer: int) -> None: + pass + + p = pipeline( + name="test_builtin_concrete_materializer", + params=Params, + steps=[ + step("producer", fn=producer, materializer=int), + step("consumer", fn=consumer), + ], + ) + assert p.dag is not None + assert p.dag.steps["producer"].materializer is int + + def test_given_pipeline_materializer_when_non_builtin_inner_type_used_then_dag_builds(): from dataclasses import dataclass from collections.abc import Iterator From 74023e44edbf6b568e58ca28acc74d6ed750bafa Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 02:03:01 +0100 Subject: [PATCH 4/5] document scope override examples --- README.md | 14 ++++++++++++++ docs/user_docs/core-concepts/build-vs-run.md | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/README.md b/README.md index 488bf3d..5374c45 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,20 @@ runtime dependencies such as materializers or observers. Use `PIPELINE_SCOPE` for pipeline-level observers. The DAG shape and semantics stay fixed; only the runtime callable changes. +```python +from synaflow import ExecutionOverrides, Observer, PIPELINE_SCOPE, Scope + +overrides = ExecutionOverrides.empty(p) +sub = Scope("payments") + +overrides.observers[PIPELINE_SCOPE] = [Observer(noop_metrics)] +overrides.observers[sub.scope("validate")] = [Observer(test_recorder)] +overrides.materializers[sub.scope("normalize")] = list +``` + +For included sub-pipelines, `Scope(...)` is the public helper for addressing +compiled step keys without hardcoding `"payments__validate"` by hand. + ### Build your own runner The DAG compiles to a deterministic JSON contract. Write custom runners or diff --git a/docs/user_docs/core-concepts/build-vs-run.md b/docs/user_docs/core-concepts/build-vs-run.md index 0870883..310bb84 100644 --- a/docs/user_docs/core-concepts/build-vs-run.md +++ b/docs/user_docs/core-concepts/build-vs-run.md @@ -72,6 +72,24 @@ runtime callable for a compiled key such as a materializer or observer scope, but it does not change graph structure, dependency resolution, or eager-vs-lazy planning. +For nested pipelines, the public key helper is `Scope`, not manual string +concatenation: + +```python +from synaflow import ExecutionOverrides, Observer, PIPELINE_SCOPE, Scope + +overrides = ExecutionOverrides.empty(p) +sub = Scope("incl") + +overrides.observers[PIPELINE_SCOPE] = [Observer(noop)] +overrides.observers[sub.scope("validate")] = [Observer(spy)] +overrides.materializers[sub.scope("prepare")] = tuple +``` + +The executor never understands sub-pipelines directly. `Scope` resolves to the +compiled DAG step key before execution starts, so runtime still operates on the +same flat compiled contract. + ### 2. Write your own runner The `Dag` object is self-contained. Anyone can write a runner: From 1608aa97df1feba655a7043891f57391e8f5c8a2 Mon Sep 17 00:00:00 2001 From: Marcelo Valle Date: Sat, 20 Jun 2026 02:06:17 +0100 Subject: [PATCH 5/5] align observer empty overrides with design --- synaflow/execution/overrides.py | 6 +- tests/execution/test_execution_overrides.py | 92 ++++++++++++++++++++- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/synaflow/execution/overrides.py b/synaflow/execution/overrides.py index d5d6c9d..2d0f7c3 100644 --- a/synaflow/execution/overrides.py +++ b/synaflow/execution/overrides.py @@ -92,7 +92,11 @@ def _validate_value(self, key: str, value: Any) -> None: class ObserverRegistry(PipelineRegistry): @classmethod def empty(cls, pipeline: PipelineDef) -> "ObserverRegistry": - return cls(contract_keys=_observer_contract_keys(pipeline)) + contract_keys = _observer_contract_keys(pipeline) + return cls( + contract_keys=contract_keys, + fallback_values={key: [] for key in contract_keys}, + ) @classmethod def from_production(cls, pipeline: PipelineDef) -> "ObserverRegistry": diff --git a/tests/execution/test_execution_overrides.py b/tests/execution/test_execution_overrides.py index e0339e8..5d66805 100644 --- a/tests/execution/test_execution_overrides.py +++ b/tests/execution/test_execution_overrides.py @@ -102,6 +102,30 @@ def consume(items: list[int]) -> None: assert overrides.materializers["items"] is list +def test_given_execution_overrides_empty_when_materializer_not_overridden_then_compiled_callable_is_kept(): + class Params(NamedTuple): + count: int = 1 + + def gen(count: int) -> Iterator[int]: + yield from range(count) + + def consume(items: list[int]) -> None: + return None + + p = pipeline( + name="compiled_materializer_empty", + params=Params, + steps=[ + step("items", fn=gen), + step("consume", fn=consume), + ], + ) + + overrides = ExecutionOverrides.empty(p) + + assert overrides.materializers["items"] is list + + def test_given_unknown_materializer_override_key_when_assigned_then_raises(): class Params(NamedTuple): value: int = 1 @@ -178,6 +202,72 @@ def emit(value: int) -> int: assert "StepCompletedContext" in events +def test_given_execution_overrides_empty_when_observers_not_overridden_then_all_observers_are_silenced( + run_pipeline, +): + class Params(NamedTuple): + value: int = 1 + + events = [] + + def record_pipeline(ctx): + events.append(type(ctx).__name__) + + def record_step(ctx): + events.append(type(ctx).__name__) + + def emit(value: int) -> int: + return value + + p = pipeline( + name="observer_step_override", + params=Params, + steps=[step("emit", fn=emit, observers=[Observer(lambda ctx: None)])], + observers=[Observer(record_pipeline)], + ) + + overrides = ExecutionOverrides.empty(p) + + run_pipeline(p, Params(), overrides=overrides) + + assert events == [] + + +def test_given_execution_overrides_from_production_when_observers_not_overridden_then_compiled_observers_are_kept( + run_pipeline, +): + class Params(NamedTuple): + value: int = 1 + + pipeline_events = [] + step_events = [] + + def record_pipeline(ctx): + if ctx.event in (PipelineEvent.STARTED, PipelineEvent.COMPLETED): + pipeline_events.append(type(ctx).__name__) + + def record_step(ctx): + if ctx.event in (StepEvent.STARTED, StepEvent.COMPLETED): + step_events.append(type(ctx).__name__) + + def emit(value: int) -> int: + return value + + p = pipeline( + name="observer_step_from_production", + params=Params, + steps=[step("emit", fn=emit, observers=[Observer(record_step)])], + observers=[Observer(record_pipeline)], + ) + + overrides = ExecutionOverrides.from_production(p) + + run_pipeline(p, Params(), overrides=overrides) + + assert pipeline_events == ["PipelineStartedContext", "PipelineCompletedContext"] + assert step_events == ["StepStartedContext", "StepCompletedContext"] + + def test_given_step_observer_override_when_sync_run_then_only_step_events_use_override( run_pipeline, ): @@ -210,7 +300,7 @@ def emit(value: int) -> int: run_pipeline(p, Params(), overrides=overrides) - assert pipeline_events == ["PipelineStartedContext", "PipelineCompletedContext"] + assert pipeline_events == [] assert step_events == ["StepStartedContext", "StepCompletedContext"]