Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 162 additions & 1 deletion sdk/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use a3s_code_core::hooks::{
};
use a3s_code_core::llm::{ContentBlock as RustContentBlock, Message as RustMessage};
use a3s_code_core::orchestration::{
execute_steps_parallel, execute_steps_parallel_resumable, AgentStepSpec as RustAgentStepSpec,
execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable,
AgentStepSpec as RustAgentStepSpec, PipelineStage as RustPipelineStage,
StepOutcome as RustStepOutcome,
};
use a3s_code_core::permissions::{
Expand Down Expand Up @@ -85,6 +86,7 @@ use a3s_code_core::{
SessionOptions as RustSessionOptions,
};
use napi::Either;
use napi::Env;

// AHP Type Bindings
mod ahp_types;
Expand Down Expand Up @@ -3189,6 +3191,77 @@ impl Session {
Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect())
}

/// Run each item through a chain of `stages`, with no barrier between
/// stages — item A can be in stage 3 while item B is still in stage 1.
///
/// Each stage is a function `(ctx) => spec | null` where `ctx` is
/// `{ previous: StepOutcomeObject | null, item: any }`. Return an
/// `AgentStepSpecObject` (camelCase keys) to run that step, or `null` to
/// stop the item's chain. A chain also stops when a step fails.
///
/// IMPORTANT: a stage callback MUST NOT throw — in this napi version a JS
/// throw at return-conversion aborts the process (same constraint as
/// `setBudgetGuard`). Wrap your logic in try/catch and return `null` on
/// error. A stage that hangs past `timeoutMs` (default 30s) fails closed
/// (treated as `null`, stopping that chain) rather than blocking forever.
///
/// This is a *synchronous* napi method that returns a Promise via a
/// deferred: the JS stage functions (which are not `Send`) are converted
/// to thread-safe functions on the JS thread here, then the chains run on
/// the worker runtime and resolve the Promise — so the event loop is never
/// blocked and no non-`Send` value crosses the async boundary.
#[napi(
ts_args_type = "items: Array<any>, stages: Array<(ctx: { previous: StepOutcomeObject | null, item: any }) => AgentStepSpecObject | null>, timeoutMs?: number",
ts_return_type = "Promise<Array<StepOutcomeObject | null>>"
)]
pub fn pipeline(
&self,
env: Env,
items: Vec<serde_json::Value>,
stages: Vec<napi::JsFunction>,
timeout_ms: Option<u32>,
) -> napi::Result<napi::JsObject> {
use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction};
// Single-object arg so the JS stage signature is `(ctx) => ...`.
let single_obj = |ctx: ThreadSafeCallContext<serde_json::Value>| {
Ok(vec![ctx.env.to_js_value(&ctx.value)?])
};
let timeout = timeout_ms.map(|t| t as u64).unwrap_or(30_000);

// Build the thread-safe functions on the JS thread (JsFunction is not
// Send), then wrap each as a synchronous PipelineStage the combinator
// can call from the worker runtime.
let rust_stages: Vec<RustPipelineStage<serde_json::Value>> = stages
.into_iter()
.map(|f| {
let tsfn: ThreadsafeFunction<
serde_json::Value,
napi::threadsafe_function::ErrorStrategy::Fatal,
> = f.create_threadsafe_function(0, single_obj)?;
let stage = Arc::new(NodePipelineStage {
tsfn,
timeout_ms: timeout,
});
let pipeline_stage: RustPipelineStage<serde_json::Value> =
Arc::new(move |prev, item| stage.invoke(prev, item));
Ok::<_, napi::Error>(pipeline_stage)
})
.collect::<napi::Result<Vec<_>>>()?;

let session = self.inner.clone();
let (deferred, promise) = env.create_deferred::<Vec<Option<StepOutcomeObject>>, _>()?;
get_runtime().spawn(async move {
let executor = session.agent_executor();
let outcomes = execute_pipeline(executor, items, rust_stages, None).await;
let mapped: Vec<Option<StepOutcomeObject>> = outcomes
.into_iter()
.map(|o| o.map(StepOutcomeObject::from))
.collect();
deferred.resolve(move |_env| Ok(mapped));
});
Ok(promise)
}

