Skip to content

feat(orchestration): pipeline combinator (Workflow Phase 3)#53

Merged
ZhiXiao-Lin merged 1 commit into
mainfrom
feat/orchestration-pipeline
May 29, 2026
Merged

feat(orchestration): pipeline combinator (Workflow Phase 3)#53
ZhiXiao-Lin merged 1 commit into
mainfrom
feat/orchestration-pipeline

Conversation

@ZhiXiao-Lin
Copy link
Copy Markdown
Contributor

Phase 3 of the Workflow integration (builds on #51, #52). Adds execute_pipeline — the scheduling shape parallel() can't express.

What

  • execute_pipeline: each item flows through a chain of stages independently, with no barrier between stages — item A can be in stage 3 while item B is still in stage 1 (wall-clock = slowest single chain, not sum-of-slowest-per-stage).
  • PipelineStage<I> = Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec>: pure spec-builders that can branch on the prior stage's outcome (e.g. "verify the finding the review stage produced"). The executor runs them.
  • Chains run concurrently bounded by the executor's concurrency_hint; a chain stops early on a None stage or a failed step; a panicking stage isolates to its own chain (→ None). Reuses run_ordered_parallel_with_limit; built entirely on the Phase-1 AgentExecutor seam.

Deferred (Rule 6 — no consumer yet): phase-grouping events (would churn the persisted AgentEvent schema + SDK mapping for no current consumer) and a budget-aware loop_until. Both land when the SDK/UI (Phase 5) needs them.

Verification

  • 4 mock-based pipeline tests: chaining + prior-output visibility, stop-on-failure/None, no-barrier concurrency bound, panic isolation.
  • Real-LLM #[ignore] two-stage chain (stage 2's prompt derived from stage 1's live output) passing against .a3s/config.acl.
  • cargo fmt --all --check + cargo clippy --lib --bins -D warnings clean.

Adds execute_pipeline — the one scheduling shape parallel() can't express:
each item flows through a chain of stages independently, with NO barrier
between stages, so item A can be in stage 3 while item B is still in stage 1
(wall-clock = slowest single chain, not sum-of-slowest-per-stage).

- PipelineStage<I> = Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec>:
  pure spec-builders that can branch on the prior stage's outcome (e.g.
  "verify the finding the review stage produced"). The executor runs them.
- Chains run concurrently bounded by the executor's concurrency_hint; a chain
  stops early on a None stage or a failed step; a panicking stage isolates to
  its own chain (-> None) without dropping the others. Reuses
  run_ordered_parallel_with_limit; built entirely on the AgentExecutor seam.

Deferred (Rule 6 — no consumer yet): phase-grouping events (would churn the
persisted AgentEvent schema + SDK mapping) and a budget-aware loop_until;
both land when the SDK/UI (Phase 5) needs them.

Tests: 4 mock-based pipeline tests (chaining + prior-output visibility, stop
on failure/None, no-barrier concurrency bound, panic isolation) and a real-LLM
\#[ignore] two-stage chain passing against .a3s/config.acl. fmt + clippy
--lib --bins clean.
@ZhiXiao-Lin ZhiXiao-Lin merged commit e786d8e into main May 29, 2026
1 check passed
@ZhiXiao-Lin ZhiXiao-Lin deleted the feat/orchestration-pipeline branch May 29, 2026 08:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants