From 235deadaff6aa41ff02ba893a4642c1dba56c64a Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 18:26:08 +0800 Subject: [PATCH] test(orchestration): comprehensive unit + real-LLM coverage (#43) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the coverage gaps surfaced by an adversarial coverage audit across the new orchestration layer. +21 unit tests, +4 real-LLM integration tests (all run green against .a3s/config.acl), plus a small fix and a de-flake. Unit: - combinators: empty inputs, zero concurrency_hint (\.max(1) clamp), first-stage None, resumable ignores checkpointed-but-dropped specs, and resumable re-runs all when the checkpoint is unreadable (future version). - store: FileSessionStore workflow-checkpoint round-trip, crash-atomic no-temp-leftovers, and future-version rejection through both stores. - schema coercion FAILURE path (previously untested): demote-to-failure with the marker, isolation from a sibling in a fan-out, and failed-run-skips-coercion. - #31 persisted-schema fuzz extended to AgentStepSpec / StepOutcome / WorkflowCheckpoint (round-trip stability + forward-compat unknown-field tolerance) — the cross-node-migration types were previously excluded. - Python SDK conversion helpers (py_to_step_spec / step_outcome_to_py) and the PythonPipelineStage bridge (None-stops, raise-fails-closed, snake_case ctx['previous']) — previously zero coverage. Real-LLM (#[ignore], .a3s/config.acl): pure parallel fan-out of distinct agents (order + per-branch sentinels), multi-item pipeline (no-barrier shape with >1 item), the actual RESUME path (completed step served from checkpoint, only the rest run live), and a nested/array output_schema. Fix: the resumable combinator now distinguishes Ok(None) from a load Err and logs a warning before re-running from scratch (was a silent swallow). De-flake: the pipeline real-LLM tests no longer assert on stage-2 output *content* (some models return an empty final turn for a one-word instruction); chaining is proven by reaching stage 2 + the deterministic mock test. --- core/src/orchestration/combinators.rs | 182 ++++++++++- core/src/store/tests.rs | 119 +++++++ core/src/tools/task.rs | 113 +++++++ core/tests/test_orchestration_real_llm.rs | 294 +++++++++++++++++- core/tests/test_persisted_schema_roundtrip.rs | 67 ++++ sdk/python/src/lib.rs | 150 +++++++++ 6 files changed, 916 insertions(+), 9 deletions(-) diff --git a/core/src/orchestration/combinators.rs b/core/src/orchestration/combinators.rs index a9beb43..e35c404 100644 --- a/core/src/orchestration/combinators.rs +++ b/core/src/orchestration/combinators.rs @@ -106,11 +106,23 @@ pub async fn execute_steps_parallel_resumable( store: Arc, event_tx: Option>, ) -> Vec { - // Prior progress (the store's load rejects a future-version checkpoint). + // Prior progress. An unreadable checkpoint — e.g. one written by a newer, + // incompatible schema version, which the store rejects via + // `ensure_loadable` — is treated as *no* prior progress: the workflow + // re-runs from scratch rather than resuming from state it can't interpret. + // That's a fail-safe (do the work), but surface it rather than swallow it. let done: HashMap = match store.load_workflow_checkpoint(workflow_id).await { Ok(Some(cp)) => cp.completed(), - _ => HashMap::new(), + Ok(None) => HashMap::new(), + Err(e) => { + tracing::warn!( + workflow_id = %workflow_id, + error = %e, + "workflow checkpoint unreadable; re-running the workflow from scratch" + ); + HashMap::new() + } }; let pending: Vec = specs @@ -486,4 +498,170 @@ mod tests { "failed step is NOT recorded → it retries on resume" ); } + + struct ZeroHintExecutor; + #[async_trait] + impl AgentExecutor for ZeroHintExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: "ok".to_string(), + success: true, + structured: None, + } + } + fn concurrency_hint(&self) -> usize { + 0 + } + } + + #[tokio::test] + async fn empty_inputs_return_empty() { + let exec: Arc = Arc::new(EchoExecutor::new()); + assert!( + crate::orchestration::execute_steps_parallel(Arc::clone(&exec), vec![], None) + .await + .is_empty() + ); + let stages: Vec> = + vec![stage(|_p: Option<&StepOutcome>, item: &&str| { + Some(AgentStepSpec::new("s", "explore", "d", *item)) + })]; + assert!(execute_pipeline(exec, Vec::<&str>::new(), stages, None) + .await + .is_empty()); + } + + #[tokio::test] + async fn zero_concurrency_hint_still_makes_progress() { + // The .max(1) clamp in run_ordered_parallel_with_limit keeps a 0-hint + // executor serialized-but-live instead of deadlocking on 0 permits. + let exec: Arc = Arc::new(ZeroHintExecutor); + let specs = vec![ + AgentStepSpec::new("a", "explore", "d", "p"), + AgentStepSpec::new("b", "explore", "d", "p"), + AgentStepSpec::new("c", "explore", "d", "p"), + ]; + let out = crate::orchestration::execute_steps_parallel(exec, specs, None).await; + assert_eq!( + out.iter().map(|o| o.task_id.as_str()).collect::>(), + vec!["a", "b", "c"] + ); + assert!(out.iter().all(|o| o.success)); + } + + #[tokio::test] + async fn pipeline_first_stage_none_yields_none_outcome() { + let exec: Arc = Arc::new(EchoExecutor::new()); + let stages: Vec> = + vec![stage(|_p: Option<&StepOutcome>, item: &&str| { + if *item == "skip" { + None + } else { + Some(AgentStepSpec::new("s", "explore", "d", *item)) + } + })]; + let out = execute_pipeline(exec, vec!["skip", "run"], stages, None).await; + assert!( + out[0].is_none(), + "a first-stage None yields a None outcome (chain never started)" + ); + assert!(out[1].as_ref().unwrap().success); + } + + fn cached(task_id: &str, agent: &str, output: &str) -> StepOutcome { + StepOutcome { + task_id: task_id.to_string(), + session_id: format!("task-run-{task_id}"), + agent: agent.to_string(), + output: output.to_string(), + success: true, + structured: None, + } + } + + #[tokio::test] + async fn resumable_reruns_all_when_checkpoint_load_errors() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + + // A checkpoint written by a *newer*, incompatible schema version: the + // store rejects it on load. The resumable combinator must treat that as + // no prior progress and re-run everything (fail-safe), not panic or + // silently resume from state it can't read. + let mut done = std::collections::HashMap::new(); + done.insert("a".to_string(), cached("a", "explore", "old")); + let mut cp = WorkflowCheckpoint::from_completed("wf-err", &done, 1); + cp.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1; + store.save_workflow_checkpoint("wf-err", &cp).await.unwrap(); + + let ran = Arc::new(tokio::sync::Mutex::new(Vec::new())); + let exec: Arc = Arc::new(RecordingExecutor { + ran: Arc::clone(&ran), + }); + let specs = vec![ + AgentStepSpec::new("a", "explore", "d", "pa"), + AgentStepSpec::new("b", "review", "d", "pb"), + ]; + let out = + execute_steps_parallel_resumable(exec, specs, "wf-err", Arc::clone(&store), None).await; + + let mut ran_ids = ran.lock().await.clone(); + ran_ids.sort(); + assert_eq!( + ran_ids, + vec!["a".to_string(), "b".to_string()], + "an unreadable (future-version) checkpoint is ignored → all steps re-run" + ); + assert_eq!(out.len(), 2); + assert!(out.iter().all(|o| o.success)); + } + + #[tokio::test] + async fn resumable_ignores_checkpointed_steps_absent_from_new_specs() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + + // Prior checkpoint completed {a, b}; the new run drops "a", reorders, + // and adds "c". Output follows the NEW specs; "b" is reused; the stale + // "a" simply doesn't appear; only "c" actually runs. + let mut done = std::collections::HashMap::new(); + done.insert("a".to_string(), cached("a", "explore", "cached-a")); + done.insert("b".to_string(), cached("b", "review", "cached-b")); + store + .save_workflow_checkpoint( + "wf-x", + &WorkflowCheckpoint::from_completed("wf-x", &done, 1), + ) + .await + .unwrap(); + + let ran = Arc::new(tokio::sync::Mutex::new(Vec::new())); + let exec: Arc = Arc::new(RecordingExecutor { + ran: Arc::clone(&ran), + }); + let specs = vec![ + AgentStepSpec::new("b", "review", "d", "pb"), + AgentStepSpec::new("c", "plan", "d", "pc"), + ]; + let out = + execute_steps_parallel_resumable(exec, specs, "wf-x", Arc::clone(&store), None).await; + + assert_eq!( + *ran.lock().await, + vec!["c".to_string()], + "cached b reused, stale a dropped, only new c runs" + ); + assert_eq!(out.len(), 2); + assert_eq!(out[0].task_id, "b"); + assert_eq!(out[0].output, "cached-b"); + assert_eq!(out[1].task_id, "c"); + assert!(out.iter().all(|o| o.success)); + } } diff --git a/core/src/store/tests.rs b/core/src/store/tests.rs index a8e3904..cc61094 100644 --- a/core/src/store/tests.rs +++ b/core/src/store/tests.rs @@ -808,3 +808,122 @@ async fn test_file_store_checkpoint_write_is_atomic_no_temp_leftovers() { "the final checkpoint file must exist, got: {names:?}" ); } + +fn sample_workflow_checkpoint(wf_id: &str) -> crate::orchestration::WorkflowCheckpoint { + use crate::orchestration::{ + StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION, + }; + let outcome = |id: &str, structured| StepOutcome { + task_id: id.to_string(), + session_id: format!("task-run-{id}"), + agent: "explore".to_string(), + output: format!("out-{id}"), + success: true, + structured, + }; + WorkflowCheckpoint { + schema_version: WORKFLOW_CHECKPOINT_SCHEMA_VERSION, + workflow_id: wf_id.to_string(), + steps: vec![ + WorkflowStepRecord { + task_id: "a".into(), + outcome: outcome("a", None), + }, + WorkflowStepRecord { + task_id: "b".into(), + outcome: outcome("b", Some(serde_json::json!({ "k": 1 }))), + }, + ], + checkpoint_ms: 1_700_000_000_000, + } +} + +#[tokio::test] +async fn test_file_store_workflow_checkpoint_roundtrip() { + let dir = tempdir().unwrap(); + let store = FileSessionStore::new(dir.path()).await.unwrap(); + let cp = sample_workflow_checkpoint("wf-1"); + store.save_workflow_checkpoint("wf-1", &cp).await.unwrap(); + + let loaded = store + .load_workflow_checkpoint("wf-1") + .await + .unwrap() + .expect("present"); + assert_eq!(loaded, cp); + assert_eq!(loaded.completed().len(), 2); + + store.delete_workflow_checkpoint("wf-1").await.unwrap(); + assert!(store + .load_workflow_checkpoint("wf-1") + .await + .unwrap() + .is_none()); + // Idempotent on a missing file. + store.delete_workflow_checkpoint("wf-1").await.unwrap(); +} + +#[tokio::test] +async fn test_memory_store_rejects_future_workflow_checkpoint() { + let store = MemorySessionStore::new(); + let mut future = sample_workflow_checkpoint("wf-future"); + future.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1; + store + .save_workflow_checkpoint("wf-future", &future) + .await + .unwrap(); + let err = store + .load_workflow_checkpoint("wf-future") + .await + .unwrap_err(); + assert!(err.to_string().contains("schema version"), "got: {err}"); + + store + .save_workflow_checkpoint("wf-ok", &sample_workflow_checkpoint("wf-ok")) + .await + .unwrap(); + assert!(store + .load_workflow_checkpoint("wf-ok") + .await + .unwrap() + .is_some()); +} + +#[tokio::test] +async fn test_file_store_rejects_future_workflow_checkpoint() { + let dir = tempdir().unwrap(); + let store = FileSessionStore::new(dir.path()).await.unwrap(); + let mut future = sample_workflow_checkpoint("wf-f"); + future.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1; + store + .save_workflow_checkpoint("wf-f", &future) + .await + .unwrap(); + let err = store.load_workflow_checkpoint("wf-f").await.unwrap_err(); + assert!(err.to_string().contains("schema version"), "got: {err}"); +} + +#[tokio::test] +async fn test_file_store_workflow_checkpoint_atomic_no_temp_leftovers() { + let dir = tempdir().unwrap(); + let store = FileSessionStore::new(dir.path()).await.unwrap(); + store + .save_workflow_checkpoint("wf-z", &sample_workflow_checkpoint("wf-z")) + .await + .unwrap(); + + let ckpt_dir = dir.path().join("workflow_checkpoints"); + let mut entries = tokio::fs::read_dir(&ckpt_dir).await.unwrap(); + let mut names = Vec::new(); + while let Some(e) = entries.next_entry().await.unwrap() { + names.push(e.file_name().to_string_lossy().to_string()); + } + assert!( + names.iter().all(|n| !n.contains(".tmp")), + "no temp leftovers after atomic write, got: {names:?}" + ); + assert!( + names.iter().any(|n| n == "wf-z.json"), + "the final workflow checkpoint file must exist, got: {names:?}" + ); +} diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index b304dc8..3dbe44c 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -1712,6 +1712,119 @@ mod tests { ); } + /// The agent's turn returns text; the coercion call (`emit_step_output`) + /// always returns an object that VIOLATES the schema, so `generate_blocking` + /// exhausts its repairs and bails. + struct SchemaFailClient; + + #[async_trait::async_trait] + impl LlmClient for SchemaFailClient { + async fn complete( + &self, + messages: &[Message], + system: Option<&str>, + tools: &[ToolDefinition], + ) -> Result { + if system == Some(crate::prompts::PRE_ANALYSIS_SYSTEM) { + return Ok(pre_analysis_response(messages)); + } + if tools.iter().any(|t| t.name == "emit_step_output") { + // `{}` is missing the required `verdict` field → schema invalid. + return Ok(MockLlmClient::tool_call_response( + "coerce-fail", + "emit_step_output", + serde_json::json!({}), + )); + } + Ok(text_response("some answer")) + } + + async fn complete_streaming( + &self, + _messages: &[Message], + _system: Option<&str>, + _tools: &[ToolDefinition], + _cancel_token: tokio_util::sync::CancellationToken, + ) -> Result> { + anyhow::bail!("streaming unused") + } + } + + #[tokio::test] + async fn execute_step_with_schema_demotes_step_on_coercion_failure() { + let workspace = tempfile::tempdir().unwrap(); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + Arc::new(SchemaFailClient), + workspace.path().to_string_lossy().to_string(), + ); + let spec = AgentStepSpec::new("step-x", "general", "assess", "Assess the thing.") + .with_output_schema(verdict_schema()); + + let outcome = executor.execute_step(spec, None).await; + + assert!( + !outcome.success, + "a step whose output can't satisfy the schema is demoted to failure" + ); + assert_eq!(outcome.structured, None, "no validated object on failure"); + assert!( + outcome.output.contains("[structured output failed"), + "the demotion marker is appended: {}", + outcome.output + ); + } + + #[tokio::test] + async fn parallel_isolates_schema_coercion_failure_from_sibling() { + let workspace = tempfile::tempdir().unwrap(); + let executor: Arc = Arc::new(TaskExecutor::new( + Arc::new(AgentRegistry::new()), + Arc::new(SchemaFailClient), + workspace.path().to_string_lossy().to_string(), + )); + // A plain step (no schema → succeeds) alongside a schema'd step whose + // coercion fails. The failure must not drop or fail the sibling. + let specs = vec![ + AgentStepSpec::new("plain", "general", "d", "p"), + AgentStepSpec::new("schemad", "general", "d", "p").with_output_schema(verdict_schema()), + ]; + let out = crate::orchestration::execute_steps_parallel(executor, specs, None).await; + + assert_eq!(out.len(), 2); + assert_eq!(out[0].task_id, "plain"); + assert!(out[0].success, "no-schema sibling unaffected"); + assert_eq!(out[0].structured, None); + assert_eq!(out[1].task_id, "schemad"); + assert!(!out[1].success, "schema-failing step surfaces as failure"); + assert_eq!(out[1].structured, None); + assert!(out[1].output.contains("[structured output failed")); + } + + #[tokio::test] + async fn failed_step_with_schema_skips_coercion() { + let workspace = tempfile::tempdir().unwrap(); + let executor = TaskExecutor::new( + Arc::new(AgentRegistry::new()), + Arc::new(SchemaCoercionClient), + workspace.path().to_string_lossy().to_string(), + ); + // Unknown agent → the run fails BEFORE coercion. The failure is the + // run error, not a coercion failure — coercion must not run. + let spec = AgentStepSpec::new("step-y", "no-such-agent", "d", "p") + .with_output_schema(verdict_schema()); + + let outcome = executor.execute_step(spec, None).await; + + assert!(!outcome.success); + assert_eq!(outcome.structured, None); + assert!( + !outcome.output.contains("[structured output failed"), + "coercion never ran — failure is the run error, not a coercion failure: {}", + outcome.output + ); + } + struct StaticLlmClient { text: String, } diff --git a/core/tests/test_orchestration_real_llm.rs b/core/tests/test_orchestration_real_llm.rs index bcde5f8..265fadc 100644 --- a/core/tests/test_orchestration_real_llm.rs +++ b/core/tests/test_orchestration_real_llm.rs @@ -7,14 +7,18 @@ //! cargo test -p a3s-code-core --test test_orchestration_real_llm -- --ignored --nocapture //! ``` +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use async_trait::async_trait; +use tokio::sync::Mutex; + use a3s_code_core::config::CodeConfig; use a3s_code_core::llm::create_client_with_config; use a3s_code_core::orchestration::{ - execute_pipeline, execute_steps_parallel_resumable, AgentExecutor, AgentStepSpec, - PipelineStage, StepOutcome, + execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, AgentExecutor, + AgentStepSpec, PipelineStage, StepOutcome, WorkflowCheckpoint, }; use a3s_code_core::store::{MemorySessionStore, SessionStore}; use a3s_code_core::subagent::AgentRegistry; @@ -142,13 +146,15 @@ async fn real_pipeline_chains_two_agent_stages() { "pipeline chain succeeded: {}", final_outcome.output ); + // Reaching stage 2 is the chaining proof: stage 2 only runs when stage 1 + // succeeded, and its prompt is built from stage 1's output by the closure. + // We don't assert on stage 2's output *content* — some models return an + // empty final turn for a one-word YES/NO instruction. Deterministic + // prev-output visibility is covered by the mock test + // `each_item_chains_through_stages_and_later_stages_see_prior_output`. assert_eq!( final_outcome.task_id, "real-p2", - "the returned outcome is the last stage's" - ); - assert!( - !final_outcome.output.trim().is_empty(), - "stage 2 produced output derived from stage 1" + "the chain ran through to stage 2" ); } @@ -184,3 +190,277 @@ async fn real_resumable_workflow_runs_and_clears_checkpoint() { "a fully-succeeded workflow clears its checkpoint" ); } + +/// Wraps a real executor to record which task_ids actually reached it — used to +/// prove that a resumed workflow skips already-completed steps (no live call). +struct CountingExecutor { + inner: Arc, + ran: Arc>>, +} + +#[async_trait] +impl AgentExecutor for CountingExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + event_tx: Option>, + ) -> StepOutcome { + self.ran.lock().await.push(spec.task_id.clone()); + self.inner.execute_step(spec, event_tx).await + } + fn concurrency_hint(&self) -> usize { + self.inner.concurrency_hint() + } +} + +/// Phase 1 live: pure `execute_steps_parallel` fan-out of several distinct +/// agents — order preserved, each branch independent (its own sentinel). +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_parallel_fans_out_independent_agents() { + let (executor, _ws) = local_executor(); + let exec: Arc = Arc::new(executor); + + let specs = vec![ + AgentStepSpec::new( + "p-alpha", + "general", + "echo", + "Reply with exactly the word ALPHA.", + ) + .with_max_steps(2), + AgentStepSpec::new( + "p-bravo", + "general", + "echo", + "Reply with exactly the word BRAVO.", + ) + .with_max_steps(2), + AgentStepSpec::new( + "p-charlie", + "general", + "echo", + "Reply with exactly the word CHARLIE.", + ) + .with_max_steps(2), + ]; + let out = execute_steps_parallel(exec, specs, None).await; + + assert_eq!(out.len(), 3, "one outcome per spec"); + assert_eq!( + out.iter().map(|o| o.task_id.as_str()).collect::>(), + vec!["p-alpha", "p-bravo", "p-charlie"], + "results preserve input order regardless of completion order" + ); + assert!( + out.iter().all(|o| o.success), + "all branches succeed: {:?}", + out.iter().map(|o| &o.output).collect::>() + ); + // Each branch saw only its own prompt (no cross-contamination). + assert!( + out[0].output.contains("ALPHA"), + "p-alpha: {}", + out[0].output + ); + assert!( + out[1].output.contains("BRAVO"), + "p-bravo: {}", + out[1].output + ); + assert!( + out[2].output.contains("CHARLIE"), + "p-charlie: {}", + out[2].output + ); +} + +/// Phase 3 live: a multi-item pipeline — the no-inter-stage-barrier shape with +/// >1 item, each chaining stage 2 off stage 1's live output. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_pipeline_runs_multiple_items_concurrently() { + let (executor, _ws) = local_executor(); + let exec: Arc = Arc::new(executor); + + let stages: Vec> = vec![ + Arc::new(|_p: Option<&StepOutcome>, topic: &&str| { + Some( + AgentStepSpec::new( + "p1", + "general", + "summarize", + format!("In one sentence: {topic}"), + ) + .with_max_steps(2), + ) + }), + Arc::new(|prev: Option<&StepOutcome>, _t: &&str| { + let s = prev.map(|o| o.output.clone()).unwrap_or_default(); + Some( + AgentStepSpec::new( + "p2", + "general", + "classify", + format!("Reply YES or NO: is the following about software?\n\n{s}"), + ) + .with_max_steps(2), + ) + }), + ]; + let items = vec![ + "the Rust programming language", + "the Pacific Ocean", + "the Linux kernel", + ]; + let out = execute_pipeline(exec, items, stages, None).await; + + assert_eq!(out.len(), 3, "one result per item, order preserved"); + for (i, r) in out.iter().enumerate() { + let outcome = r + .as_ref() + .unwrap_or_else(|| panic!("item {i} produced no outcome")); + assert!( + outcome.success, + "item {i} chain succeeded: {}", + outcome.output + ); + // Reaching stage 2 proves chaining (see the 1-item test's note); + // output content isn't asserted (model nondeterminism). + assert_eq!(outcome.task_id, "p2", "item {i} ran through to stage 2"); + } +} + +/// Phase 4 live: the RESUME path — a workflow with a pre-existing checkpoint +/// re-runs ONLY the not-yet-completed steps against the real model; the +/// completed step is served from the checkpoint with no live call. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_resumable_workflow_resumes_skipping_completed_steps() { + let (executor, _ws) = local_executor(); + let store: Arc = Arc::new(MemorySessionStore::new()); + + // Pre-seed: step "rw-1" already completed on a prior run. + let mut done = HashMap::new(); + done.insert( + "rw-1".to_string(), + StepOutcome { + task_id: "rw-1".into(), + session_id: "task-run-rw-1".into(), + agent: "general".into(), + output: "CACHED-SENTINEL".into(), + success: true, + structured: None, + }, + ); + store + .save_workflow_checkpoint( + "real-W", + &WorkflowCheckpoint::from_completed("real-W", &done, 1), + ) + .await + .unwrap(); + + let ran = Arc::new(Mutex::new(Vec::new())); + let counting: Arc = Arc::new(CountingExecutor { + inner: Arc::new(executor), + ran: Arc::clone(&ran), + }); + let specs = vec![ + AgentStepSpec::new("rw-1", "general", "cached", "unused — should be skipped") + .with_max_steps(2), + AgentStepSpec::new( + "rw-2", + "general", + "live", + "Reply with exactly the word DONE.", + ) + .with_max_steps(2), + ]; + let out = + execute_steps_parallel_resumable(counting, specs, "real-W", Arc::clone(&store), None).await; + + assert_eq!( + *ran.lock().await, + vec!["rw-2".to_string()], + "only the uncompleted step hit the real executor; rw-1 came from the checkpoint" + ); + assert_eq!(out[0].task_id, "rw-1"); + assert_eq!( + out[0].output, "CACHED-SENTINEL", + "completed step returns its cached outcome, no second LLM call" + ); + assert!( + out[1].success, + "the resumed step ran live: {}", + out[1].output + ); + assert!( + store + .load_workflow_checkpoint("real-W") + .await + .unwrap() + .is_none(), + "the now-complete workflow clears its checkpoint" + ); +} + +/// Phase 2 live: a step with a NESTED/array schema returns a validated object +/// through the real tool-mode coercion + repair loop. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_execute_step_with_nested_schema_returns_validated_object() { + let (executor, _ws) = local_executor(); + let schema = serde_json::json!({ + "type": "object", + "properties": { + "languages": { + "type": "array", + "minItems": 2, + "items": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "compiled": { "type": "boolean" } + }, + "required": ["name", "compiled"] + } + } + }, + "required": ["languages"] + }); + let spec = AgentStepSpec::new( + "nested", + "general", + "list languages", + "List two well-known systems programming languages and whether each is compiled.", + ) + .with_output_schema(schema) + .with_max_steps(2); + + let outcome = executor.execute_step(spec, None).await; + + assert!( + outcome.success, + "nested-schema step should succeed: {}", + outcome.output + ); + let object = outcome.structured.expect("validated structured object"); + let languages = object + .get("languages") + .and_then(|v| v.as_array()) + .expect("languages is an array"); + assert!(languages.len() >= 2, "at least 2 languages: {object}"); + for lang in languages { + assert!( + lang.get("name").and_then(|v| v.as_str()).is_some(), + "each language has a string name: {lang}" + ); + assert!( + lang.get("compiled") + .map(|v| v.is_boolean()) + .unwrap_or(false), + "each language has a boolean compiled: {lang}" + ); + } +} diff --git a/core/tests/test_persisted_schema_roundtrip.rs b/core/tests/test_persisted_schema_roundtrip.rs index 92fddf6..3e3e96a 100644 --- a/core/tests/test_persisted_schema_roundtrip.rs +++ b/core/tests/test_persisted_schema_roundtrip.rs @@ -30,6 +30,10 @@ use a3s_code_core::llm::{ ContentBlock, ImageSource, Message, TokenUsage, ToolResultContent, ToolResultContentField, }; use a3s_code_core::loop_checkpoint::{LoopCheckpoint, LOOP_CHECKPOINT_SCHEMA_VERSION}; +use a3s_code_core::orchestration::{ + AgentStepSpec, StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, + WORKFLOW_CHECKPOINT_SCHEMA_VERSION, +}; use a3s_code_core::permissions::{PermissionPolicy, PermissionRule}; use a3s_code_core::planning::Task; use a3s_code_core::run::{RunEventRecord, RunRecord, RunSnapshot, RunStatus}; @@ -581,6 +585,57 @@ where // 1. Round-trip stability fuzz across the persisted graph. // ───────────────────────────────────────────────────────────────────── +// Orchestration persisted types (#43): serializable, and WorkflowCheckpoint +// is explicitly designed for cross-node migration — so round-trip stability +// and forward/backward compat matter for exactly these. +fn gen_agent_step_spec(rng: &mut Rng) -> AgentStepSpec { + AgentStepSpec { + task_id: rng.string(), + agent: rng.string(), + description: rng.string(), + prompt: rng.string(), + max_steps: rng.opt_usize(), + parent_session_id: rng.opt_string(), + // Exercise BOTH arms — output_schema is skip_serializing_if=None. + output_schema: if rng.boolean() { + Some(rng.json_value_non_null(2)) + } else { + None + }, + } +} + +fn gen_step_outcome(rng: &mut Rng) -> StepOutcome { + StepOutcome { + task_id: rng.string(), + session_id: rng.string(), + agent: rng.string(), + output: rng.string(), + success: rng.boolean(), + structured: if rng.boolean() { + Some(rng.json_value_non_null(2)) + } else { + None + }, + } +} + +fn gen_workflow_checkpoint(rng: &mut Rng) -> WorkflowCheckpoint { + let n = rng.below(4); + WorkflowCheckpoint { + // Stay at-or-below the current version (future versions are rejected). + schema_version: rng.below(u64::from(WORKFLOW_CHECKPOINT_SCHEMA_VERSION) + 1) as u32, + workflow_id: rng.string(), + steps: (0..n) + .map(|_| WorkflowStepRecord { + task_id: rng.string(), + outcome: gen_step_outcome(rng), + }) + .collect(), + checkpoint_ms: rng.u64_small(), + } +} + #[test] fn fuzz_roundtrip_all_persisted_types() { const SEEDS: u64 = 600; @@ -602,6 +657,15 @@ fn fuzz_roundtrip_all_persisted_types() { ); assert_roundtrip_stable(&gen_message(&mut rng), &format!("Message/{seed}")); assert_roundtrip_stable(&gen_session_data(&mut rng), &format!("SessionData/{seed}")); + assert_roundtrip_stable( + &gen_agent_step_spec(&mut rng), + &format!("AgentStepSpec/{seed}"), + ); + assert_roundtrip_stable(&gen_step_outcome(&mut rng), &format!("StepOutcome/{seed}")); + assert_roundtrip_stable( + &gen_workflow_checkpoint(&mut rng), + &format!("WorkflowCheckpoint/{seed}"), + ); } } @@ -764,6 +828,9 @@ fn forward_compat_unknown_fields_ignored_on_persisted_types() { assert_unknown_fields_ignored(&gen_verification_report(&mut rng), "VerificationReport"); assert_unknown_fields_ignored(&gen_run_record(&mut rng), "RunRecord"); assert_unknown_fields_ignored(&gen_session_data(&mut rng), "SessionData"); + assert_unknown_fields_ignored(&gen_agent_step_spec(&mut rng), "AgentStepSpec"); + assert_unknown_fields_ignored(&gen_step_outcome(&mut rng), "StepOutcome"); + assert_unknown_fields_ignored(&gen_workflow_checkpoint(&mut rng), "WorkflowCheckpoint"); } // ───────────────────────────────────────────────────────────────────── diff --git a/sdk/python/src/lib.rs b/sdk/python/src/lib.rs index 0c218ab..6859120 100644 --- a/sdk/python/src/lib.rs +++ b/sdk/python/src/lib.rs @@ -7255,4 +7255,154 @@ mod tests { _ => panic!("expected streamable-http transport"), } } + + // ---- orchestration conversion + pipeline-stage bridge (#43) ---- + + #[test] + fn py_to_step_spec_parses_full_dict() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let dict = PyDict::new(py); + dict.set_item("task_id", "t1").unwrap(); + dict.set_item("agent", "explore").unwrap(); + dict.set_item("description", "d").unwrap(); + dict.set_item("prompt", "p").unwrap(); + dict.set_item("max_steps", 5u32).unwrap(); + dict.set_item("parent_session_id", "parent").unwrap(); + let schema = PyDict::new(py); + schema.set_item("type", "object").unwrap(); + dict.set_item("output_schema", &schema).unwrap(); + + let spec = py_to_step_spec(py, dict.as_any()).unwrap(); + assert_eq!(spec.task_id, "t1"); + assert_eq!(spec.agent, "explore"); + assert_eq!(spec.prompt, "p"); + assert_eq!(spec.max_steps, Some(5)); + assert_eq!(spec.parent_session_id.as_deref(), Some("parent")); + assert!(spec.output_schema.is_some()); + }); + } + + #[test] + fn py_to_step_spec_minimal_defaults_optionals() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let dict = PyDict::new(py); + dict.set_item("task_id", "t1").unwrap(); + dict.set_item("agent", "explore").unwrap(); + dict.set_item("description", "d").unwrap(); + dict.set_item("prompt", "p").unwrap(); + let spec = py_to_step_spec(py, dict.as_any()).unwrap(); + assert_eq!(spec.max_steps, None); + assert_eq!(spec.parent_session_id, None); + assert_eq!(spec.output_schema, None); + }); + } + + #[test] + fn py_to_step_spec_missing_required_field_errors() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let dict = PyDict::new(py); + dict.set_item("task_id", "t1").unwrap(); + dict.set_item("agent", "explore").unwrap(); + dict.set_item("description", "d").unwrap(); + // No "prompt" — a required field with no serde default. + let err = py_to_step_spec(py, dict.as_any()).unwrap_err(); + assert!( + err.to_string().contains("AgentStepSpec") || err.to_string().contains("prompt"), + "got: {err}" + ); + }); + } + + #[test] + fn step_outcome_to_py_uses_snake_case_keys() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let outcome = RustStepOutcome { + task_id: "t1".into(), + session_id: "task-run-t1".into(), + agent: "explore".into(), + output: "o".into(), + success: true, + structured: Some(serde_json::json!({ "k": 1 })), + }; + let obj = step_outcome_to_py(py, &outcome).unwrap(); + let bound = obj.bind(py); + let dict = bound.downcast::().unwrap(); + // snake_case keys — the casing the pipeline `ctx['previous']` relies on. + assert_eq!( + dict.get_item("task_id") + .unwrap() + .unwrap() + .extract::() + .unwrap(), + "t1" + ); + assert_eq!( + dict.get_item("session_id") + .unwrap() + .unwrap() + .extract::() + .unwrap(), + "task-run-t1" + ); + assert!(dict + .get_item("success") + .unwrap() + .unwrap() + .extract::() + .unwrap()); + assert!(dict.get_item("structured").unwrap().is_some()); + }); + } + + #[test] + fn python_pipeline_stage_none_raise_and_spec() { + pyo3::prepare_freethreaded_python(); + let (none_cb, raise_cb, spec_cb) = Python::with_gil(|py| { + let none_cb = py.eval(c"lambda ctx: None", None, None).unwrap().unbind(); + // A raising stage must fail closed (caught → None), not abort. + let raise_cb = py.eval(c"lambda ctx: 1 / 0", None, None).unwrap().unbind(); + // Reads ctx['previous']['task_id'] (snake_case) and returns a spec. + let spec_cb = py + .eval( + c"lambda ctx: {'task_id': 'ps', 'agent': 'review', 'description': 'd', 'prompt': 'prev=' + str(ctx['previous']['task_id'])}", + None, + None, + ) + .unwrap() + .unbind(); + (none_cb, raise_cb, spec_cb) + }); + + assert!(PythonPipelineStage { callback: none_cb } + .invoke(None, &serde_json::json!({ "x": 1 })) + .is_none()); + assert!( + PythonPipelineStage { callback: raise_cb } + .invoke(None, &serde_json::json!({ "x": 1 })) + .is_none(), + "a raising stage fails closed to None" + ); + + let prev = RustStepOutcome { + task_id: "prior".into(), + session_id: "s".into(), + agent: "a".into(), + output: "o".into(), + success: true, + structured: None, + }; + let spec = PythonPipelineStage { callback: spec_cb } + .invoke(Some(&prev), &serde_json::json!({ "x": 1 })) + .expect("spec returned"); + assert_eq!(spec.task_id, "ps"); + assert!( + spec.prompt.contains("prior"), + "ctx['previous']['task_id'] (snake_case) was readable: {}", + spec.prompt + ); + } }