Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ from synaflow import ExecutionOverrides, Observer, PIPELINE_SCOPE, Scope
overrides = ExecutionOverrides.empty(p)
sub = Scope("payments")

overrides.resources["db"] = FakeDatabase()
overrides.observers[PIPELINE_SCOPE] = [Observer(noop_metrics)]
overrides.observers[sub.scope("validate")] = [Observer(test_recorder)]
overrides.materializers[sub.scope("normalize")] = list
```

For included sub-pipelines, `Scope(...)` is the public helper for addressing
compiled step keys without hardcoding `"payments__validate"` by hand.
Declared `resources={...}` are runtime-only and must be provided through
`ExecutionOverrides.resources`.

### Build your own runner

Expand Down
13 changes: 10 additions & 3 deletions docs/user_docs/core-concepts/build-vs-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ resolved at build time and frozen in the JSON or `Dag`. Runners don't re-infer
semantics; they execute the contract.

`ExecutionOverrides` fits inside that boundary: it can swap the concrete
runtime callable for a compiled key such as a materializer or observer scope,
but it does not change graph structure, dependency resolution, or eager-vs-lazy
planning.
runtime callable for a compiled key such as a materializer, observer scope, or
declared runtime resource, but it does not change graph structure, dependency
resolution, or eager-vs-lazy planning.

For nested pipelines, the public key helper is `Scope`, not manual string
concatenation:
Expand All @@ -81,6 +81,7 @@ from synaflow import ExecutionOverrides, Observer, PIPELINE_SCOPE, Scope
overrides = ExecutionOverrides.empty(p)
sub = Scope("incl")

overrides.resources["db"] = FakeDatabase()
overrides.observers[PIPELINE_SCOPE] = [Observer(noop)]
overrides.observers[sub.scope("validate")] = [Observer(spy)]
overrides.materializers[sub.scope("prepare")] = tuple
Expand All @@ -90,6 +91,11 @@ The executor never understands sub-pipelines directly. `Scope` resolves to the
compiled DAG step key before execution starts, so runtime still operates on the
same flat compiled contract.

For resources, the key space is explicit in the compiled pipeline contract via
`pipeline(resources={...})`. Unlike materializers and observers, resources are
runtime-only: if a pipeline declares one, `run()` / `async_run()` must receive
it via `ExecutionOverrides.resources`, or execution fails loudly.

### 2. Write your own runner

The `Dag` object is self-contained. Anyone can write a runner:
Expand Down Expand Up @@ -141,6 +147,7 @@ Every domain concern has a symmetric representation in both phases:
| Mode resolution | Resolved at build time → `node.mode` | Executor reads `node.mode`, never re-infers |
| Materialization | Resolved at build time → `node.materializer`, `materialized_deps`, `Dag.needs_materialize(...)` | Executor calls the resolved callable and follows the compiled plan |
| Observers | Normalized at build time → `node.observers` | Executor dispatches events |
| Resources | Declared at build time → `dag.resources` | Executor requires concrete runtime values via `ExecutionOverrides.resources` |

This symmetry means sync and async executors can be completely different
implementations (one uses generators, the other uses `asyncio.Queue`) but
Expand Down
3 changes: 2 additions & 1 deletion synaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
StepEvent,
)
from .core.types import OnError, StepMode, StepParams, StepResult
from .execution import ExecutionOverrides
from .execution import ExecutionOverrides, ResourceRegistry
from .execution.async_engine.executor import async_run
from .execution.sync_engine.executor import run
from .serializers import (
Expand All @@ -28,6 +28,7 @@
"run",
"async_run",
"ExecutionOverrides",
"ResourceRegistry",
"OnError",
"StepMode",
"StepParams",
Expand Down
7 changes: 7 additions & 0 deletions synaflow/core/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def _serialize_pipeline_observers(observers: list) -> list[dict]:
class Dag:
name: str = ""
params: dict[str, Any] = field(default_factory=dict)
resources: dict[str, Any] = field(default_factory=dict)
steps: dict[str, DagNode] = field(default_factory=dict)
requires_sync_runner: bool = False
requires_async_runner: bool = False
Expand Down Expand Up @@ -132,6 +133,8 @@ def values(self):
def get(self, key, default=None):
if key in self.steps:
return self.steps[key]
if key in self.resources:
return DagNode(output=self.resources[key])
if key in self.params:
return DagNode(output=self.params[key])
return default
Expand All @@ -147,6 +150,10 @@ def to_dict(self) -> dict:
name: node.to_serializable() for name, node in self.steps.items()
},
}
if self.resources:
result["resources"] = {
k: get_type_name(v) for k, v in self.resources.items()
}
if self.error_materializer_factory is not None:
result["error_materializer"] = self.error_materializer_factory.__name__
if self.pipeline_observers:
Expand Down
88 changes: 86 additions & 2 deletions synaflow/core/dag_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
)

from synaflow.core.dag import Dag, DagNode
from synaflow.core.dag_dependencies import initialize_parameters
from synaflow.core.dag_dependencies import initialize_parameters, initialize_resources
from synaflow.core.definition import IncludeStep
from synaflow.core.dag_expansion import expand_macros
from synaflow.core.dag_steps import (
validate_and_compile_step,
Expand Down Expand Up @@ -169,6 +170,69 @@ def _validate_declared_step_names(steps: list[Any], pipeline_name: str) -> None:
validate_unique_step_name(step.name, {}, pipeline_name)


def _validate_resource_names(
resources: dict[str, Any],
params: type[NamedTuple],
expanded_steps: list[Any],
pipeline_name: str,
) -> None:
param_fields = set(getattr(params, "_fields", []))
step_names = {step.name for step in expanded_steps}

for resource_name in resources:
if resource_name in param_fields:
raise ValueError(
f"Pipeline '{pipeline_name}': resource '{resource_name}' collides with a params field."
)
if resource_name in step_names:
raise ValueError(
f"Pipeline '{pipeline_name}': resource '{resource_name}' collides with a step name."
)


def _merge_resources(
merged: dict[str, Any],
incoming: dict[str, Any],
pipeline_name: str,
) -> None:
for resource_name, resource in incoming.items():
if resource_name in merged and merged[resource_name] is not resource:
raise ValueError(
f"Pipeline '{pipeline_name}': resource '{resource_name}' is declared multiple times with different instances/factories."
)
merged.setdefault(resource_name, resource)


def _collect_pipeline_resources(
pipeline_name: str,
steps: list[Any],
resources: dict[str, Any],
include_chain: tuple[str, ...] = (),
) -> dict[str, Any]:
merged: dict[str, Any] = {}
_merge_resources(merged, resources, pipeline_name)

for step in steps:
if not isinstance(step, IncludeStep):
continue

sub_pipeline = step.pipeline
if sub_pipeline.name in include_chain:
raise ValueError(
f"Infinite cycle detected: Pipeline '{sub_pipeline.name}' is already in the inclusion chain '{'.'.join(include_chain)}'"
)

sub_resources = _collect_pipeline_resources(
pipeline_name,
sub_pipeline.steps,
sub_pipeline.resources,
(*include_chain, sub_pipeline.name),
)
_merge_resources(merged, sub_resources, pipeline_name)

return merged


def _resolve_pipeline_observers(
pipeline_observers: list[Observer],
) -> list[ResolvedObserver]:
Expand Down Expand Up @@ -328,10 +392,13 @@ def _compile_steps(
expanded_steps: list[Any],
pipeline_name: str,
params: type[NamedTuple],
resources: dict[str, Any],
pipeline_observers: list[ResolvedObserver],
) -> tuple[dict[str, DagNode], dict[str, DagNode]]:
dag: dict[str, DagNode] = {}
produced = initialize_parameters(params)
produced.update(initialize_resources(resources))
resource_nodes = initialize_resources(resources)

for step in expanded_steps:
validate_step_is_callable(step, pipeline_name)
Expand All @@ -340,6 +407,7 @@ def _compile_steps(
compiled_step = validate_and_compile_step(
step,
produced,
resource_nodes,
pipeline_name,
observers=_resolve_step_observers(pipeline_observers, step.observers),
)
Expand All @@ -353,12 +421,18 @@ def _finalize_dag(
pipeline_name: str,
dag: dict[str, DagNode],
produced: dict[str, DagNode],
resource_names: set[str],
error_materializer_factory: Any,
pipeline_observers: list[ResolvedObserver],
) -> Dag:
dag_obj = Dag(name=pipeline_name)
dag_obj.params = {
name: info.output for name, info in produced.items() if name not in dag
name: info.output
for name, info in produced.items()
if name not in dag and name not in resource_names
}
dag_obj.resources = {
name: info.output for name, info in produced.items() if name in resource_names
}
dag_obj.steps = dag
dag_obj.error_materializer_factory = error_materializer_factory
Expand All @@ -370,6 +444,7 @@ def build_dag(
pipeline_name: str,
params: type[NamedTuple],
steps: list[Any],
resources: dict[str, Any] | None = None,
memory_materializer_factory: Any = None,
is_default_factory: bool = False,
error_materializer_factory: Any = None,
Expand All @@ -382,17 +457,26 @@ def build_dag(
_validate_params_is_namedtuple(params, pipeline_name)
pipeline_obs_resolved = _resolve_pipeline_observers(pipeline_observers or [])
expanded_steps = _expand_and_validate_steps(steps, pipeline_name)
effective_resources = _collect_pipeline_resources(
pipeline_name,
steps,
resources or {},
include_chain=(pipeline_name,),
)
_validate_resource_names(effective_resources, params, expanded_steps, pipeline_name)
dag, produced = _compile_steps(
expanded_steps,
pipeline_name,
params,
effective_resources,
pipeline_obs_resolved,
)
_compute_materialized_deps(dag)
dag_obj = _finalize_dag(
pipeline_name,
dag,
produced,
set(effective_resources),
error_materializer_factory,
pipeline_obs_resolved,
)
Expand Down
13 changes: 12 additions & 1 deletion synaflow/core/dag_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ def initialize_parameters(params: type[NamedTuple]) -> dict[str, DagNode]:
return produced


def initialize_resources(resources: dict[str, Any]) -> dict[str, DagNode]:
produced: dict[str, DagNode] = {}
for name, resource in resources.items():
resource_type = resource if isinstance(resource, type) else type(resource)
produced[name] = DagNode(output=resource_type)
return produced


def get_safe_type_hints(fn: Any) -> dict[str, Any]:
try:
return typing.get_type_hints(fn, include_extras=True)
Expand All @@ -52,6 +60,7 @@ def validate_and_resolve_dependencies(
sig: inspect.Signature,
hints: dict[str, Any],
produced: dict[str, DagNode],
resources: dict[str, DagNode],
pipeline_name: str,
) -> tuple[dict[str, Any], dict[str, str]]:
deps: dict[str, Any] = {}
Expand All @@ -62,7 +71,9 @@ def validate_and_resolve_dependencies(
if consumer_type is inspect.Parameter.empty:
consumer_type = None

if param_name in produced:
if param_name in resources:
producer_name = param_name
elif param_name in produced:
producer_name = param_name
else:
param_base = get_base_dataset_name(param_name)
Expand Down
15 changes: 15 additions & 0 deletions synaflow/core/dag_expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def _extract_sub_pipeline_param_fields(params: Any) -> list[str]:
return []


def _extract_sub_pipeline_resource_fields(resources: dict[str, Any]) -> list[str]:
return list(resources)


def _build_expanded_step_name(prefix: str, sub_step: Step, exported_name: str) -> str:
if sub_step.name == exported_name:
return prefix
Expand All @@ -114,6 +118,7 @@ def _expand_sub_pipeline_steps(
include_step: IncludeStep,
adapter_name: str,
sub_pipeline_param_fields: list[str],
sub_pipeline_resource_fields: list[str],
new_parent_chain: str | None,
) -> list[Step]:
prefix = include_step.name
Expand All @@ -131,6 +136,7 @@ def _expand_sub_pipeline_steps(
prefix,
adapter_name,
sub_pipeline_param_fields,
sub_pipeline_resource_fields,
sub_pipeline.params,
)
materializer, error_materializer, observers = _resolve_sub_step_overrides(
Expand Down Expand Up @@ -172,10 +178,14 @@ def _expand_include(
include_step, current_pipeline_name, parent_chain
)
sub_pipeline_param_fields = _extract_sub_pipeline_param_fields(sub_pipeline.params)
sub_pipeline_resource_fields = _extract_sub_pipeline_resource_fields(
sub_pipeline.resources
)
expanded_steps = _expand_sub_pipeline_steps(
include_step,
adapter_name,
sub_pipeline_param_fields,
sub_pipeline_resource_fields,
new_parent_chain,
)
return [adapter_step, *expanded_steps]
Expand All @@ -186,11 +196,14 @@ def _build_argument_mapping(
prefix: str,
adapter_name: str,
sub_pipeline_param_fields: list[str],
sub_pipeline_resource_fields: list[str],
) -> dict[str, str]:
arg_mapping: dict[str, str] = {}
for param_name in signature.parameters:
if param_name in sub_pipeline_param_fields:
arg_mapping[param_name] = adapter_name
elif param_name in sub_pipeline_resource_fields:
arg_mapping[param_name] = param_name
else:
arg_mapping[param_name] = f"{prefix}__{param_name}"
return arg_mapping
Expand Down Expand Up @@ -241,6 +254,7 @@ def _wrap_sub_step_fn(
prefix: str,
adapter_name: str,
sub_pipeline_param_fields: list[str],
sub_pipeline_resource_fields: list[str],
sub_pipeline_params_class: Any,
) -> Any:
signature = inspect.signature(original_fn)
Expand All @@ -249,6 +263,7 @@ def _wrap_sub_step_fn(
prefix,
adapter_name,
sub_pipeline_param_fields,
sub_pipeline_resource_fields,
)

if inspect.iscoroutinefunction(original_fn):
Expand Down
3 changes: 2 additions & 1 deletion synaflow/core/dag_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def validate_unique_step_name(
def validate_and_compile_step(
step: Step,
produced: dict[str, DagNode],
resources: dict[str, DagNode],
pipeline_name: str,
observers: list | None = None,
) -> DagNode:
Expand All @@ -51,7 +52,7 @@ def validate_and_compile_step(
_validate_max_in_flight(step, pipeline_name)

deps, dataset_param_names = validate_and_resolve_dependencies(
step, sig, hints, produced, pipeline_name
step, sig, hints, produced, resources, pipeline_name
)

mode, each_mode_deps = resolve_step_mode(step, deps, produced, pipeline_name)
Expand Down
2 changes: 2 additions & 0 deletions synaflow/core/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class PipelineDef:
name: str
params: Any
steps: list[Step | IncludeStep]
resources: dict[str, Any] = field(default_factory=dict)
exports: str | None = None
materializer: Callable | None = None
error_materializer: Callable | None = None
Expand All @@ -58,6 +59,7 @@ def __post_init__(self) -> None:
self.name,
self.params,
self.steps,
self.resources,
self.materializer,
is_default_factory=(self.materializer is None),
error_materializer_factory=self.error_materializer,
Expand Down
Loading
Loading