Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
[![License](https://img.shields.io/github/license/humansoftware/synaflow)](https://github.com/humansoftware/synaflow/blob/main/LICENSE)
[![Python](https://img.shields.io/pypi/pyversions/synaflow)](https://pypi.org/project/synaflow/)

**SynaFlow** is a lightweight, pure-Python pipeline engine that uses **Type Hints** to automatically wire and execute Directed Acyclic Graphs (DAGs) with lockstep streaming.
**SynaFlow** is a lightweight, pure-Python pipeline engine that uses **Type Hints** to automatically wire and execute Directed Acyclic Graphs (DAGs) with lockstep streaming and optional bounded handoff.

**Why the name?** **Synapse** + **Flow**. Just like synapses automatically wire neurons together, SynaFlow automatically wires your functions together based on their types. "Flow" represents the lazy, streaming nature of how data moves through those connections.

Expand Down Expand Up @@ -56,11 +56,15 @@ type hints and wires the DAG automatically.
Parameter names match producer names — SynaFlow connects them automatically.
Singular/plural/suffix synonyms work too (`item` → `items`, `user_list` → `users`).

### Lazy streaming with zero boilerplate
### Lazy streaming with bounded handoff

Multiple consumers of the same producer? SynaFlow forks the stream with
`itertools.tee` and advances them in lockstep. One consumer can stream lazily
while another materializes — no manual `tee`, no memory spikes.
SynaFlow streams lazily by default. Multiple consumers can stay lockstep, one
consumer can stay lazy while another materializes, and when you need a bounded
window between stages you can set `max_in_flight` on the producing step.

This is especially useful for I/O-bound pipelines where one step starts work
and the next resolves it, such as HTTP requests, RPC calls, or object-store
reads.

### Static validation at build time

Expand All @@ -77,7 +81,7 @@ auto-generate native DAGs for Airflow, Prefect, or Dagster.
| | SynaFlow | Hamilton | Airflow / Prefect / Dagster |
|---|---|---|---|
| **Auto wiring** | ✅ type hints + smart binding | ✅ type hints (exact names) | ❌ explicit `A >> B` |
| **Lazy streaming** | ✅ lockstep tee | ❌ DataFrame-centric | ❌ task-based |
| **Lazy streaming** | ✅ lockstep + bounded handoff | ❌ DataFrame-centric | ❌ task-based |
| **Smart binding** | ✅ singular/plural/suffix | ❌ | ❌ |
| **Scope** | In-process micro | Feature engineering | Cluster orchestration |
| **DAG export** | ✅ JSON | ✅ | ✅ |
Expand All @@ -100,7 +104,7 @@ Start here: **[humansoftware.github.io/synaflow](https://humansoftware.github.io
| Section | Description |
|---|---|
| [Tutorial](https://humansoftware.github.io/synaflow/tutorial/hello-world/) | 5-level step-by-step guide building a pipeline from scratch |
| [Core Concepts](https://humansoftware.github.io/synaflow/core-concepts/how-dag-is-wired/) | How the DAG is wired, lockstep flow, build vs run, event-based processing |
| [Core Concepts](https://humansoftware.github.io/synaflow/core-concepts/how-dag-is-wired/) | How the DAG is wired, lockstep flow, max in flight, build vs run, event-based processing |
| [Examples](https://humansoftware.github.io/synaflow/core-concepts/examples/) | Every corpus pipeline with auto-generated diagrams and source code |
| [Comparisons](https://humansoftware.github.io/synaflow/comparisons/hamilton/) | Detailed comparisons with Hamilton, Java Streams, and LINQ |
| [Design Philosophy](docs/DESIGN_PHILOSOPHY.md) | Architectural decisions, contracts, and design rationale |
Expand Down
7 changes: 7 additions & 0 deletions boilerplates/minimal/{{cookiecutter.project_name}}/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ Edit `pipeline.py`:
2. Replace `producer`, `transformer`, `consumer` with your own functions
3. Add or remove `step()` calls in the `pipeline()` definition

## Next concepts

If your pipeline is I/O-bound and one step starts work while the next step
waits for the result, read the `max_in_flight` docs:

- https://humansoftware.github.io/synaflow/core-concepts/max-in-flight/

For more complex projects, see the `structured` template:
```bash
uvx cookiecutter gh:humansoftware/synaflow --directory=boilerplates/structured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ src/{{ cookiecutter.package_name }}/
2. Wire them in `pipeline.py`
3. Run from `main.py`

## Next concepts

If your pipeline needs a bounded ahead window between two streaming stages,
especially for I/O-bound work, read the `max_in_flight` docs:

- https://humansoftware.github.io/synaflow/core-concepts/max-in-flight/

Need a simpler single-file project? Use the `minimal` template:
```bash
uvx cookiecutter gh:humansoftware/synaflow --directory=boilerplates/minimal
Expand Down
24 changes: 15 additions & 9 deletions docs/DESIGN_PHILOSOPHY.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ The materializer can persist the shuffle phase to disk when datasets are too lar
| | SynaFlow | Hamilton | Dagster | Prefect | Airflow |
|---|---|---|---|---|---|
| **Type-hint wiring** | ✅ auto | ✅ | ❌ | ❌ | ❌ |
| **Lazy streaming** | ✅ lockstep tee | ❌ DataFrame-centric | ❌ task-based | ❌ task-based | ❌ task-based |
| **Lazy streaming** | ✅ lockstep + bounded handoff | ❌ DataFrame-centric | ❌ task-based | ❌ task-based | ❌ task-based |
| **Smart binding** | ✅ singular/plural/suffix | ❌ | ❌ | ❌ | ❌ |
| **Scope** | In-process micro | Feature engineering | Asset orchestration | Workflow orchestration | DAG scheduling |
| **DAG export** | ✅ JSON | ✅ | ✅ | ✅ | ✅ |
Expand Down Expand Up @@ -111,7 +111,7 @@ serve fundamentally different data models.
|---|---|---|
| **Default flow** | Lazy streaming (`Iterator[T]`) | DataFrame columns (materialized) |
| **Memory** | One item per step — generators | Entire column in memory |
| **Multiple consumers** | Auto `tee` in lockstep | Single consumer per column |
| **Multiple consumers** | Auto `tee` in lockstep, bounded handoff when configured | Single consumer per column |
| **Materialization** | Consumer-driven: ask for `list[T]` → materialize | Always materialized |
| **Generators** | Native: `yield` in any step | Not supported |
| **Streaming to disk** | Transparent via materializer factories | Manual code in each function |
Expand All @@ -130,7 +130,8 @@ Type hints are used for validation, not DAG construction. **Dask delayed** build
graphs lazily but requires explicit task declarations — no auto-wiring from signatures.

None of these support smart binding (singular/plural synonyms), lazy lockstep
streaming with automatic `tee`, or consumer-driven per-branch materialization.
streaming with automatic `tee`, bounded `max_in_flight` handoff, or
consumer-driven per-branch materialization.

## 3. Architectural Decisions and Patterns (Decision Log)

Expand Down Expand Up @@ -207,23 +208,28 @@ The default materializer factory maps producer-consumer type pairs to appropriat
| `T` | `Iterable[T]` | — | **validation error** |
| `CustomType` | any | — | **validation error** (requires custom factory) |

**Invocation rules:** The materializer is invoked when (a) the consumer demands a materialized protocol (`list`/`set`/`tuple`/`dict`), OR (b) the producer has `on_error=STOP`, OR (c) the step has `force_materialize=True`. For fan-out scenarios, `tee` splits the stream before materialization, so lazy consumers receive the original stream while materialized consumers receive the materialized copy.
**Invocation rules:** The materializer is invoked when (a) the consumer demands a materialized protocol (`list`/`set`/`tuple`/`dict`), OR (b) the producer has `on_error=STOP`, OR (c) the step has `force_materialize=True`. For fan-out scenarios, materialization stays branch-local: lazy consumers keep progressive delivery while materialized consumers receive a collected branch copy.

For uneven multi-stream each-mode, exhaustion is modeled with `None` padding rather than silent truncation. This is part of the execution contract and must behave identically in sync and async runners.

### 3.10. `force_materialize` Flag
### 3.10. `max_in_flight`
**Decision:** Every compiled `DagNode` carries `max_in_flight`, defaulting to `1`, and runners enforce it from DAG metadata rather than step definitions.
**Reason:** Bounded handoff is part of runtime semantics, not user-code convenience. The contract is "maximum number of items already emitted by a step and not yet delivered to the next consumption stage." Keeping it in the DAG preserves build/run separation, JSON export fidelity, and sync/async parity.
**Operational motivation:** This is primarily for I/O-bound pipelines where one step starts work and the next resolves it, such as request submission followed by response awaiting. It gives the runtime a small, explicit ahead window without changing the programming model into manual queues or semaphores.

### 3.11. `force_materialize` Flag
**Decision:** Steps can explicitly declare `force_materialize=True` to trigger materialization of their output regardless of consumer types or error handling configuration.
**Reason:** Some use cases require materialization as a side effect (e.g., persisting intermediate results for debugging, caching expensive computations, or ensuring data is written to an audit log at a specific pipeline stage). This is orthogonal to `on_error=STOP`.

### 3.11. No Silent Type Wrapping
### 3.12. No Silent Type Wrapping
**Decision:** The framework never silently coerces scalar values into iterables. If a producer outputs `str` and a consumer expects `Iterator[str]`, a `ValidationError` is raised at build time. Users must explicitly declare `Iterator[str]` as the output type and `yield` the single item.
**Reason:** Implicit wrapping hides design errors and breaks the type contract. Explicit yield makes the data flow visible and predictable.

### 3.12. Inline Executors (Single-File Runtime)
### 3.13. Inline Executors (Single-File Runtime)
**Decision:** The sync and async execution engines each live in a single file (`executor.py`). The previous sub-components (`SyncStreamManager`, `SyncNodeRunner`, `SyncDependencyResolver`, and their async counterparts) were stateless classes that existed only as namespaces. They were replaced by plain functions and inlined into the executor.
**Reason:** Simpler dependency graph, no fake "classes" without state, easier to understand the full execution flow in one file.

### 3.13. Observable Execution (`step_output_observers`)
### 3.14. Observable Execution (`step_output_observers`)
**Decision:** The executor accepts an optional list of observer callbacks via `step_output_observers`. Each observer is called with `(step_name, output)` every time a step produces an output. For stream outputs, the sync executor tees the stream so the observer receives an independent copy; the async executor creates a dedicated pump task.
**Reason:** Enables test infrastructure (capturing step outputs for spec compliance tests) without modifying production logic. Follows the Observer pattern — the executor doesn't know what observers do, only that they exist.

Expand All @@ -232,7 +238,7 @@ For uneven multi-stream each-mode, exhaustion is modeled with `None` padding rat
- when a stream fails under `OnError.CONTINUE`, observers see the valid prefix that was already produced
- observer behavior is a public contract and is covered by corpus/spec tests, not only unit tests

### 3.14. PipelineStopException with Context
### 3.15. PipelineStopException with Context
**Decision:** `PipelineStopException` carries `step_name` and `cause` (the original exception). It uses `raise ... from` to preserve the full stack trace.
**Reason:** When a pipeline stops, callers need to know which step caused the stop and why. An empty exception is useless for debugging.

Expand Down
1 change: 1 addition & 0 deletions docs/ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This roadmap outlines the planned features and architectural evolutions for the
- **Explicit step mode** — `StepMode.AUTO | EACH | ALL` with build-time validation and DAG-resolved semantics
- **No silent wrapping** — scalar producer cannot feed iterable consumer
- **Executor rewrite** — single-file sync and async executors; `step_output_observers` for test injection; composite key fan-out (no TeeWrapper); zip multi-stream unroll
- **Bounded handoff (`max_in_flight`)** — every compiled `DagNode` carries `max_in_flight` (default `1`); sync and async runners enforce bounded producer-to-consumer handoff from DAG metadata; documented with real sync/async I/O-bound examples and covered by runtime and corpus tests
- **Sync/async parity fixes** — async iteration failure handling, branch-aware materialization context, uneven unroll termination, and preserved valid prefixes under `OnError.CONTINUE`
- **PipelineStopException** — carries `step_name` + `cause` + `raise ... from`
- **Observer/runtime contract coverage** — mixed fan-out, partial stream failure, and explicit-mode corpus specs
Expand Down
6 changes: 3 additions & 3 deletions docs/user_docs/comparisons/hamilton.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ the data model underneath is fundamentally different.
|---|---|---|
| **Default flow** | Lazy streaming (`Iterator[T]`) | DataFrame columns (materialized) |
| **Memory** | One item per step — generators | Entire column in memory |
| **Multiple consumers** | Auto `tee` in lockstep | Single consumer per column |
| **Multiple consumers** | Auto `tee` in lockstep, bounded handoff when configured | Single consumer per column |
| **Materialization** | Consumer-driven: ask for `list[T]` → materialize | Always materialized |
| **Generators** | Native: `yield` in any step | Not supported at user level |
| **Streaming to disk** | Transparent via materializer factories | Manual code in each function |
Expand Down Expand Up @@ -102,11 +102,11 @@ the data model underneath is fundamentally different.

| Use case | SynaFlow | Hamilton |
|---|---|---|
| **Streaming millions of rows** | ✅ lockstep tee, one item at a time | ❌ full DataFrame in memory |
| **Streaming millions of rows** | ✅ lockstep + bounded handoff, one item at a time | ❌ full DataFrame in memory |
| **Feature engineering** | Possible but not specialized | ✅ purpose-built |
| **Notebook to production** | ✅ plain Python functions | ✅ `@parameterize` decorators |
| **Event-based processing** | ✅ lazy by default, idempotent | ❌ batch-oriented |
| **Multiple consumers, one producer** | ✅ auto `tee` | ❌ single consumer per column |
| **Multiple consumers, one producer** | ✅ auto `tee` + `max_in_flight` window | ❌ single consumer per column |
| **Persistence to disk/S3/DB** | ✅ materializer factories | ❌ manual code |
| **Sync + async from same definition** | ✅ identical DAG | ❌ sync only |
| **Export to Airflow/Prefect** | ✅ DAG JSON contract | ✅ via Hamilton UI |
Expand Down
4 changes: 4 additions & 0 deletions docs/user_docs/comparisons/java-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,5 +207,9 @@ incremental processing that have no equivalent in the Java Streams API.
- You need **persistence** (disk, database, cloud storage).
- You need **lazy lockstep** consumption across multiple consumers without
manual `tee` management.
- You need a **bounded ahead window** (like `max_in_flight`) for I/O-bound
patterns. In Java, this pattern requires moving to Project Reactor, RxJava,
or JDK 9+ `java.util.concurrent.Flow` to use reactive backpressure / `prefetch`
buffers. Standard Java Streams are synchronous and lockstep-only.
- You need **sync/async parity** — the same pipeline definition runs in both.
- Your data doesn't fit in memory but you still want the Streams-like API.
15 changes: 14 additions & 1 deletion docs/user_docs/comparisons/linq.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ SynaFlow, it chains transformations over data — `Select`, `Where`, `GroupBy`,
| **Parallelism** | `.AsParallel()` / PLINQ | Sync/async parity, custom runners |
| **Persistence** | In-memory only | Disk, S3, Redis, DB via materializers |
| **Smart binding** | ❌ | ✅ singular/plural/suffix resolution |
| **Multi-consumer** | Single pipeline, single consumer | Auto `tee` for multiple consumers in lockstep |
| **Multi-consumer** | Single pipeline, single consumer | Auto `tee` for multiple consumers in lockstep, with bounded `max_in_flight` when needed |
| **Where it runs** | .NET CLR | Single Python process (or export to Airflow/Prefect) |

## Deferred execution
Expand Down Expand Up @@ -121,8 +121,21 @@ language.

## When SynaFlow goes further

### Persistence

LINQ streams are in-memory by design. SynaFlow's materializers let
`ToList()` / `ToDictionary()` target **disk, S3, Redis, or any backend**
without changing the consumer code. See
[Java Streams comparison](java-streams.md) for more on the
protocol-over-concrete-type design.

### Bounded streams (vs. standard LINQ / TPL)

Standard LINQ (`IEnumerable`) executes purely pull-based in lockstep on a single thread. It has no concept of a "bounded ahead" buffer between streaming steps.

In C#, when you need to let a producer task run ahead of a consumer task up to a specific limit (e.g. for I/O-bound operations), you typically transition to:
* **`System.Threading.Channels`** (using a bounded channel `Channel.CreateBounded<T>(N)`).
* **TPL Dataflow blocks** (using `BoundedCapacity` on blocks like `TransformBlock`).

SynaFlow embeds this capability directly into your DAG definition via `max_in_flight=N` on the step without requiring you to write channel or queue plumbing.

6 changes: 4 additions & 2 deletions docs/user_docs/core-concepts/build-vs-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ flowchart LR
| Phase | What happens | When | Output |
|---|---|---|---|
| **Build-time** | Type validation, mode resolution, materializer assignment, circular dependency check, sync/async consistency | `pipeline(...)` is called | `Dag` object, serializable JSON |
| **Run-time** | Topological execution, lockstep streaming, `tee` forking, observer dispatch, error handling | `run()` / `async_run()` is called | Step outputs, side effects |
| **Run-time** | Topological execution, lockstep streaming, bounded handoff via `max_in_flight`, `tee` forking, observer dispatch, error handling | `run()` / `async_run()` is called | Step outputs, side effects |

## Why this matters

Expand All @@ -52,6 +52,7 @@ print(p.to_dict())
"fn": "producer",
"mode": "all",
"on_error": "continue",
"max_in_flight": 1,
"materializer": "memory_materializer",
"materialized_deps": [],
"each_mode_deps": []
Expand All @@ -60,7 +61,8 @@ print(p.to_dict())
}
```

All semantic decisions — mode, `each_mode_deps`, `materialized_deps` — are
All semantic decisions — mode, `max_in_flight`, `each_mode_deps`,
`materialized_deps` — are
resolved at build time and frozen in the JSON. Runners don't re-infer
semantics; they execute the contract.

Expand Down
4 changes: 3 additions & 1 deletion docs/user_docs/core-concepts/how-dag-is-wired.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,6 @@ SynaFlow builds the entire DAG without a single line of manual wiring.
## Next

Now that you understand the wiring, see how [Lockstep Data Flow](lockstep-flow.md)
uses this structure to achieve extreme memory efficiency.
uses this structure to achieve extreme memory efficiency, and how
[Max In Flight](max-in-flight.md) adds a bounded ahead window for I/O-bound
streaming patterns.
5 changes: 5 additions & 0 deletions docs/user_docs/core-concepts/lockstep-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ SynaFlow's streaming engine guarantees **extreme memory efficiency** by processi
pipelines in lockstep — one item flows entirely through the DAG before the next
item is produced.

This is the default behavior because every step starts with
`max_in_flight=1`. If you need a bounded window between two stages, see
[Max In Flight](max-in-flight.md), which includes real sync and async HTTP
examples for I/O-bound pipelines.

## A Streaming Pipeline

=== "Sync"
Expand Down
Loading
Loading