From 7d7130e74b45a2708b47b3307fcfccf157c08105 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 16:25:19 +0800 Subject: [PATCH] feat(orchestration): pipeline combinator (Phase 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds execute_pipeline — the one scheduling shape parallel() can't express: each item flows through a chain of stages independently, with NO barrier between stages, so item A can be in stage 3 while item B is still in stage 1 (wall-clock = slowest single chain, not sum-of-slowest-per-stage). - PipelineStage = Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec>: pure spec-builders that can branch on the prior stage's outcome (e.g. "verify the finding the review stage produced"). The executor runs them. - Chains run concurrently bounded by the executor's concurrency_hint; a chain stops early on a None stage or a failed step; a panicking stage isolates to its own chain (-> None) without dropping the others. Reuses run_ordered_parallel_with_limit; built entirely on the AgentExecutor seam. Deferred (Rule 6 — no consumer yet): phase-grouping events (would churn the persisted AgentEvent schema + SDK mapping) and a budget-aware loop_until; both land when the SDK/UI (Phase 5) needs them. Tests: 4 mock-based pipeline tests (chaining + prior-output visibility, stop on failure/None, no-barrier concurrency bound, panic isolation) and a real-LLM #[ignore] two-stage chain passing against .a3s/config.acl. fmt + clippy --lib --bins clean. 
--- core/src/lib.rs | 5 +- core/src/orchestration/combinators.rs | 240 ++++++++++++++++++++++ core/src/orchestration/mod.rs | 2 + core/tests/test_orchestration_real_llm.rs | 60 +++++- 4 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 core/src/orchestration/combinators.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index f9a0127..1a99c1d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -135,7 +135,10 @@ pub use llm::{ ContentBlock, HttpMetricsCallback, HttpMetricsRecord, ImageSource, LlmClient, LlmResponse, Message, OpenAiClient, TokenUsage, }; -pub use orchestration::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; +pub use orchestration::{ + execute_pipeline, execute_steps_parallel, AgentExecutor, AgentStepSpec, PipelineStage, + StepOutcome, +}; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; pub use run::{ ActiveToolSnapshot, InMemoryRunStore, RunEventRecord, RunHandle, RunRecord, RunSnapshot, diff --git a/core/src/orchestration/combinators.rs b/core/src/orchestration/combinators.rs new file mode 100644 index 0000000..af5a087 --- /dev/null +++ b/core/src/orchestration/combinators.rs @@ -0,0 +1,240 @@ +//! Orchestration combinators built on the [`AgentExecutor`] seam. +//! +//! [`execute_steps_parallel`](super::execute_steps_parallel) (in `executor`) +//! is the barrier (`parallel`) primitive. This module adds `pipeline`: the +//! one genuinely new scheduling shape, where each item flows through a chain +//! of stages independently — no barrier between stages. + +use super::executor::{AgentExecutor, AgentStepSpec, StepOutcome}; +use crate::agent::AgentEvent; +use crate::ordered_parallel::run_ordered_parallel_with_limit; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// A pipeline stage: given the previous stage's outcome (`None` before the +/// first stage) and the original item, produce the next step to run — or +/// `None` to stop this item's chain early. 
+/// +/// Stages are pure spec-builders; the executor runs them. A stage can branch +/// on the prior result (e.g. "verify the finding the review stage produced"). +pub type PipelineStage = + Arc, &I) -> Option + Send + Sync>; + +/// Run each item through `stages` as an independent chain. +/// +/// All chains run concurrently, bounded by the executor's +/// [`concurrency_hint`](AgentExecutor::concurrency_hint) — there is **no +/// barrier between stages**, so item A can be in stage 3 while item B is still +/// in stage 1. Wall-clock is the slowest single chain, not the +/// sum-of-slowest-per-stage that a barrier `parallel` per stage would incur. +/// +/// A chain stops early when a stage returns `None` or when a step fails +/// (later stages would only build on a failed result). Returns each item's +/// last outcome (`None` if its first stage produced no spec), preserving input +/// order. A stage closure that panics isolates to that one chain (its result +/// becomes `None`) without dropping the others. +pub async fn execute_pipeline( + executor: Arc, + items: Vec, + stages: Vec>, + event_tx: Option>, +) -> Vec> +where + I: Send + 'static, +{ + let limit = executor.concurrency_hint(); + let stages = Arc::new(stages); + + let results = run_ordered_parallel_with_limit(items, limit, move |_idx, item| { + let executor = Arc::clone(&executor); + let stages = Arc::clone(&stages); + let event_tx = event_tx.clone(); + async move { + let mut prev: Option = None; + for stage in stages.iter() { + let Some(spec) = stage(prev.as_ref(), &item) else { + break; + }; + let outcome = executor.execute_step(spec, event_tx.clone()).await; + let succeeded = outcome.success; + prev = Some(outcome); + if !succeeded { + break; + } + } + prev + } + }) + .await; + + // A panicked chain (Err) yields `None`; a normal chain yields its last + // outcome. Order is preserved by `run_ordered_parallel_with_limit`. 
+ results + .into_iter() + .map(|result| result.output.unwrap_or(None)) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + /// Echoes the prompt into the output; fails for agent `"fail"`; panics for + /// agent `"boom"`. Records peak concurrency. + struct EchoExecutor { + active: Arc, + max_active: Arc, + } + + impl EchoExecutor { + fn new() -> Self { + Self { + active: Arc::new(AtomicUsize::new(0)), + max_active: Arc::new(AtomicUsize::new(0)), + } + } + } + + #[async_trait] + impl AgentExecutor for EchoExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + let now = self.active.fetch_add(1, Ordering::SeqCst) + 1; + self.max_active.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(15)).await; + self.active.fetch_sub(1, Ordering::SeqCst); + assert!(spec.agent != "boom", "boom"); + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: spec.prompt.clone(), + success: spec.agent != "fail", + structured: None, + } + } + fn concurrency_hint(&self) -> usize { + 4 + } + } + + fn stage(f: F) -> PipelineStage + where + F: Fn(Option<&StepOutcome>, &I) -> Option + Send + Sync + 'static, + { + Arc::new(f) + } + + #[tokio::test] + async fn each_item_chains_through_stages_and_later_stages_see_prior_output() { + let exec: Arc = Arc::new(EchoExecutor::new()); + // Stage 1: run agent "explore" with the item as the prompt. + // Stage 2: run agent "review" with a prompt derived from stage 1's output. 
+ let stages = vec![ + stage(|_prev: Option<&StepOutcome>, item: &&str| { + Some(AgentStepSpec::new("s1", "explore", "d", *item)) + }), + stage(|prev: Option<&StepOutcome>, _item: &&str| { + let prior = prev.map(|o| o.output.clone()).unwrap_or_default(); + Some(AgentStepSpec::new( + "s2", + "review", + "d", + format!("review of: {prior}"), + )) + }), + ]; + let out = execute_pipeline(exec, vec!["alpha", "beta"], stages, None).await; + + assert_eq!(out.len(), 2, "one result per item, order preserved"); + // Each item's final outcome is stage 2, whose prompt was derived from + // stage 1's output (the item text). + assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha"); + assert_eq!(out[1].as_ref().unwrap().output, "review of: beta"); + assert!(out.iter().all(|o| o.as_ref().unwrap().success)); + } + + #[tokio::test] + async fn chain_stops_on_failure_and_on_none_stage() { + let exec: Arc = Arc::new(EchoExecutor::new()); + // First item: stage 1 fails (agent "fail") → stage 2 must not run. + // Second item: stage 1 ok, stage 2 returns None → chain stops at stage 1. 
+ let stages = vec![ + stage(|_p: Option<&StepOutcome>, item: &&str| { + let agent = if *item == "x" { "fail" } else { "explore" }; + Some(AgentStepSpec::new("s1", agent, "d", *item)) + }), + stage(|_p: Option<&StepOutcome>, item: &&str| { + if *item == "y" { + None // stop the second item's chain at stage 1 + } else { + Some(AgentStepSpec::new("s2", "review", "d", "second")) + } + }), + ]; + let out = execute_pipeline(exec, vec!["x", "y"], stages, None).await; + + let first = out[0].as_ref().unwrap(); + assert!(!first.success, "failed stage 1 surfaces"); + assert_eq!( + first.output, "x", + "stage 2 did not run after stage 1 failed" + ); + + let second = out[1].as_ref().unwrap(); + assert!(second.success); + assert_eq!( + second.output, "y", + "stage 2 returned None → chain stopped at stage 1" + ); + } + + #[tokio::test] + async fn no_barrier_between_stages_bounded_by_hint() { + let echo = EchoExecutor::new(); + let max_active = Arc::clone(&echo.max_active); + let exec: Arc = Arc::new(echo); + let stages = vec![ + stage(|_p: Option<&StepOutcome>, item: &usize| { + Some(AgentStepSpec::new( + format!("s1-{item}"), + "explore", + "d", + "p", + )) + }), + stage(|_p: Option<&StepOutcome>, item: &usize| { + Some(AgentStepSpec::new(format!("s2-{item}"), "review", "d", "p")) + }), + ]; + let items: Vec = (0..8).collect(); + let out = execute_pipeline(exec, items, stages, None).await; + assert_eq!(out.len(), 8); + assert!(out.iter().all(|o| o.is_some())); + // concurrency_hint is 4: chains run concurrently but never exceed it. + assert!( + max_active.load(Ordering::SeqCst) <= 4, + "concurrency never exceeds the executor's hint" + ); + } + + #[tokio::test] + async fn panicking_stage_isolates_to_its_chain() { + let exec: Arc = Arc::new(EchoExecutor::new()); + let stages = vec![stage(|_p: Option<&StepOutcome>, item: &&str| { + // The middle item routes to the panicking agent. 
+ Some(AgentStepSpec::new("s1", *item, "d", "p")) + })]; + let out = execute_pipeline(exec, vec!["explore", "boom", "review"], stages, None).await; + assert_eq!(out.len(), 3); + assert!(out[0].as_ref().unwrap().success); + assert!(out[1].is_none(), "panicked chain becomes None, not a drop"); + assert!(out[2].as_ref().unwrap().success, "later chains unaffected"); + } +} diff --git a/core/src/orchestration/mod.rs b/core/src/orchestration/mod.rs index 0fa90a1..04c778c 100644 --- a/core/src/orchestration/mod.rs +++ b/core/src/orchestration/mod.rs @@ -21,6 +21,8 @@ //! //! The in-box implementation is [`crate::tools::TaskExecutor`]. +mod combinators; mod executor; +pub use combinators::{execute_pipeline, PipelineStage}; pub use executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; diff --git a/core/tests/test_orchestration_real_llm.rs b/core/tests/test_orchestration_real_llm.rs index c01a048..0d2c8b8 100644 --- a/core/tests/test_orchestration_real_llm.rs +++ b/core/tests/test_orchestration_real_llm.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use a3s_code_core::config::CodeConfig; use a3s_code_core::llm::create_client_with_config; -use a3s_code_core::orchestration::{AgentExecutor, AgentStepSpec}; +use a3s_code_core::orchestration::{ + execute_pipeline, AgentExecutor, AgentStepSpec, PipelineStage, StepOutcome, +}; use a3s_code_core::subagent::AgentRegistry; use a3s_code_core::tools::TaskExecutor; @@ -91,3 +93,59 @@ async fn real_execute_step_with_schema_returns_validated_object() { "object has a boolean `is_systems_language`: {object}" ); } + +/// Phase 3: a two-stage pipeline chains live agents — stage 2's prompt is +/// derived from stage 1's output, with no barrier between them. 
+#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_pipeline_chains_two_agent_stages() { + let (executor, _workspace) = local_executor(); + let exec: Arc = Arc::new(executor); + + let stages: Vec> = vec![ + Arc::new(|_prev: Option<&StepOutcome>, topic: &&str| { + Some( + AgentStepSpec::new( + "real-p1", + "general", + "summarize", + format!("In one sentence, what is {topic}?"), + ) + .with_max_steps(2), + ) + }), + Arc::new(|prev: Option<&StepOutcome>, _topic: &&str| { + let summary = prev.map(|o| o.output.clone()).unwrap_or_default(); + Some( + AgentStepSpec::new( + "real-p2", + "general", + "classify", + format!( + "Reply with exactly one word, YES or NO: does this describe a \ + programming language?\n\nText: {summary}" + ), + ) + .with_max_steps(2), + ) + }), + ]; + + let out = execute_pipeline(exec, vec!["the Rust programming language"], stages, None).await; + + assert_eq!(out.len(), 1); + let final_outcome = out[0].as_ref().expect("the chain produced a final outcome"); + assert!( + final_outcome.success, + "pipeline chain succeeded: {}", + final_outcome.output + ); + assert_eq!( + final_outcome.task_id, "real-p2", + "the returned outcome is the last stage's" + ); + assert!( + !final_outcome.output.trim().is_empty(), + "stage 2 produced output derived from stage 1" + ); +}