diff --git a/core/src/agent_api.rs b/core/src/agent_api.rs index e04463f..a200514 100644 --- a/core/src/agent_api.rs +++ b/core/src/agent_api.rs @@ -920,6 +920,35 @@ impl AgentSession { SessionView::from_session(self).id() } + /// An [`AgentExecutor`](crate::orchestration::AgentExecutor) backed by this + /// session — runs each orchestrated step as a child agent on this node, + /// inheriting the session's agent registry, LLM client, workspace, MCP + /// tools, and subagent tracker. + /// + /// This is what the orchestration combinators + /// ([`execute_steps_parallel`](crate::orchestration::execute_steps_parallel), + /// [`execute_pipeline`](crate::orchestration::execute_pipeline), + /// [`execute_steps_parallel_resumable`](crate::orchestration::execute_steps_parallel_resumable)) + /// run against; a host can instead supply its own executor to place steps + /// across a cluster. + pub fn agent_executor(&self) -> Arc { + let executor = crate::tools::TaskExecutor::with_mcp( + Arc::clone(&self.agent_registry), + Arc::clone(&self.llm_client), + self.workspace.display().to_string(), + Arc::clone(&self.mcp_manager), + ) + .with_subagent_tracker(Arc::clone(&self.subagent_tasks)) + .with_max_parallel_tasks(self.config.max_parallel_tasks); + Arc::new(executor) + } + + /// The session's persistence store, if one is configured — needed by the + /// resumable orchestration combinator to journal workflow progress. + pub fn session_store(&self) -> Option> { + self.session_store.clone() + } + /// Return the definitions of all tools currently registered in this session. /// /// The list reflects the live state of the tool executor — tools added via diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index e6e70fa..d7b5a85 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -56,6 +56,10 @@ use a3s_code_core::hooks::{ HookMatcher as RustHookMatcher, HookResponse as RustHookResponse, }; use a3s_code_core::llm::{ContentBlock as RustContentBlock, Message as RustMessage}; +use a3s_code_core::orchestration::{ + execute_steps_parallel, execute_steps_parallel_resumable, AgentStepSpec as RustAgentStepSpec, + StepOutcome as RustStepOutcome, +}; use a3s_code_core::permissions::{ PermissionDecision as RustPermissionDecision, PermissionPolicy as RustPermissionPolicy, PermissionRule as RustPermissionRule, @@ -3012,6 +3016,67 @@ pub struct McpServerStatusEntry { // Session // ============================================================================ +/// One unit of orchestrated agent work — what to run, independent of where. +#[napi(object)] +#[derive(Clone)] +pub struct AgentStepSpecObject { + /// Stable id for this step (assigned by the caller). + pub task_id: String, + /// Registry key of the agent to run (e.g. "explore", "review"). + pub agent: String, + /// Short label for display/tracking. + pub description: String, + /// Instruction handed to the child agent. + pub prompt: String, + /// Optional per-step tool-round cap. + pub max_steps: Option, + /// Optional parent session id for event correlation. + pub parent_session_id: Option, + /// When set, the step must return JSON conforming to this schema; the + /// validated object lands in `StepOutcomeObject.structured`. + pub output_schema: Option, +} + +impl From for RustAgentStepSpec { + fn from(o: AgentStepSpecObject) -> Self { + RustAgentStepSpec { + task_id: o.task_id, + agent: o.agent, + description: o.description, + prompt: o.prompt, + max_steps: o.max_steps.map(|n| n as usize), + parent_session_id: o.parent_session_id, + output_schema: o.output_schema, + } + } +} + +/// The result of running one orchestrated step. +#[napi(object)] +#[derive(Clone)] +pub struct StepOutcomeObject { + pub task_id: String, + pub session_id: String, + pub agent: String, + pub output: String, + pub success: bool, + /// Schema-validated structured output, when the step requested one. + pub structured: Option, +} + +impl From for StepOutcomeObject { + fn from(o: RustStepOutcome) -> Self { + StepOutcomeObject { + task_id: o.task_id, + session_id: o.session_id, + agent: o.agent, + output: o.output, + success: o.success, + structured: o.structured, + } + } +} + /// Workspace-bound session. All LLM and tool operations happen here. #[napi] pub struct Session { @@ -3070,6 +3135,60 @@ impl Session { Ok(AgentResult::from(result)) } + /// Run `specs` as a fan-out of agent steps, bounded by the session's + /// configured parallelism, and resolve with each step's outcome in input + /// order. A failed step surfaces as `success: false` without failing the + /// batch. + #[napi] + pub async fn parallel( + &self, + specs: Vec, + ) -> napi::Result> { + let session = self.inner.clone(); + let rust_specs: Vec = specs.into_iter().map(Into::into).collect(); + let outcomes = get_runtime() + .spawn(async move { + let executor = session.agent_executor(); + execute_steps_parallel(executor, rust_specs, None).await + }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) + } + + /// Like `parallel`, but resumable: progress is journaled under + /// `workflowId` via the session's `sessionStore`, so an interrupted run + /// skips already-completed steps. Rejects when no `sessionStore` is + /// configured. + #[napi] + pub async fn parallel_resumable( + &self, + specs: Vec, + workflow_id: String, + ) -> napi::Result> { + let session = self.inner.clone(); + let rust_specs: Vec = specs.into_iter().map(Into::into).collect(); + let outcomes = get_runtime() + .spawn(async move { + let Some(store) = session.session_store() else { + return Err("parallelResumable requires a sessionStore on the session"); + }; + let executor = session.agent_executor(); + Ok(execute_steps_parallel_resumable( + executor, + rust_specs, + &workflow_id, + store, + None, + ) + .await) + }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))? + .map_err(napi::Error::from_reason)?; + Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) + } + /// Send a prompt or request and get a streaming event iterator. /// /// Returns an `EventStream`. Use `for await (const event of stream)` or call `.next()` manually. @@ -5374,6 +5493,39 @@ impl From for RustSearchConfig { mod tests { use super::*; + #[test] + fn orchestration_object_conversions_round_trip_fields() { + let schema = serde_json::json!({ "type": "object" }); + let spec = AgentStepSpecObject { + task_id: "t1".into(), + agent: "explore".into(), + description: "d".into(), + prompt: "p".into(), + max_steps: Some(5), + parent_session_id: Some("parent".into()), + output_schema: Some(schema.clone()), + }; + let rust: RustAgentStepSpec = spec.into(); + assert_eq!(rust.task_id, "t1"); + assert_eq!(rust.agent, "explore"); + assert_eq!(rust.max_steps, Some(5)); + assert_eq!(rust.parent_session_id.as_deref(), Some("parent")); + assert_eq!(rust.output_schema, Some(schema)); + + let outcome = RustStepOutcome { + task_id: "t1".into(), + session_id: "task-run-t1".into(), + agent: "explore".into(), + output: "out".into(), + success: true, + structured: Some(serde_json::json!({ "k": 1 })), + }; + let obj = StepOutcomeObject::from(outcome); + assert_eq!(obj.task_id, "t1"); + assert!(obj.success); + assert_eq!(obj.structured, Some(serde_json::json!({ "k": 1 }))); + } + fn sdk_test_config() -> a3s_code_core::CodeConfig { a3s_code_core::CodeConfig { default_model: Some("openai/gpt-4o".to_string()),