From 50eab373210006d472bb05fdc06fddeee71d4053 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 29 May 2026 16:38:43 +0800 Subject: [PATCH] feat(orchestration): resumable + migratable workflow checkpoints (Phase 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A workflow journals its completed steps so an interrupted run picks up from the last completed step — on this node or, since the checkpoint is serializable and the executor is a parameter, on another one (host-driven migration). Step-boundary analogue of LoopCheckpoint (which checkpoints at tool-round boundaries one level down). - WorkflowCheckpoint { schema_version, workflow_id, steps, checkpoint_ms } + ensure_loadable() (rejects future schema versions, mirrors LoopCheckpoint). - SessionStore gains save/load/delete_workflow_checkpoint (default no-ops); file store writes crash-atomically (temp+fsync+rename) and rejects future versions on load; memory store mirrors it. - execute_steps_parallel_resumable: loads prior progress, skips completed steps (reusing cached outcomes), runs only the rest, rewrites the checkpoint at each step boundary, merges in spec order, and clears on full success. Records ONLY successful steps — a failed step retries on resume (its effect didn't complete) while a succeeded step's work is never redone. Tests: checkpoint round-trip + future-version rejection + pre-v1 default; resumable skips-completed + clears-on-success (with a fresh executor = the migration path) + retains a successes-only checkpoint on partial failure; a real-LLM \#[ignore] resumable workflow passing against .a3s/config.acl. Store suite (36) + orchestration (14) green; fmt + clippy --lib --bins clean. --- core/src/lib.rs | 5 +- core/src/orchestration/checkpoint.rs | 139 ++++++++++++ core/src/orchestration/combinators.rs | 249 ++++++++++++++++++++++ core/src/orchestration/mod.rs | 4 +- core/src/store/file_store.rs | 93 ++++++++ core/src/store/memory_store.rs | 39 ++++ core/src/store/mod.rs | 28 +++ core/tests/test_orchestration_real_llm.rs | 37 +++- 8 files changed, 590 insertions(+), 4 deletions(-) create mode 100644 core/src/orchestration/checkpoint.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 1a99c1d..3ee8975 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -136,8 +136,9 @@ pub use llm::{ Message, OpenAiClient, TokenUsage, }; pub use orchestration::{ - execute_pipeline, execute_steps_parallel, AgentExecutor, AgentStepSpec, PipelineStage, - StepOutcome, + execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, AgentExecutor, + AgentStepSpec, PipelineStage, StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, + WORKFLOW_CHECKPOINT_SCHEMA_VERSION, }; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; pub use run::{ diff --git a/core/src/orchestration/checkpoint.rs b/core/src/orchestration/checkpoint.rs new file mode 100644 index 0000000..0013f51 --- /dev/null +++ b/core/src/orchestration/checkpoint.rs @@ -0,0 +1,139 @@ +//! Workflow-level checkpoints: journal completed steps so an interrupted +//! orchestration resumes from the longest completed prefix — on this node or, +//! because the checkpoint is serializable and the executor is pluggable, on +//! another one (host-driven migration). +//! +//! This is the step-boundary analogue of [`LoopCheckpoint`](crate::loop_checkpoint::LoopCheckpoint), +//! which checkpoints at tool-round boundaries one level down. + +use super::executor::StepOutcome; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Schema version. Bumped on incompatible format changes; loads from a future +/// version are rejected (see [`WorkflowCheckpoint::ensure_loadable`]). +pub const WORKFLOW_CHECKPOINT_SCHEMA_VERSION: u32 = 1; + +/// One completed step within a workflow. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct WorkflowStepRecord { + /// Matches the [`AgentStepSpec::task_id`](super::AgentStepSpec) of the + /// step that produced this outcome. + pub task_id: String, + /// The completed step's result. + pub outcome: StepOutcome, +} + +/// Snapshot of a workflow's completed steps at a step boundary. +/// +/// (`StepOutcome` contains a `serde_json::Value`, which is not `Eq`, so this +/// derives `PartialEq` only.) +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct WorkflowCheckpoint { + /// Schema version — see [`WORKFLOW_CHECKPOINT_SCHEMA_VERSION`]. + #[serde(default)] + pub schema_version: u32, + /// Logical workflow identifier the checkpoint is keyed by. + pub workflow_id: String, + /// The steps completed so far. A resuming run skips these and re-dispatches + /// only the rest. + pub steps: Vec, + /// Wall-clock timestamp when the checkpoint was written (Unix epoch ms). + pub checkpoint_ms: u64, +} + +impl WorkflowCheckpoint { + /// Build a checkpoint from a map of completed `task_id -> outcome`. + pub fn from_completed( + workflow_id: impl Into, + completed: &HashMap, + checkpoint_ms: u64, + ) -> Self { + let steps = completed + .iter() + .map(|(task_id, outcome)| WorkflowStepRecord { + task_id: task_id.clone(), + outcome: outcome.clone(), + }) + .collect(); + Self { + schema_version: WORKFLOW_CHECKPOINT_SCHEMA_VERSION, + workflow_id: workflow_id.into(), + steps, + checkpoint_ms, + } + } + + /// The completed steps as a `task_id -> outcome` map. + pub fn completed(&self) -> HashMap { + self.steps + .iter() + .map(|r| (r.task_id.clone(), r.outcome.clone())) + .collect() + } + + /// Reject a checkpoint written by a *newer*, incompatible schema version + /// than this build understands — mirrors + /// [`LoopCheckpoint::ensure_loadable`](crate::loop_checkpoint::LoopCheckpoint::ensure_loadable). + /// Field additions are absorbed by `#[serde(default)]`, so older (incl. + /// pre-v1 `0`) checkpoints always remain loadable. + pub fn ensure_loadable(&self) -> anyhow::Result<()> { + if self.schema_version > WORKFLOW_CHECKPOINT_SCHEMA_VERSION { + anyhow::bail!( + "workflow checkpoint {} has schema version {} but this build supports at \ + most {}; refusing to resume from an incompatible future checkpoint", + self.workflow_id, + self.schema_version, + WORKFLOW_CHECKPOINT_SCHEMA_VERSION + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn outcome(id: &str) -> StepOutcome { + StepOutcome { + task_id: id.to_string(), + session_id: format!("task-run-{id}"), + agent: "a".to_string(), + output: "o".to_string(), + success: true, + structured: None, + } + } + + #[test] + fn round_trips_and_exposes_completed_map() { + let mut completed = HashMap::new(); + completed.insert("t1".to_string(), outcome("t1")); + let cp = WorkflowCheckpoint::from_completed("wf", &completed, 123); + let back: WorkflowCheckpoint = + serde_json::from_str(&serde_json::to_string(&cp).unwrap()).unwrap(); + assert_eq!(back, cp); + assert_eq!(back.schema_version, WORKFLOW_CHECKPOINT_SCHEMA_VERSION); + assert_eq!(back.checkpoint_ms, 123); + assert_eq!(back.completed().get("t1").unwrap().task_id, "t1"); + } + + #[test] + fn ensure_loadable_rejects_only_future_versions() { + let mut cp = WorkflowCheckpoint::from_completed("wf", &HashMap::new(), 0); + cp.ensure_loadable().expect("current version loadable"); + cp.schema_version = 0; + cp.ensure_loadable().expect("pre-v1 loadable"); + cp.schema_version = WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1; + let err = cp.ensure_loadable().unwrap_err(); + assert!(err.to_string().contains("schema version"), "got: {err}"); + } + + #[test] + fn pre_v1_payload_without_schema_version_loads() { + let json = r#"{"workflow_id":"wf","steps":[],"checkpoint_ms":0}"#; + let cp: WorkflowCheckpoint = serde_json::from_str(json).unwrap(); + assert_eq!(cp.schema_version, 0); + } +} diff --git a/core/src/orchestration/combinators.rs b/core/src/orchestration/combinators.rs index af5a087..a9beb43 100644 --- a/core/src/orchestration/combinators.rs +++ b/core/src/orchestration/combinators.rs @@ -5,12 +5,22 @@ //! one genuinely new scheduling shape, where each item flows through a chain //! of stages independently — no barrier between stages. +use super::checkpoint::WorkflowCheckpoint; use super::executor::{AgentExecutor, AgentStepSpec, StepOutcome}; use crate::agent::AgentEvent; use crate::ordered_parallel::run_ordered_parallel_with_limit; +use crate::store::SessionStore; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; +fn now_epoch_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + /// A pipeline stage: given the previous stage's outcome (`None` before the /// first stage) and the original item, produce the next step to run — or /// `None` to stop this item's chain early. @@ -75,6 +85,123 @@ where .collect() } +/// Like [`execute_steps_parallel`](super::execute_steps_parallel), but +/// **resumable**: progress is journaled to `store` under `workflow_id`, so an +/// interrupted run picks up from the last completed step. +/// +/// On entry, any steps already recorded in a prior checkpoint are skipped and +/// their cached outcomes reused; only the rest are dispatched. As each step +/// completes, the checkpoint is rewritten (the step boundary), so a crash +/// mid-run loses at most the in-flight steps. Because the checkpoint is +/// serializable and the executor is a parameter, a host can resume an +/// interrupted workflow on a *different* node by passing that node's executor. +/// +/// Results are returned in the original `specs` order. On full success the +/// checkpoint is deleted (the workflow is terminal); only a crash leaves one +/// behind for resume. +pub async fn execute_steps_parallel_resumable( + executor: Arc, + specs: Vec, + workflow_id: &str, + store: Arc, + event_tx: Option>, +) -> Vec { + // Prior progress (the store's load rejects a future-version checkpoint). + let done: HashMap = match store.load_workflow_checkpoint(workflow_id).await + { + Ok(Some(cp)) => cp.completed(), + _ => HashMap::new(), + }; + + let pending: Vec = specs + .iter() + .filter(|s| !done.contains_key(&s.task_id)) + .cloned() + .collect(); + let labels: Vec<(String, String)> = pending + .iter() + .map(|s| (s.task_id.clone(), s.agent.clone())) + .collect(); + + // Accumulator seeded with prior progress; persisted at every step boundary. + let acc = Arc::new(tokio::sync::Mutex::new(done.clone())); + let limit = executor.concurrency_hint(); + let workflow_id_owned = workflow_id.to_string(); + let store_steps = Arc::clone(&store); + + let results = run_ordered_parallel_with_limit(pending, limit, move |_idx, spec| { + let executor = Arc::clone(&executor); + let event_tx = event_tx.clone(); + let acc = Arc::clone(&acc); + let store = Arc::clone(&store_steps); + let workflow_id = workflow_id_owned.clone(); + async move { + let outcome = executor.execute_step(spec, event_tx).await; + // Step boundary: record only *successful* steps, so a failed step + // is retried on resume (its effect didn't complete) while a + // succeeded step's work is never redone. + if outcome.success { + let mut guard = acc.lock().await; + guard.insert(outcome.task_id.clone(), outcome.clone()); + let checkpoint = + WorkflowCheckpoint::from_completed(&workflow_id, &guard, now_epoch_ms()); + if let Err(e) = store + .save_workflow_checkpoint(&workflow_id, &checkpoint) + .await + { + // Losing a checkpoint must not fail the live run. + tracing::warn!( + workflow_id = %workflow_id, + error = %e, + "workflow checkpoint save failed; run continues" + ); + } + } + outcome + } + }) + .await; + + let mut fresh: HashMap = HashMap::new(); + for result in results { + match result.output { + Ok(outcome) => { + fresh.insert(outcome.task_id.clone(), outcome); + } + Err(error) => { + if let Some((task_id, agent)) = labels.get(result.index).cloned() { + fresh.insert( + task_id.clone(), + StepOutcome::failed(task_id, agent, error.to_string()), + ); + } + } + } + } + + // Merge cached + freshly-run, in the original spec order. + let merged: Vec = specs + .iter() + .map(|s| { + done.get(&s.task_id) + .cloned() + .or_else(|| fresh.remove(&s.task_id)) + .unwrap_or_else(|| { + StepOutcome::failed( + s.task_id.clone(), + s.agent.clone(), + "step produced no outcome", + ) + }) + }) + .collect(); + + if merged.iter().all(|o| o.success) { + let _ = store.delete_workflow_checkpoint(workflow_id).await; + } + merged +} + #[cfg(test)] mod tests { use super::*; @@ -237,4 +364,126 @@ mod tests { assert!(out[1].is_none(), "panicked chain becomes None, not a drop"); assert!(out[2].as_ref().unwrap().success, "later chains unaffected"); } + + /// Records which task ids it actually ran; always succeeds. + struct RecordingExecutor { + ran: Arc>>, + } + + #[async_trait] + impl AgentExecutor for RecordingExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + self.ran.lock().await.push(spec.task_id.clone()); + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: format!("ran:{}", spec.task_id), + success: true, + structured: None, + } + } + fn concurrency_hint(&self) -> usize { + 4 + } + } + + #[tokio::test] + async fn resumable_skips_completed_then_clears_on_success() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + + // Pre-seed: step "a" already completed on a prior run (possibly on + // another node — this exercises the migration path too). + let mut done = std::collections::HashMap::new(); + done.insert( + "a".to_string(), + StepOutcome { + task_id: "a".into(), + session_id: "task-run-a".into(), + agent: "explore".into(), + output: "cached-a".into(), + success: true, + structured: None, + }, + ); + store + .save_workflow_checkpoint( + "wf-1", + &WorkflowCheckpoint::from_completed("wf-1", &done, 1), + ) + .await + .unwrap(); + + // A FRESH executor resumes (the node that runs the rest is not the one + // that completed "a"). + let ran = Arc::new(tokio::sync::Mutex::new(Vec::new())); + let exec: Arc = Arc::new(RecordingExecutor { + ran: Arc::clone(&ran), + }); + let specs = vec![ + AgentStepSpec::new("a", "explore", "d", "pa"), + AgentStepSpec::new("b", "review", "d", "pb"), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "wf-1", Arc::clone(&store), None).await; + + assert_eq!( + *ran.lock().await, + vec!["b".to_string()], + "only the not-yet-completed step runs" + ); + assert_eq!(out.len(), 2); + assert_eq!(out[0].task_id, "a"); + assert_eq!( + out[0].output, "cached-a", + "completed step returns its cached outcome, unchanged" + ); + assert_eq!(out[1].task_id, "b"); + assert!(out.iter().all(|o| o.success)); + assert!( + store + .load_workflow_checkpoint("wf-1") + .await + .unwrap() + .is_none(), + "a fully-succeeded workflow clears its checkpoint" + ); + } + + #[tokio::test] + async fn resumable_retains_checkpoint_recording_only_successes_on_partial_failure() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + // EchoExecutor fails the agent named "fail". + let exec: Arc = Arc::new(EchoExecutor::new()); + let specs = vec![ + AgentStepSpec::new("ok", "explore", "d", "p"), + AgentStepSpec::new("bad", "fail", "d", "p"), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "wf-2", Arc::clone(&store), None).await; + assert!(out[0].success); + assert!(!out[1].success); + + // Not all succeeded → checkpoint retained, recording only the success + // so the failed step retries on resume. + let cp = store + .load_workflow_checkpoint("wf-2") + .await + .unwrap() + .expect("checkpoint retained on partial failure"); + let completed = cp.completed(); + assert!(completed.contains_key("ok"), "succeeded step is recorded"); + assert!( + !completed.contains_key("bad"), + "failed step is NOT recorded → it retries on resume" + ); + } } diff --git a/core/src/orchestration/mod.rs b/core/src/orchestration/mod.rs index 04c778c..a7c2c21 100644 --- a/core/src/orchestration/mod.rs +++ b/core/src/orchestration/mod.rs @@ -21,8 +21,10 @@ //! //! The in-box implementation is [`crate::tools::TaskExecutor`]. +mod checkpoint; mod combinators; mod executor; -pub use combinators::{execute_pipeline, PipelineStage}; +pub use checkpoint::{WorkflowCheckpoint, WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION}; +pub use combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage}; pub use executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; diff --git a/core/src/store/file_store.rs b/core/src/store/file_store.rs index 1ea5c89..3015ac0 100644 --- a/core/src/store/file_store.rs +++ b/core/src/store/file_store.rs @@ -1,5 +1,6 @@ use super::{SessionData, SessionStore}; use crate::loop_checkpoint::LoopCheckpoint; +use crate::orchestration::WorkflowCheckpoint; use crate::run::RunRecord; use crate::subagent_task_tracker::SubagentTaskSnapshot; use crate::tools::ArtifactStore; @@ -81,6 +82,12 @@ impl FileSessionStore { .join("loop_checkpoints") .join(format!("{}.json", safe_session_id(run_id))) } + + fn workflow_checkpoint_path(&self, workflow_id: &str) -> PathBuf { + self.dir + .join("workflow_checkpoints") + .join(format!("{}.json", safe_session_id(workflow_id))) + } } fn safe_session_id(id: &str) -> String { @@ -489,6 +496,92 @@ impl SessionStore for FileSessionStore { Ok(()) } + async fn save_workflow_checkpoint( + &self, + workflow_id: &str, + checkpoint: &WorkflowCheckpoint, + ) -> Result<()> { + let path = self.workflow_checkpoint_path(workflow_id); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await.with_context(|| { + format!( + "Failed to create workflow checkpoint directory: {}", + parent.display() + ) + })?; + } + let json = serde_json::to_string_pretty(checkpoint).with_context(|| { + format!("Failed to serialize workflow checkpoint for {workflow_id}") + })?; + + // Crash-atomic write (temp + fsync + rename) — same rationale as + // loop checkpoints: a truncated checkpoint would fail to resume. + let unique_suffix = format!( + "{}.{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0), + std::process::id() + ); + let temp_path = path.with_extension(format!("json.{}.tmp", unique_suffix)); + let mut file = fs::File::create(&temp_path).await.with_context(|| { + format!( + "Failed to create workflow checkpoint temp file: {}", + temp_path.display() + ) + })?; + file.write_all(json.as_bytes()) + .await + .with_context(|| format!("Failed to write workflow checkpoint for {workflow_id}"))?; + file.sync_all() + .await + .with_context(|| format!("Failed to fsync workflow checkpoint for {workflow_id}"))?; + fs::rename(&temp_path, &path).await.with_context(|| { + format!( + "Failed to rename workflow checkpoint into place: {}", + path.display() + ) + })?; + Ok(()) + } + + async fn load_workflow_checkpoint( + &self, + workflow_id: &str, + ) -> Result> { + let path = self.workflow_checkpoint_path(workflow_id); + if !path.exists() { + return Ok(None); + } + let json = fs::read_to_string(&path).await.with_context(|| { + format!("Failed to read workflow checkpoint from {}", path.display()) + })?; + let checkpoint: WorkflowCheckpoint = serde_json::from_str(&json).with_context(|| { + format!( + "Failed to parse workflow checkpoint from {}", + path.display() + ) + })?; + // Refuse a future, incompatible schema version (see ensure_loadable). + checkpoint.ensure_loadable()?; + Ok(Some(checkpoint)) + } + + async fn delete_workflow_checkpoint(&self, workflow_id: &str) -> Result<()> { + let path = self.workflow_checkpoint_path(workflow_id); + if path.exists() { + fs::remove_file(&path).await.with_context(|| { + format!( + "Failed to delete workflow checkpoint for {}: {}", + workflow_id, + path.display() + ) + })?; + } + Ok(()) + } + async fn health_check(&self) -> Result<()> { // Verify directory exists and is writable let probe = self.dir.join(".health_check"); diff --git a/core/src/store/memory_store.rs b/core/src/store/memory_store.rs index 5aeb8d8..627b995 100644 --- a/core/src/store/memory_store.rs +++ b/core/src/store/memory_store.rs @@ -1,5 +1,6 @@ use super::{SessionData, SessionStore}; use crate::loop_checkpoint::LoopCheckpoint; +use crate::orchestration::WorkflowCheckpoint; use crate::run::RunRecord; use crate::subagent_task_tracker::SubagentTaskSnapshot; use crate::tools::ArtifactStore; @@ -21,6 +22,7 @@ pub struct MemorySessionStore { verification_reports: tokio::sync::RwLock>>, subagent_tasks: tokio::sync::RwLock>>, loop_checkpoints: tokio::sync::RwLock>, + workflow_checkpoints: tokio::sync::RwLock>, } impl MemorySessionStore { @@ -33,6 +35,7 @@ impl MemorySessionStore { verification_reports: tokio::sync::RwLock::new(HashMap::new()), subagent_tasks: tokio::sync::RwLock::new(HashMap::new()), loop_checkpoints: tokio::sync::RwLock::new(HashMap::new()), + workflow_checkpoints: tokio::sync::RwLock::new(HashMap::new()), } } } @@ -170,6 +173,42 @@ impl SessionStore for MemorySessionStore { Ok(()) } + async fn save_workflow_checkpoint( + &self, + workflow_id: &str, + checkpoint: &WorkflowCheckpoint, + ) -> Result<()> { + self.workflow_checkpoints + .write() + .await + .insert(workflow_id.to_string(), checkpoint.clone()); + Ok(()) + } + + async fn load_workflow_checkpoint( + &self, + workflow_id: &str, + ) -> Result> { + match self + .workflow_checkpoints + .read() + .await + .get(workflow_id) + .cloned() + { + Some(cp) => { + cp.ensure_loadable()?; + Ok(Some(cp)) + } + None => Ok(None), + } + } + + async fn delete_workflow_checkpoint(&self, workflow_id: &str) -> Result<()> { + self.workflow_checkpoints.write().await.remove(workflow_id); + Ok(()) + } + fn backend_name(&self) -> &str { "memory" } diff --git a/core/src/store/mod.rs b/core/src/store/mod.rs index 553ae9d..e80a76e 100644 --- a/core/src/store/mod.rs +++ b/core/src/store/mod.rs @@ -170,6 +170,34 @@ pub trait SessionStore: Send + Sync { Ok(()) } + /// Persist a workflow checkpoint, overwriting any earlier one for the same + /// `workflow_id`. The resumable orchestration combinators call this at each + /// step boundary so an interrupted workflow resumes from the last + /// completed step (here or, after migration, on another node). + async fn save_workflow_checkpoint( + &self, + _workflow_id: &str, + _checkpoint: &crate::orchestration::WorkflowCheckpoint, + ) -> Result<()> { + Ok(()) + } + + /// Load the latest workflow checkpoint for `workflow_id`. + async fn load_workflow_checkpoint( + &self, + _workflow_id: &str, + ) -> Result> { + Ok(None) + } + + /// Delete the workflow checkpoint for `workflow_id`, if present. Called + /// when a workflow reaches a terminal state in-process; only a crash should + /// leave one behind for resume. Deleting a non-existent checkpoint is a + /// no-op success. + async fn delete_workflow_checkpoint(&self, _workflow_id: &str) -> Result<()> { + Ok(()) + } + /// Health check — verify the store backend is reachable and operational async fn health_check(&self) -> Result<()> { Ok(()) diff --git a/core/tests/test_orchestration_real_llm.rs b/core/tests/test_orchestration_real_llm.rs index 0d2c8b8..bcde5f8 100644 --- a/core/tests/test_orchestration_real_llm.rs +++ b/core/tests/test_orchestration_real_llm.rs @@ -13,8 +13,10 @@ use std::sync::Arc; use a3s_code_core::config::CodeConfig; use a3s_code_core::llm::create_client_with_config; use a3s_code_core::orchestration::{ - execute_pipeline, AgentExecutor, AgentStepSpec, PipelineStage, StepOutcome, + execute_pipeline, execute_steps_parallel_resumable, AgentExecutor, AgentStepSpec, + PipelineStage, StepOutcome, }; +use a3s_code_core::store::{MemorySessionStore, SessionStore}; use a3s_code_core::subagent::AgentRegistry; use a3s_code_core::tools::TaskExecutor; @@ -149,3 +151,36 @@ async fn real_pipeline_chains_two_agent_stages() { "stage 2 produced output derived from stage 1" ); } + +/// Phase 4: a resumable workflow runs real steps, journals progress to a +/// store, and clears its checkpoint on full success. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_resumable_workflow_runs_and_clears_checkpoint() { + let (executor, _workspace) = local_executor(); + let exec: Arc = Arc::new(executor); + let store: Arc = Arc::new(MemorySessionStore::new()); + + let specs = vec![ + AgentStepSpec::new("rw-1", "general", "q1", "Reply with one word: ready").with_max_steps(2), + AgentStepSpec::new("rw-2", "general", "q2", "Reply with one word: go").with_max_steps(2), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "real-wf", Arc::clone(&store), None).await; + + assert_eq!(out.len(), 2); + assert!( + out.iter().all(|o| o.success), + "both steps succeed: {:?}", + out.iter().map(|o| &o.output).collect::>() + ); + assert!( + store + .load_workflow_checkpoint("real-wf") + .await + .unwrap() + .is_none(), + "a fully-succeeded workflow clears its checkpoint" + ); +}