From 6a6d0418c093285f9ef451fbe9c76064ad6e8d35 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 17:07:09 +0800 Subject: [PATCH] feat(orchestration): Node pipeline with callback stages (Phase 5b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Session.pipeline(items, stages, timeoutMs?) exposes execute_pipeline to JS. Each stage is (ctx: { previous, item }) => spec | null; chains run with no inter-stage barrier and stop on a null stage or a failed step. FFI design (honors the #32 contract): - Synchronous napi method returning a Promise via env.create_deferred(): JS stage functions (not Send) are converted to ThreadsafeFunctions on the JS thread, then the chains run on the worker runtime and resolve the Promise — the event loop is never blocked and no non-Send value crosses the async boundary (an async fn taking Vec won't compile, by design). - Each stage call uses call_with_return_value + a fail-closed timeout (default 30s) and lenient JsUnknown parsing: a null/unreadable return or a hang stops that chain (None) rather than fabricating a step or blocking. Mirrors the proven setBudgetGuard bridge; stage callbacks MUST NOT throw (documented) — same napi return-conversion-abort constraint. Limitation: per-stage outputSchema isn't supported on pipeline stages yet (the lenient parse can't read an arbitrary schema property); use parallel for schema-validated steps. Conversion test green; fmt + clippy --lib clean. --- sdk/node/src/lib.rs | 163 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 1 deletion(-) diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index d7b5a85..0974153 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -57,7 +57,8 @@ use a3s_code_core::hooks::{ }; use a3s_code_core::llm::{ContentBlock as RustContentBlock, Message as RustMessage}; use a3s_code_core::orchestration::{ - execute_steps_parallel, execute_steps_parallel_resumable, AgentStepSpec as RustAgentStepSpec, + execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, + AgentStepSpec as RustAgentStepSpec, PipelineStage as RustPipelineStage, StepOutcome as RustStepOutcome, }; use a3s_code_core::permissions::{ @@ -85,6 +86,7 @@ use a3s_code_core::{ SessionOptions as RustSessionOptions, }; use napi::Either; +use napi::Env; // AHP Type Bindings mod ahp_types; @@ -3189,6 +3191,77 @@ impl Session { Ok(outcomes.into_iter().map(StepOutcomeObject::from).collect()) } + /// Run each item through a chain of `stages`, with no barrier between + /// stages — item A can be in stage 3 while item B is still in stage 1. + /// + /// Each stage is a function `(ctx) => spec | null` where `ctx` is + /// `{ previous: StepOutcomeObject | null, item: any }`. Return an + /// `AgentStepSpecObject` (camelCase keys) to run that step, or `null` to + /// stop the item's chain. A chain also stops when a step fails. + /// + /// IMPORTANT: a stage callback MUST NOT throw — in this napi version a JS + /// throw at return-conversion aborts the process (same constraint as + /// `setBudgetGuard`). Wrap your logic in try/catch and return `null` on + /// error. A stage that hangs past `timeoutMs` (default 30s) fails closed + /// (treated as `null`, stopping that chain) rather than blocking forever. + /// + /// This is a *synchronous* napi method that returns a Promise via a + /// deferred: the JS stage functions (which are not `Send`) are converted + /// to thread-safe functions on the JS thread here, then the chains run on + /// the worker runtime and resolve the Promise — so the event loop is never + /// blocked and no non-`Send` value crosses the async boundary. + #[napi( + ts_args_type = "items: Array, stages: Array<(ctx: { previous: StepOutcomeObject | null, item: any }) => AgentStepSpecObject | null>, timeoutMs?: number", + ts_return_type = "Promise>" + )] + pub fn pipeline( + &self, + env: Env, + items: Vec, + stages: Vec, + timeout_ms: Option, + ) -> napi::Result { + use napi::threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction}; + // Single-object arg so the JS stage signature is `(ctx) => ...`. + let single_obj = |ctx: ThreadSafeCallContext| { + Ok(vec![ctx.env.to_js_value(&ctx.value)?]) + }; + let timeout = timeout_ms.map(|t| t as u64).unwrap_or(30_000); + + // Build the thread-safe functions on the JS thread (JsFunction is not + // Send), then wrap each as a synchronous PipelineStage the combinator + // can call from the worker runtime. + let rust_stages: Vec> = stages + .into_iter() + .map(|f| { + let tsfn: ThreadsafeFunction< + serde_json::Value, + napi::threadsafe_function::ErrorStrategy::Fatal, + > = f.create_threadsafe_function(0, single_obj)?; + let stage = Arc::new(NodePipelineStage { + tsfn, + timeout_ms: timeout, + }); + let pipeline_stage: RustPipelineStage = + Arc::new(move |prev, item| stage.invoke(prev, item)); + Ok::<_, napi::Error>(pipeline_stage) + }) + .collect::>>()?; + + let session = self.inner.clone(); + let (deferred, promise) = env.create_deferred::>, _>()?; + get_runtime().spawn(async move { + let executor = session.agent_executor(); + let outcomes = execute_pipeline(executor, items, rust_stages, None).await; + let mapped: Vec> = outcomes + .into_iter() + .map(|o| o.map(StepOutcomeObject::from)) + .collect(); + deferred.resolve(move |_env| Ok(mapped)); + }); + Ok(promise) + } + /// Send a prompt or request and get a streaming event iterator. /// /// Returns an `EventStream`. Use `for await (const event of stream)` or call `.next()` manually. @@ -4821,6 +4894,94 @@ struct NodeBudgetGuard { unsafe impl Send for NodeBudgetGuard {} unsafe impl Sync for NodeBudgetGuard {} +/// Bridges a JS pipeline-stage function to a synchronous `PipelineStage`. +struct NodePipelineStage { + tsfn: napi::threadsafe_function::ThreadsafeFunction< + serde_json::Value, + napi::threadsafe_function::ErrorStrategy::Fatal, + >, + timeout_ms: u64, +} + +// SAFETY: ThreadsafeFunction is designed to be sent across threads. +unsafe impl Send for NodePipelineStage {} +unsafe impl Sync for NodePipelineStage {} + +impl NodePipelineStage { + fn invoke( + &self, + prev: Option<&RustStepOutcome>, + item: &serde_json::Value, + ) -> Option { + let previous = prev + .map(|o| serde_json::to_value(o).unwrap_or(serde_json::Value::Null)) + .unwrap_or(serde_json::Value::Null); + let payload = serde_json::json!({ "previous": previous, "item": item }); + + let (tx, rx) = std::sync::mpsc::sync_channel::>(1); + self.tsfn.call_with_return_value( + payload, + napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking, + move |ret: napi::JsUnknown| { + // Fail-closed: a null/unreadable return stops this chain rather + // than fabricating a step. + let _ = tx.send(parse_js_step_spec(ret)); + Ok(()) + }, + ); + // Fail-closed on timeout/throw: under Fatal strategy a JS throw means + // the return closure never fires, so the channel stays empty — treat + // as None (stop this chain) instead of blocking forever. + tokio::task::block_in_place(|| { + rx.recv_timeout(std::time::Duration::from_millis(self.timeout_ms)) + .unwrap_or(None) + }) + } +} + +/// Parse a JS pipeline-stage return value into an `AgentStepSpec`, or `None` +/// for `null`/`undefined`/unreadable input (which stops the chain). Accepts +/// camelCase (the SDK convention) and snake_case keys. +fn parse_js_step_spec(val: napi::JsUnknown) -> Option { + use napi::{JsObject, ValueType}; + if !matches!(val.get_type().ok()?, ValueType::Object) { + return None; + } + let obj = unsafe { val.cast::() }; + let get_str = |keys: &[&str]| -> Option { + for k in keys { + if let Ok(s) = obj.get_named_property::(k) { + if let Some(v) = s.into_utf8().ok().and_then(|s| s.into_owned().ok()) { + return Some(v); + } + } + } + None + }; + let task_id = get_str(&["taskId", "task_id"])?; + let agent = get_str(&["agent"])?; + let prompt = get_str(&["prompt"])?; + let description = get_str(&["description"]).unwrap_or_default(); + let max_steps = ["maxSteps", "max_steps"] + .iter() + .find_map(|k| obj.get_named_property::(k).ok()) + .and_then(|n| n.get_uint32().ok()) + .map(|n| n as usize); + let parent_session_id = get_str(&["parentSessionId", "parent_session_id"]); + Some(RustAgentStepSpec { + task_id, + agent, + description, + prompt, + max_steps, + parent_session_id, + // Per-stage `outputSchema` is not yet supported on pipeline stages + // (the lenient JsUnknown parse here can't read an arbitrary JSON-schema + // property safely). Use `parallel` for schema-validated steps. + output_schema: None, + }) +} + impl NodeBudgetGuard { fn call_decision( &self,