Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/agent_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,35 @@ impl AgentSession {
SessionView::from_session(self).id()
}

/// An [`AgentExecutor`](crate::orchestration::AgentExecutor) backed by this
/// session — runs each orchestrated step as a child agent on this node,
/// inheriting the session's agent registry, LLM client, workspace, MCP
/// tools, and subagent tracker.
///
/// This is what the orchestration combinators
/// ([`execute_steps_parallel`](crate::orchestration::execute_steps_parallel),
/// [`execute_pipeline`](crate::orchestration::execute_pipeline),
/// [`execute_steps_parallel_resumable`](crate::orchestration::execute_steps_parallel_resumable))
/// run against; a host can instead supply its own executor to place steps
/// across a cluster.
pub fn agent_executor(&self) -> Arc<dyn crate::orchestration::AgentExecutor> {
let executor = crate::tools::TaskExecutor::with_mcp(
Arc::clone(&self.agent_registry),
Arc::clone(&self.llm_client),
self.workspace.display().to_string(),
Arc::clone(&self.mcp_manager),
)
.with_subagent_tracker(Arc::clone(&self.subagent_tasks))
.with_max_parallel_tasks(self.config.max_parallel_tasks);
Arc::new(executor)
}

/// The session's persistence store, if one is configured — needed by the
/// resumable orchestration combinator to journal workflow progress.
pub fn session_store(&self) -> Option<Arc<dyn crate::store::SessionStore>> {
self.session_store.clone()
}

/// Return the definitions of all tools currently registered in this session.
///
/// The list reflects the live state of the tool executor — tools added via
Expand Down
152 changes: 152 additions & 0 deletions sdk/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ use a3s_code_core::hooks::{
HookMatcher as RustHookMatcher, HookResponse as RustHookResponse,
};
use a3s_code_core::llm::{ContentBlock as RustContentBlock, Message as RustMessage};
use a3s_code_core::orchestration::{
execute_steps_parallel, execute_steps_parallel_resumable, AgentStepSpec as RustAgentStepSpec,
StepOutcome as RustStepOutcome,
};
use a3s_code_core::permissions::{
PermissionDecision as RustPermissionDecision, PermissionPolicy as RustPermissionPolicy,
PermissionRule as RustPermissionRule,
Expand Down Expand Up @@ -3012,6 +3016,67 @@ pub struct McpServerStatusEntry {
// Session
// ============================================================================

/// One unit of orchestrated agent work — what to run, independent of where.
#[napi(object)]
#[derive(Clone)]
pub struct AgentStepSpecObject {
/// Stable id for this step (assigned by the caller).
pub task_id: String,
/// Registry key of the agent to run (e.g. "explore", "review").
pub agent: String,
/// Short label for display/tracking.
pub description: String,
/// Instruction handed to the child agent.
pub prompt: String,
/// Optional per-step tool-round cap.
pub max_steps: Option<u32>,
/// Optional parent session id for event correlation.
pub parent_session_id: Option<String>,
/// When set, the step must return JSON conforming to this schema; the
/// validated object lands in `StepOutcomeObject.structured`.
pub output_schema: Option<serde_json::Value>,
}

impl From<AgentStepSpecObject> for RustAgentStepSpec {
fn from(o: AgentStepSpecObject) -> Self {
RustAgentStepSpec {
task_id: o.task_id,
agent: o.agent,
description: o.description,
prompt: o.prompt,
max_steps: o.max_steps.map(|n| n as usize),
parent_session_id: o.parent_session_id,
output_schema: o.output_schema,
}
}
}

/// The result of running one orchestrated step.
#[napi(object)]
#[derive(Clone)]
pub struct StepOutcomeObject {
pub task_id: String,
pub session_id: String,
pub agent: String,
pub output: String,
pub success: bool,
/// Schema-validated structured output, when the step requested one.
pub structured: Option<serde_json::Value>,
}

impl From<RustStepOutcome> for StepOutcomeObject {
fn from(o: RustStepOutcome) -> Self {
StepOutcomeObject {
task_id: o.task_id,
session_id: o.session_id,
agent: o.agent,
output: o.output,
success: o.success,
structured: o.structured,
}
}
}

/// Workspace-bound session. All LLM and tool operations happen here.
#[napi]
pub struct Session {
Expand Down Expand Up @@ -3070,6 +3135,60 @@ impl Session {
Ok(AgentResult::from(result))
}

/// Run `specs` as a fan-out of agent steps, bounded by the session's
/// configured parallelism, and resolve with each step's outcome in input
/// order. A failed step surfaces as `success: false` without failing the
/// batch.
#[napi]
pub async fn parallel(
&self,
specs: Vec<AgentStepSpecObject>,
) -> napi::Result<Vec<StepOutcomeObject>> {
let session = self.inner.clone();
let rust_specs: Vec<RustAgentStepSpec> = specs.into_iter().map(Into::into).collect();
let outcomes = get_runtime()
.spawn(async move {
let executor = session.agent_executor();
execute_steps_parallel(executor, rust_specs, None).await
})
.await
.map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?;
Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect())
}

/// Like `parallel`, but resumable: progress is journaled under
/// `workflowId` via the session's `sessionStore`, so an interrupted run
/// skips already-completed steps. Rejects when no `sessionStore` is
/// configured.
#[napi]
pub async fn parallel_resumable(
&self,
specs: Vec<AgentStepSpecObject>,
workflow_id: String,
) -> napi::Result<Vec<StepOutcomeObject>> {
let session = self.inner.clone();
let rust_specs: Vec<RustAgentStepSpec> = specs.into_iter().map(Into::into).collect();
let outcomes = get_runtime()
.spawn(async move {
let Some(store) = session.session_store() else {
return Err("parallelResumable requires a sessionStore on the session");
};
let executor = session.agent_executor();
Ok(execute_steps_parallel_resumable(
executor,
rust_specs,
&workflow_id,
store,
None,
)
.await)
})
.await
.map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?
.map_err(napi::Error::from_reason)?;
Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect())
}

/// Send a prompt or request and get a streaming event iterator.
///
/// Returns an `EventStream`. Use `for await (const event of stream)` or call `.next()` manually.
Expand Down Expand Up @@ -5374,6 +5493,39 @@ impl From<SearchConfig> for RustSearchConfig {
mod tests {
use super::*;

#[test]
fn orchestration_object_conversions_round_trip_fields() {
let schema = serde_json::json!({ "type": "object" });
let spec = AgentStepSpecObject {
task_id: "t1".into(),
agent: "explore".into(),
description: "d".into(),
prompt: "p".into(),
max_steps: Some(5),
parent_session_id: Some("parent".into()),
output_schema: Some(schema.clone()),
};
let rust: RustAgentStepSpec = spec.into();
assert_eq!(rust.task_id, "t1");
assert_eq!(rust.agent, "explore");
assert_eq!(rust.max_steps, Some(5));
assert_eq!(rust.parent_session_id.as_deref(), Some("parent"));
assert_eq!(rust.output_schema, Some(schema));

let outcome = RustStepOutcome {
task_id: "t1".into(),
session_id: "task-run-t1".into(),
agent: "explore".into(),
output: "out".into(),
success: true,
structured: Some(serde_json::json!({ "k": 1 })),
};
let obj = StepOutcomeObject::from(outcome);
assert_eq!(obj.task_id, "t1");
assert!(obj.success);
assert_eq!(obj.structured, Some(serde_json::json!({ "k": 1 })));
}

fn sdk_test_config() -> a3s_code_core::CodeConfig {
a3s_code_core::CodeConfig {
default_model: Some("openai/gpt-4o".to_string()),
Expand Down
Loading