/// Send a prompt or request and get a streaming event iterator.
///
/// Returns an `EventStream`. Use `for await (const event of stream)` or call `.next()` manually.
Expand Down Expand Up @@ -4821,6 +4894,94 @@ struct NodeBudgetGuard {
unsafe impl Send for NodeBudgetGuard {}
unsafe impl Sync for NodeBudgetGuard {}

/// Bridges a JS pipeline-stage function to a synchronous `PipelineStage`.
struct NodePipelineStage {
tsfn: napi::threadsafe_function::ThreadsafeFunction<
serde_json::Value,
napi::threadsafe_function::ErrorStrategy::Fatal,
>,
timeout_ms: u64,
}

// SAFETY: ThreadsafeFunction is designed to be sent across threads.
unsafe impl Send for NodePipelineStage {}
unsafe impl Sync for NodePipelineStage {}

impl NodePipelineStage {
fn invoke(
&self,
prev: Option<&RustStepOutcome>,
item: &serde_json::Value,
) -> Option<RustAgentStepSpec> {
let previous = prev
.map(|o| serde_json::to_value(o).unwrap_or(serde_json::Value::Null))
.unwrap_or(serde_json::Value::Null);
let payload = serde_json::json!({ "previous": previous, "item": item });

let (tx, rx) = std::sync::mpsc::sync_channel::<Option<RustAgentStepSpec>>(1);
self.tsfn.call_with_return_value(
payload,
napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking,
move |ret: napi::JsUnknown| {
// Fail-closed: a null/unreadable return stops this chain rather
// than fabricating a step.
let _ = tx.send(parse_js_step_spec(ret));
Ok(())
},
);
// Fail-closed on timeout/throw: under Fatal strategy a JS throw means
// the return closure never fires, so the channel stays empty — treat
// as None (stop this chain) instead of blocking forever.
tokio::task::block_in_place(|| {
rx.recv_timeout(std::time::Duration::from_millis(self.timeout_ms))
.unwrap_or(None)
})
}
}

/// Parse a JS pipeline-stage return value into an `AgentStepSpec`, or `None`
/// for `null`/`undefined`/unreadable input (which stops the chain). Accepts
/// camelCase (the SDK convention) and snake_case keys.
fn parse_js_step_spec(val: napi::JsUnknown) -> Option<RustAgentStepSpec> {
use napi::{JsObject, ValueType};
if !matches!(val.get_type().ok()?, ValueType::Object) {
return None;
}
let obj = unsafe { val.cast::<JsObject>() };
let get_str = |keys: &[&str]| -> Option<String> {
for k in keys {
if let Ok(s) = obj.get_named_property::<napi::JsString>(k) {
if let Some(v) = s.into_utf8().ok().and_then(|s| s.into_owned().ok()) {
return Some(v);
}
}
}
None
};
let task_id = get_str(&["taskId", "task_id"])?;
let agent = get_str(&["agent"])?;
let prompt = get_str(&["prompt"])?;
let description = get_str(&["description"]).unwrap_or_default();
let max_steps = ["maxSteps", "max_steps"]
.iter()
.find_map(|k| obj.get_named_property::<napi::JsNumber>(k).ok())
.and_then(|n| n.get_uint32().ok())
.map(|n| n as usize);
let parent_session_id = get_str(&["parentSessionId", "parent_session_id"]);
Some(RustAgentStepSpec {
task_id,
agent,
description,
prompt,
max_steps,
parent_session_id,
// Per-stage `outputSchema` is not yet supported on pipeline stages
// (the lenient JsUnknown parse here can't read an arbitrary JSON-schema
// property safely). Use `parallel` for schema-validated steps.
output_schema: None,
})
}

impl NodeBudgetGuard {
fn call_decision(
&self,
Expand Down
Loading