From a7c7dfa069a7dcfd6dc578c8b6d71ce758d1ab3c Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 16:00:03 +0800 Subject: [PATCH] feat(orchestration): AgentExecutor seam + parallel combinator (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a new `orchestration` module: the AgentExecutor trait — the seam between the framework's orchestration grammar and the host's placement/transport/scheduling (书安OS). The framework owns the grammar + serializable data contracts (AgentStepSpec / StepOutcome) and ships a local default; a host implements AgentExecutor to place steps on remote nodes. concurrency_hint() is a *hint*, not a hard local bound — that is what lets orchestration scale past one process. - `AgentExecutor` trait + `AgentStepSpec`/`StepOutcome` (serializable for transport + future workflow checkpoints). - `execute_steps_parallel` — the barrier (parallel) primitive over the seam, reusing run_ordered_parallel_with_limit (order-preserving, per-branch panic isolation). Later combinators (pipeline, phases) build on this. - `impl AgentExecutor for TaskExecutor` — the in-box local executor (runs each step as a child AgentLoop on this node's tokio runtime). - TaskExecutor::execute_parallel now routes through the seam, so the model-driven fan-out and the programmable grammar share one execution path (load-bearing, not vestigial). Additive + zero-regression: model-driven delegation behavior is unchanged. Tests: 4 orchestration unit tests (mock executor — fan-out, order, concurrency hint, failure+panic isolation), all 59 task tests green, and the real-LLM parallel-delegation integration tests pass against .a3s/config.acl. 
--- core/src/lib.rs | 2 + core/src/orchestration/executor.rs | 292 +++++++++++++++++++++++++++++ core/src/orchestration/mod.rs | 26 +++ core/src/tools/task.rs | 133 ++++++++----- 4 files changed, 404 insertions(+), 49 deletions(-) create mode 100644 core/src/orchestration/executor.rs create mode 100644 core/src/orchestration/mod.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 090078f..f9a0127 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -94,6 +94,7 @@ pub mod llm; pub mod loop_checkpoint; pub mod mcp; pub mod memory; +pub mod orchestration; pub(crate) mod ordered_parallel; pub mod permissions; pub mod planning; @@ -134,6 +135,7 @@ pub use llm::{ ContentBlock, HttpMetricsCallback, HttpMetricsRecord, ImageSource, LlmClient, LlmResponse, Message, OpenAiClient, TokenUsage, }; +pub use orchestration::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; pub use run::{ ActiveToolSnapshot, InMemoryRunStore, RunEventRecord, RunHandle, RunRecord, RunSnapshot, diff --git a/core/src/orchestration/executor.rs b/core/src/orchestration/executor.rs new file mode 100644 index 0000000..a2cde09 --- /dev/null +++ b/core/src/orchestration/executor.rs @@ -0,0 +1,292 @@ +//! The `AgentExecutor` seam and its barrier (`parallel`) primitive. + +use crate::agent::{AgentEvent, DEFAULT_MAX_PARALLEL_TASKS}; +use crate::ordered_parallel::run_ordered_parallel_with_limit; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// A single unit of orchestrated agent work — *what* to run, independent of +/// *where* it runs. +/// +/// Serializable on purpose: a host (书安OS) may ship it to another node, and +/// a future workflow checkpoint persists it. The orchestration layer assigns +/// `task_id`; everything else mirrors a delegated task. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentStepSpec { + /// Stable id for this step. Flows into lifecycle events (and, later, + /// workflow checkpoints) so a step can be correlated and resumed. + pub task_id: String, + /// Registry key of the agent to run (e.g. `"explore"`, `"review"`). + pub agent: String, + /// Short human label for display/tracking. + pub description: String, + /// The instruction handed to the child agent. + pub prompt: String, + /// Optional per-step tool-round cap. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub max_steps: Option, + /// Parent session id, for lifecycle-event correlation. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub parent_session_id: Option, +} + +impl AgentStepSpec { + /// A step that runs `agent` with `prompt`, identified by `task_id`. + pub fn new( + task_id: impl Into, + agent: impl Into, + description: impl Into, + prompt: impl Into, + ) -> Self { + Self { + task_id: task_id.into(), + agent: agent.into(), + description: description.into(), + prompt: prompt.into(), + max_steps: None, + parent_session_id: None, + } + } + + pub fn with_max_steps(mut self, max_steps: usize) -> Self { + self.max_steps = Some(max_steps); + self + } + + pub fn with_parent_session_id(mut self, parent_session_id: impl Into) -> Self { + self.parent_session_id = Some(parent_session_id.into()); + self + } +} + +/// The result of running one [`AgentStepSpec`] to completion. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct StepOutcome { + pub task_id: String, + pub session_id: String, + pub agent: String, + pub output: String, + pub success: bool, +} + +impl StepOutcome { + /// A failed outcome for a step that could not start (e.g. unknown agent) + /// or whose fan-out branch panicked. `session_id` mirrors the id the + /// local executor would have derived, so failed steps remain addressable. 
+ pub fn failed( + task_id: impl Into, + agent: impl Into, + message: impl Into, + ) -> Self { + let task_id = task_id.into(); + let session_id = format!("task-run-{task_id}"); + Self { + task_id, + session_id, + agent: agent.into(), + output: message.into(), + success: false, + } + } +} + +/// Runs agent steps — the seam between the framework's orchestration grammar +/// and the host's placement / transport / scheduling. +/// +/// The in-box [`TaskExecutor`](crate::tools::TaskExecutor) runs every step +/// locally (in-process, tokio). A host such as 书安OS implements this trait to +/// place steps on remote nodes; the orchestration combinators are written +/// purely against the trait and never observe where a step actually ran. The +/// framework deliberately does **not** own placement, transport, or +/// cross-node scheduling — those are the host's. +#[async_trait] +pub trait AgentExecutor: Send + Sync { + /// Run one step to completion. + /// + /// Failures are reported as `StepOutcome { success: false, .. }` rather + /// than a hard error, so a fan-out can continue when one branch fails. + /// `event_tx`, when present, receives the step's lifecycle/progress + /// [`AgentEvent`]s. + async fn execute_step( + &self, + spec: AgentStepSpec, + event_tx: Option>, + ) -> StepOutcome; + + /// Advisory ceiling on how many steps the orchestration layer should run + /// concurrently. The local default returns its `max_parallel_tasks`; a + /// scheduler-backed host may return its cluster-wide target. It is a + /// *hint*, not a hard local bound — that is what lets orchestration scale + /// past a single process. + fn concurrency_hint(&self) -> usize { + DEFAULT_MAX_PARALLEL_TASKS + } +} + +/// Fan `specs` out across the executor, bounded by its +/// [`concurrency_hint`](AgentExecutor::concurrency_hint), preserving input +/// order. A panicked branch becomes a failed [`StepOutcome`] without dropping +/// the others. 
+/// +/// This is the barrier (`parallel`) primitive — it awaits every step. Later +/// combinators (pipeline, phases) build on the same seam. +pub async fn execute_steps_parallel( + executor: Arc, + specs: Vec, + event_tx: Option>, +) -> Vec { + let limit = executor.concurrency_hint(); + // Keep (task_id, agent) by index so a panicked branch still yields a + // correctly-labelled failed outcome (mirrors TaskExecutor's fallback). + let labels: Vec<(String, String)> = specs + .iter() + .map(|s| (s.task_id.clone(), s.agent.clone())) + .collect(); + + let results = run_ordered_parallel_with_limit(specs, limit, move |_idx, spec| { + let executor = Arc::clone(&executor); + let event_tx = event_tx.clone(); + async move { executor.execute_step(spec, event_tx).await } + }) + .await; + + results + .into_iter() + .map(|result| match result.output { + Ok(outcome) => outcome, + Err(error) => { + let (task_id, agent) = labels + .get(result.index) + .cloned() + .unwrap_or_else(|| ("unknown".to_string(), "unknown".to_string())); + StepOutcome::failed(task_id, agent, error.to_string()) + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + /// Executor with no LLM — records peak concurrency and synthesizes + /// outcomes from the spec, so the combinator can be tested in isolation. 
+ struct MockExecutor { + hint: usize, + active: Arc, + max_active: Arc, + } + + impl MockExecutor { + fn new(hint: usize) -> Self { + Self { + hint, + active: Arc::new(AtomicUsize::new(0)), + max_active: Arc::new(AtomicUsize::new(0)), + } + } + } + + #[async_trait] + impl AgentExecutor for MockExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + let now = self.active.fetch_add(1, Ordering::SeqCst) + 1; + self.max_active.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(20)).await; + self.active.fetch_sub(1, Ordering::SeqCst); + + // `boom` panics (exercise branch-panic isolation); `fail` + // returns an unsuccessful outcome. + assert!(spec.agent != "boom", "boom"); + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: format!("ran: {}", spec.prompt), + success: spec.agent != "fail", + } + } + fn concurrency_hint(&self) -> usize { + self.hint + } + } + + fn spec(id: &str, agent: &str) -> AgentStepSpec { + AgentStepSpec::new(id, agent, "d", format!("prompt-{id}")) + } + + #[tokio::test] + async fn fans_out_in_input_order() { + let exec: Arc = Arc::new(MockExecutor::new(8)); + let specs = vec![spec("a", "explore"), spec("b", "review"), spec("c", "plan")]; + let out = execute_steps_parallel(exec, specs, None).await; + assert_eq!( + out.iter().map(|o| o.task_id.as_str()).collect::>(), + vec!["a", "b", "c"], + "results preserve input order" + ); + assert!(out.iter().all(|o| o.success)); + assert_eq!(out[0].output, "ran: prompt-a"); + } + + #[tokio::test] + async fn respects_concurrency_hint() { + let mock = MockExecutor::new(2); + let max_active = Arc::clone(&mock.max_active); + let exec: Arc = Arc::new(mock); + let specs = (0..6).map(|i| spec(&i.to_string(), "explore")).collect(); + let _ = execute_steps_parallel(exec, specs, None).await; + assert_eq!( + max_active.load(Ordering::SeqCst), + 2, + 
"never more than concurrency_hint steps run at once" + ); + } + + #[tokio::test] + async fn isolates_failed_and_panicked_steps() { + let exec: Arc = Arc::new(MockExecutor::new(8)); + let specs = vec![ + spec("ok", "explore"), + spec("bad", "fail"), + spec("crash", "boom"), + spec("ok2", "review"), + ]; + let out = execute_steps_parallel(exec, specs, None).await; + assert_eq!(out.len(), 4, "every step yields a result"); + assert!(out[0].success); + assert!( + !out[1].success, + "explicit failure surfaces as success=false" + ); + assert!( + !out[2].success && out[2].agent == "boom", + "a panicked branch becomes a labelled failed outcome, not a drop" + ); + assert!(out[3].success, "later steps unaffected by an earlier panic"); + } + + #[tokio::test] + async fn default_concurrency_hint_is_the_framework_default() { + struct Bare; + #[async_trait] + impl AgentExecutor for Bare { + async fn execute_step( + &self, + spec: AgentStepSpec, + _tx: Option>, + ) -> StepOutcome { + StepOutcome::failed(spec.task_id, spec.agent, "unused") + } + } + assert_eq!(Bare.concurrency_hint(), DEFAULT_MAX_PARALLEL_TASKS); + } +} diff --git a/core/src/orchestration/mod.rs b/core/src/orchestration/mod.rs new file mode 100644 index 0000000..0fa90a1 --- /dev/null +++ b/core/src/orchestration/mod.rs @@ -0,0 +1,26 @@ +//! Programmable, deterministic multi-agent orchestration. +//! +//! Today an agent fans work out only by *model-driven* delegation (the LLM +//! decides to call the `task` / `parallel_task` tool). This module adds a +//! *programmable* layer: a developer expresses fan-out / pipelines / +//! verification panels as code, so the orchestration is reproducible, +//! testable, budget-bounded, and (later) resumable — independent of what the +//! model chooses to do. +//! +//! ## The framework / host boundary +//! +//! Everything here is written against one seam, [`AgentExecutor`]: "run this +//! step, give me the result." The framework owns the **grammar** (which steps, +//! 
how they compose, the concurrency *hint*, the data contracts +//! [`AgentStepSpec`] / [`StepOutcome`]) and ships a default executor that runs +//! every step locally. The **placement** of those steps across a cluster — +//! transport, scheduling, where a step actually executes — belongs to the +//! host (书安OS), which implements [`AgentExecutor`]. The combinators never +//! observe where a step ran, so the same orchestration scales from one process +//! to a cluster without change. +//! +//! The in-box implementation is [`crate::tools::TaskExecutor`]. + +mod executor; + +pub use executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index bd82e3b..d713115 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -17,6 +17,7 @@ use crate::agent::{AgentConfig, AgentEvent, AgentLoop}; use crate::llm::LlmClient; use crate::mcp::manager::McpManager; +use crate::orchestration::{AgentExecutor, AgentStepSpec, StepOutcome}; use crate::subagent::AgentRegistry; use crate::tools::types::{Tool, ToolContext, ToolOutput}; use anyhow::{Context, Result}; @@ -496,67 +497,101 @@ impl TaskExecutor { /// Execute multiple tasks in parallel. /// /// Spawns all tasks concurrently and waits for all to complete. - /// Returns results in the same order as the input tasks. + /// Returns results in the same order as the input tasks. Routed through + /// the [`AgentExecutor`](crate::orchestration::AgentExecutor) seam so the + /// same fan-out works whether steps run locally (default) or are placed + /// on remote nodes by a host. 
pub async fn execute_parallel( self: &Arc, tasks: Vec, event_tx: Option>, parent_session_id: Option<&str>, ) -> Vec { - let fallback_agents = tasks - .iter() - .map(|params| params.agent.clone()) - .collect::>(); - let executor = Arc::clone(self); - let parent_session_id = parent_session_id.map(|s| s.to_string()); - let results = crate::ordered_parallel::run_ordered_parallel_with_limit( - tasks, - self.max_parallel_tasks, - move |_idx, params| { - let executor = Arc::clone(&executor); - let tx = event_tx.clone(); - let parent = parent_session_id.clone(); - async move { - match executor - .execute(params.clone(), tx, parent.as_deref()) - .await - { - Ok(result) => result, - Err(e) => TaskResult { - output: format!("Task failed: {}", e), - session_id: String::new(), - agent: params.agent, - success: false, - task_id: format!("task-{}", uuid::Uuid::new_v4()), - }, - } - } - }, - ) - .await; - - results + let parent = parent_session_id.map(|s| s.to_string()); + let specs = tasks .into_iter() - .map(|result| match result.output { - Ok(task_result) => task_result, - Err(error) => { - tracing::error!("Parallel task failed: {}", error); - TaskResult { - output: format!("Task failed: {}", error), - session_id: String::new(), - agent: fallback_agents - .get(result.index) - .cloned() - .unwrap_or_else(|| "unknown".to_string()), - success: false, - task_id: format!("task-{}", uuid::Uuid::new_v4()), - } - } + .map(|params| AgentStepSpec { + task_id: format!("task-{}", uuid::Uuid::new_v4()), + agent: params.agent, + description: params.description, + prompt: params.prompt, + max_steps: params.max_steps, + parent_session_id: parent.clone(), }) + .collect(); + + let executor: Arc = Arc::::clone(self); + crate::orchestration::execute_steps_parallel(executor, specs, event_tx) + .await + .into_iter() + .map(TaskResult::from) .collect() } } +impl From for StepOutcome { + fn from(r: TaskResult) -> Self { + StepOutcome { + task_id: r.task_id, + session_id: r.session_id, + agent: 
r.agent, + output: r.output, + success: r.success, + } + } +} + +impl From for TaskResult { + fn from(o: StepOutcome) -> Self { + TaskResult { + output: o.output, + session_id: o.session_id, + agent: o.agent, + success: o.success, + task_id: o.task_id, + } + } +} + +/// The local, in-process executor: every step runs as a child `AgentLoop` on +/// this node's tokio runtime. This is the default; a host (书安OS) substitutes +/// its own [`AgentExecutor`] to place steps across a cluster. +#[async_trait] +impl AgentExecutor for TaskExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + event_tx: Option>, + ) -> StepOutcome { + let agent = spec.agent.clone(); + let task_id = spec.task_id.clone(); + let params = TaskParams { + agent: spec.agent, + description: spec.description, + prompt: spec.prompt, + background: false, + max_steps: spec.max_steps, + }; + match self + .execute_with_task_id( + task_id.clone(), + params, + event_tx, + spec.parent_session_id.as_deref(), + true, + ) + .await + { + Ok(result) => result.into(), + Err(e) => StepOutcome::failed(task_id, agent, format!("Task failed: {e}")), + } + } + + fn concurrency_hint(&self) -> usize { + self.max_parallel_tasks + } +} + /// Get the JSON schema for TaskParams pub fn task_params_schema() -> serde_json::Value { serde_json::json!({