Skip to content

feat: max_in_flight — bounded stream handoff#33

Merged
mvallebr merged 17 commits into
mainfrom
feat/max-in-flight-clean
Jun 18, 2026
Merged

feat: max_in_flight — bounded stream handoff#33
mvallebr merged 17 commits into
mainfrom
feat/max-in-flight-clean

Conversation

@mvallebr

@mvallebr mvallebr commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Summary

Adds max_in_flight — controls how many items a producing stream can emit ahead of the downstream consumer. Default 1 preserves exact current lockstep behavior.

API

step("name", fn=process, max_in_flight=30)

Implementation

DAG model:

  • Step.max_in_flight and DagNode.max_in_flight default to 1
  • Serialized in DAG JSON (always present)
  • Validated at build time: must be int >= 1
  • Runner consults compiled DAG, never Step directly
  • Sub-pipeline expansion preserves configured max_in_flight

Sync runner:

  • New BoundedIterator (deque-backed) wraps progressive stream outputs when max_in_flight > 1
  • Fan-out: BoundedIterator applied BEFORE itertools.tee — bounds producer at source

Async runner:

  • Publish queues sized by max(100, node.max_in_flight) — backward compat at default
  • Unroll queues use producer node's max_in_flight

Tests: 20 new (10 build-time + 5 sync + 5 async).

  • Explicit build-time check for invalid types (e.g. boolean True rejected).
  • Explicit runner contract test to guarantee that both sync and async executors use compiled DagNode.max_in_flight rather than direct Step values.
  • Build-time validation for generated adapter steps (max_in_flight=1).

Known v1 limitations:

  • Async: queue minimum of 100 needed to prevent pump deadlock with sequential put across multiple consumer queues. max_in_flight > 100 honored exactly.
  • Sync fan-out: itertools.tee internal buffer not bounded per-branch. Source bounded, but slow branches can accumulate tee backlog.
  • Full bounded handoff per-branch requires deeper executor redesign (separate pump tasks per consumer queue, tee replacement).

Corpus: All snapshots (including the new max_in_flight_threadpool.py) updated and fully validated.

445 tests passing.

- Add max_in_flight: int = 1 to Step and DagNode
- Validate max_in_flight >= 1 and integer at build time
- Serialize max_in_flight in DAG JSON (always present)
- Sync: BoundedIterator deque-backed wrapper for max_in_flight > 1
  (max_in_flight=1 preserves exact current behavior)
- Async: use max(max_in_flight, 100) for queue sizing
- Update all 18 corpus snapshots with max_in_flight: 1
@mvallebr mvallebr force-pushed the feat/max-in-flight-clean branch from 35b37ac to 944c416 Compare June 17, 2026 06:01
- Add test_dag_builder_max_in_flight.py (8 build-time validation tests)
- Add test_runner_max_in_flight.py for sync and async (4 tests each)
- Fix BoundedIterator: propagate exceptions immediately, don't buffer
- Fix test parity by using identical test function names in sync and async
@mvallebr mvallebr force-pushed the feat/max-in-flight-clean branch from 944c416 to 27c0fdd Compare June 17, 2026 06:01
mvallebr added 2 commits June 17, 2026 07:28
- Add 10 unit tests for BoundedIterator edge cases
- Fix exception handling: buffer items before raising, only raise
  pending exception when buffer is empty
- maxsize validation, empty source, partial iteration covered
…zing

- Sync fan-out: apply BoundedIterator to source BEFORE itertools.tee
  so producer advancement is bounded at the source level
- Async queue sizing: use node.max_in_flight directly when > 1,
  keep 100 for default=1 (backward compatible)
- Add tests: sync fan-out bounded, async bounded ahead verification,
  parity between sync and async test names
@mvallebr mvallebr force-pushed the feat/max-in-flight-clean branch from f1ff2d2 to 5d8059d Compare June 17, 2026 21:24
mvallebr added 13 commits June 17, 2026 22:38
- Async _unroll_step: look up producer node's max_in_flight instead
  of using consumer node's (fixes reviewer issue #1)
- Keep 100 minimum queue size for publish/unroll to prevent pump
  deadlock (v1 limitation — reviewer issue #2 acknowledged)
- Sync tee fan-out limitation documented as v1 constraint
Codex changes:
- AsyncQueueBranch: implement __aiter__/__anext__ for direct async iteration
- Async executor: _attach_argument_cleanup, _close_stream_arguments
- Async executor: queue sizing uses max(2, max_in_flight+1) for EOF
- Async executor: _resolve_queue handles AsyncQueueBranch type
- Sync executor: observer threads via SyncFanout branches
- Sync executor: _observer_threads with cleanup on pipeline finish
- Sync executor: _notify_observers in publish stream paths
- 12 new tests: ahead_distance_bounded, producer_blocks,
  terminal_lazy_drains, flattening_stream_internal_items (sync+async)
- 3 new observer tests: does_not_force_eager, does_not_consume_slots,
  bound_is_unchanged (sync+async)
@mvallebr mvallebr merged commit 4879c70 into main Jun 18, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant