diff --git a/core/src/lib.rs b/core/src/lib.rs index 1a99c1d..3ee8975 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -136,8 +136,9 @@ pub use llm::{ Message, OpenAiClient, TokenUsage, }; pub use orchestration::{ - execute_pipeline, execute_steps_parallel, AgentExecutor, AgentStepSpec, PipelineStage, - StepOutcome, + execute_pipeline, execute_steps_parallel, execute_steps_parallel_resumable, AgentExecutor, + AgentStepSpec, PipelineStage, StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, + WORKFLOW_CHECKPOINT_SCHEMA_VERSION, }; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; pub use run::{ diff --git a/core/src/orchestration/checkpoint.rs b/core/src/orchestration/checkpoint.rs new file mode 100644 index 0000000..0013f51 --- /dev/null +++ b/core/src/orchestration/checkpoint.rs @@ -0,0 +1,139 @@ +//! Workflow-level checkpoints: journal completed steps so an interrupted +//! orchestration resumes from the longest completed prefix — on this node or, +//! because the checkpoint is serializable and the executor is pluggable, on +//! another one (host-driven migration). +//! +//! This is the step-boundary analogue of [`LoopCheckpoint`](crate::loop_checkpoint::LoopCheckpoint), +//! which checkpoints at tool-round boundaries one level down. + +use super::executor::StepOutcome; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Schema version. Bumped on incompatible format changes; loads from a future +/// version are rejected (see [`WorkflowCheckpoint::ensure_loadable`]). +pub const WORKFLOW_CHECKPOINT_SCHEMA_VERSION: u32 = 1; + +/// One completed step within a workflow. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct WorkflowStepRecord { + /// Matches the [`AgentStepSpec::task_id`](super::AgentStepSpec) of the + /// step that produced this outcome. + pub task_id: String, + /// The completed step's result. + pub outcome: StepOutcome, +} + +/// Snapshot of a workflow's completed steps at a step boundary. +/// +/// (`StepOutcome` contains a `serde_json::Value`, which is not `Eq`, so this +/// derives `PartialEq` only.) +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct WorkflowCheckpoint { + /// Schema version — see [`WORKFLOW_CHECKPOINT_SCHEMA_VERSION`]. + #[serde(default)] + pub schema_version: u32, + /// Logical workflow identifier the checkpoint is keyed by. + pub workflow_id: String, + /// The steps completed so far. A resuming run skips these and re-dispatches + /// only the rest. + pub steps: Vec, + /// Wall-clock timestamp when the checkpoint was written (Unix epoch ms). + pub checkpoint_ms: u64, +} + +impl WorkflowCheckpoint { + /// Build a checkpoint from a map of completed `task_id -> outcome`. + pub fn from_completed( + workflow_id: impl Into, + completed: &HashMap, + checkpoint_ms: u64, + ) -> Self { + let steps = completed + .iter() + .map(|(task_id, outcome)| WorkflowStepRecord { + task_id: task_id.clone(), + outcome: outcome.clone(), + }) + .collect(); + Self { + schema_version: WORKFLOW_CHECKPOINT_SCHEMA_VERSION, + workflow_id: workflow_id.into(), + steps, + checkpoint_ms, + } + } + + /// The completed steps as a `task_id -> outcome` map. + pub fn completed(&self) -> HashMap { + self.steps + .iter() + .map(|r| (r.task_id.clone(), r.outcome.clone())) + .collect() + } + + /// Reject a checkpoint written by a *newer*, incompatible schema version + /// than this build understands — mirrors + /// [`LoopCheckpoint::ensure_loadable`](crate::loop_checkpoint::LoopCheckpoint::ensure_loadable). + /// Field additions are absorbed by `#[serde(default)]`, so older (incl. + /// pre-v1 `0`) checkpoints always remain loadable. + pub fn ensure_loadable(&self) -> anyhow::Result<()> { + if self.schema_version > WORKFLOW_CHECKPOINT_SCHEMA_VERSION { + anyhow::bail!( + "workflow checkpoint {} has schema version {} but this build supports at \ + most {}; refusing to resume from an incompatible future checkpoint", + self.workflow_id, + self.schema_version, + WORKFLOW_CHECKPOINT_SCHEMA_VERSION + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn outcome(id: &str) -> StepOutcome { + StepOutcome { + task_id: id.to_string(), + session_id: format!("task-run-{id}"), + agent: "a".to_string(), + output: "o".to_string(), + success: true, + structured: None, + } + } + + #[test] + fn round_trips_and_exposes_completed_map() { + let mut completed = HashMap::new(); + completed.insert("t1".to_string(), outcome("t1")); + let cp = WorkflowCheckpoint::from_completed("wf", &completed, 123); + let back: WorkflowCheckpoint = + serde_json::from_str(&serde_json::to_string(&cp).unwrap()).unwrap(); + assert_eq!(back, cp); + assert_eq!(back.schema_version, WORKFLOW_CHECKPOINT_SCHEMA_VERSION); + assert_eq!(back.checkpoint_ms, 123); + assert_eq!(back.completed().get("t1").unwrap().task_id, "t1"); + } + + #[test] + fn ensure_loadable_rejects_only_future_versions() { + let mut cp = WorkflowCheckpoint::from_completed("wf", &HashMap::new(), 0); + cp.ensure_loadable().expect("current version loadable"); + cp.schema_version = 0; + cp.ensure_loadable().expect("pre-v1 loadable"); + cp.schema_version = WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1; + let err = cp.ensure_loadable().unwrap_err(); + assert!(err.to_string().contains("schema version"), "got: {err}"); + } + + #[test] + fn pre_v1_payload_without_schema_version_loads() { + let json = r#"{"workflow_id":"wf","steps":[],"checkpoint_ms":0}"#; + let cp: WorkflowCheckpoint = serde_json::from_str(json).unwrap(); + assert_eq!(cp.schema_version, 0); + } +} diff --git a/core/src/orchestration/combinators.rs b/core/src/orchestration/combinators.rs index af5a087..a9beb43 100644 --- a/core/src/orchestration/combinators.rs +++ b/core/src/orchestration/combinators.rs @@ -5,12 +5,22 @@ //! one genuinely new scheduling shape, where each item flows through a chain //! of stages independently — no barrier between stages. +use super::checkpoint::WorkflowCheckpoint; use super::executor::{AgentExecutor, AgentStepSpec, StepOutcome}; use crate::agent::AgentEvent; use crate::ordered_parallel::run_ordered_parallel_with_limit; +use crate::store::SessionStore; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; +fn now_epoch_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + /// A pipeline stage: given the previous stage's outcome (`None` before the /// first stage) and the original item, produce the next step to run — or /// `None` to stop this item's chain early. @@ -75,6 +85,123 @@ where .collect() } +/// Like [`execute_steps_parallel`](super::execute_steps_parallel), but +/// **resumable**: progress is journaled to `store` under `workflow_id`, so an +/// interrupted run picks up from the last completed step. +/// +/// On entry, any steps already recorded in a prior checkpoint are skipped and +/// their cached outcomes reused; only the rest are dispatched. As each step +/// completes, the checkpoint is rewritten (the step boundary), so a crash +/// mid-run loses at most the in-flight steps. Because the checkpoint is +/// serializable and the executor is a parameter, a host can resume an +/// interrupted workflow on a *different* node by passing that node's executor. +/// +/// Results are returned in the original `specs` order. On full success the +/// checkpoint is deleted (the workflow is terminal); only a crash leaves one +/// behind for resume. +pub async fn execute_steps_parallel_resumable( + executor: Arc, + specs: Vec, + workflow_id: &str, + store: Arc, + event_tx: Option>, +) -> Vec { + // Prior progress (the store's load rejects a future-version checkpoint). + let done: HashMap = match store.load_workflow_checkpoint(workflow_id).await + { + Ok(Some(cp)) => cp.completed(), + _ => HashMap::new(), + }; + + let pending: Vec = specs + .iter() + .filter(|s| !done.contains_key(&s.task_id)) + .cloned() + .collect(); + let labels: Vec<(String, String)> = pending + .iter() + .map(|s| (s.task_id.clone(), s.agent.clone())) + .collect(); + + // Accumulator seeded with prior progress; persisted at every step boundary. + let acc = Arc::new(tokio::sync::Mutex::new(done.clone())); + let limit = executor.concurrency_hint(); + let workflow_id_owned = workflow_id.to_string(); + let store_steps = Arc::clone(&store); + + let results = run_ordered_parallel_with_limit(pending, limit, move |_idx, spec| { + let executor = Arc::clone(&executor); + let event_tx = event_tx.clone(); + let acc = Arc::clone(&acc); + let store = Arc::clone(&store_steps); + let workflow_id = workflow_id_owned.clone(); + async move { + let outcome = executor.execute_step(spec, event_tx).await; + // Step boundary: record only *successful* steps, so a failed step + // is retried on resume (its effect didn't complete) while a + // succeeded step's work is never redone. + if outcome.success { + let mut guard = acc.lock().await; + guard.insert(outcome.task_id.clone(), outcome.clone()); + let checkpoint = + WorkflowCheckpoint::from_completed(&workflow_id, &guard, now_epoch_ms()); + if let Err(e) = store + .save_workflow_checkpoint(&workflow_id, &checkpoint) + .await + { + // Losing a checkpoint must not fail the live run. + tracing::warn!( + workflow_id = %workflow_id, + error = %e, + "workflow checkpoint save failed; run continues" + ); + } + } + outcome + } + }) + .await; + + let mut fresh: HashMap = HashMap::new(); + for result in results { + match result.output { + Ok(outcome) => { + fresh.insert(outcome.task_id.clone(), outcome); + } + Err(error) => { + if let Some((task_id, agent)) = labels.get(result.index).cloned() { + fresh.insert( + task_id.clone(), + StepOutcome::failed(task_id, agent, error.to_string()), + ); + } + } + } + } + + // Merge cached + freshly-run, in the original spec order. + let merged: Vec = specs + .iter() + .map(|s| { + done.get(&s.task_id) + .cloned() + .or_else(|| fresh.remove(&s.task_id)) + .unwrap_or_else(|| { + StepOutcome::failed( + s.task_id.clone(), + s.agent.clone(), + "step produced no outcome", + ) + }) + }) + .collect(); + + if merged.iter().all(|o| o.success) { + let _ = store.delete_workflow_checkpoint(workflow_id).await; + } + merged +} + #[cfg(test)] mod tests { use super::*; @@ -237,4 +364,126 @@ mod tests { assert!(out[1].is_none(), "panicked chain becomes None, not a drop"); assert!(out[2].as_ref().unwrap().success, "later chains unaffected"); } + + /// Records which task ids it actually ran; always succeeds. + struct RecordingExecutor { + ran: Arc>>, + } + + #[async_trait] + impl AgentExecutor for RecordingExecutor { + async fn execute_step( + &self, + spec: AgentStepSpec, + _event_tx: Option>, + ) -> StepOutcome { + self.ran.lock().await.push(spec.task_id.clone()); + StepOutcome { + task_id: spec.task_id.clone(), + session_id: format!("task-run-{}", spec.task_id), + agent: spec.agent.clone(), + output: format!("ran:{}", spec.task_id), + success: true, + structured: None, + } + } + fn concurrency_hint(&self) -> usize { + 4 + } + } + + #[tokio::test] + async fn resumable_skips_completed_then_clears_on_success() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + + // Pre-seed: step "a" already completed on a prior run (possibly on + // another node — this exercises the migration path too). + let mut done = std::collections::HashMap::new(); + done.insert( + "a".to_string(), + StepOutcome { + task_id: "a".into(), + session_id: "task-run-a".into(), + agent: "explore".into(), + output: "cached-a".into(), + success: true, + structured: None, + }, + ); + store + .save_workflow_checkpoint( + "wf-1", + &WorkflowCheckpoint::from_completed("wf-1", &done, 1), + ) + .await + .unwrap(); + + // A FRESH executor resumes (the node that runs the rest is not the one + // that completed "a"). + let ran = Arc::new(tokio::sync::Mutex::new(Vec::new())); + let exec: Arc = Arc::new(RecordingExecutor { + ran: Arc::clone(&ran), + }); + let specs = vec![ + AgentStepSpec::new("a", "explore", "d", "pa"), + AgentStepSpec::new("b", "review", "d", "pb"), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "wf-1", Arc::clone(&store), None).await; + + assert_eq!( + *ran.lock().await, + vec!["b".to_string()], + "only the not-yet-completed step runs" + ); + assert_eq!(out.len(), 2); + assert_eq!(out[0].task_id, "a"); + assert_eq!( + out[0].output, "cached-a", + "completed step returns its cached outcome, unchanged" + ); + assert_eq!(out[1].task_id, "b"); + assert!(out.iter().all(|o| o.success)); + assert!( + store + .load_workflow_checkpoint("wf-1") + .await + .unwrap() + .is_none(), + "a fully-succeeded workflow clears its checkpoint" + ); + } + + #[tokio::test] + async fn resumable_retains_checkpoint_recording_only_successes_on_partial_failure() { + use crate::store::MemorySessionStore; + let store: Arc = Arc::new(MemorySessionStore::new()); + // EchoExecutor fails the agent named "fail". + let exec: Arc = Arc::new(EchoExecutor::new()); + let specs = vec![ + AgentStepSpec::new("ok", "explore", "d", "p"), + AgentStepSpec::new("bad", "fail", "d", "p"), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "wf-2", Arc::clone(&store), None).await; + assert!(out[0].success); + assert!(!out[1].success); + + // Not all succeeded → checkpoint retained, recording only the success + // so the failed step retries on resume. + let cp = store + .load_workflow_checkpoint("wf-2") + .await + .unwrap() + .expect("checkpoint retained on partial failure"); + let completed = cp.completed(); + assert!(completed.contains_key("ok"), "succeeded step is recorded"); + assert!( + !completed.contains_key("bad"), + "failed step is NOT recorded → it retries on resume" + ); + } } diff --git a/core/src/orchestration/mod.rs b/core/src/orchestration/mod.rs index 04c778c..a7c2c21 100644 --- a/core/src/orchestration/mod.rs +++ b/core/src/orchestration/mod.rs @@ -21,8 +21,10 @@ //! //! The in-box implementation is [`crate::tools::TaskExecutor`]. +mod checkpoint; mod combinators; mod executor; -pub use combinators::{execute_pipeline, PipelineStage}; +pub use checkpoint::{WorkflowCheckpoint, WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION}; +pub use combinators::{execute_pipeline, execute_steps_parallel_resumable, PipelineStage}; pub use executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome}; diff --git a/core/src/store/file_store.rs b/core/src/store/file_store.rs index 1ea5c89..3015ac0 100644 --- a/core/src/store/file_store.rs +++ b/core/src/store/file_store.rs @@ -1,5 +1,6 @@ use super::{SessionData, SessionStore}; use crate::loop_checkpoint::LoopCheckpoint; +use crate::orchestration::WorkflowCheckpoint; use crate::run::RunRecord; use crate::subagent_task_tracker::SubagentTaskSnapshot; use crate::tools::ArtifactStore; @@ -81,6 +82,12 @@ impl FileSessionStore { .join("loop_checkpoints") .join(format!("{}.json", safe_session_id(run_id))) } + + fn workflow_checkpoint_path(&self, workflow_id: &str) -> PathBuf { + self.dir + .join("workflow_checkpoints") + .join(format!("{}.json", safe_session_id(workflow_id))) + } } fn safe_session_id(id: &str) -> String { @@ -489,6 +496,92 @@ impl SessionStore for FileSessionStore { Ok(()) } + async fn save_workflow_checkpoint( + &self, + workflow_id: &str, + checkpoint: &WorkflowCheckpoint, + ) -> Result<()> { + let path = self.workflow_checkpoint_path(workflow_id); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await.with_context(|| { + format!( + "Failed to create workflow checkpoint directory: {}", + parent.display() + ) + })?; + } + let json = serde_json::to_string_pretty(checkpoint).with_context(|| { + format!("Failed to serialize workflow checkpoint for {workflow_id}") + })?; + + // Crash-atomic write (temp + fsync + rename) — same rationale as + // loop checkpoints: a truncated checkpoint would fail to resume. + let unique_suffix = format!( + "{}.{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0), + std::process::id() + ); + let temp_path = path.with_extension(format!("json.{}.tmp", unique_suffix)); + let mut file = fs::File::create(&temp_path).await.with_context(|| { + format!( + "Failed to create workflow checkpoint temp file: {}", + temp_path.display() + ) + })?; + file.write_all(json.as_bytes()) + .await + .with_context(|| format!("Failed to write workflow checkpoint for {workflow_id}"))?; + file.sync_all() + .await + .with_context(|| format!("Failed to fsync workflow checkpoint for {workflow_id}"))?; + fs::rename(&temp_path, &path).await.with_context(|| { + format!( + "Failed to rename workflow checkpoint into place: {}", + path.display() + ) + })?; + Ok(()) + } + + async fn load_workflow_checkpoint( + &self, + workflow_id: &str, + ) -> Result> { + let path = self.workflow_checkpoint_path(workflow_id); + if !path.exists() { + return Ok(None); + } + let json = fs::read_to_string(&path).await.with_context(|| { + format!("Failed to read workflow checkpoint from {}", path.display()) + })?; + let checkpoint: WorkflowCheckpoint = serde_json::from_str(&json).with_context(|| { + format!( + "Failed to parse workflow checkpoint from {}", + path.display() + ) + })?; + // Refuse a future, incompatible schema version (see ensure_loadable). + checkpoint.ensure_loadable()?; + Ok(Some(checkpoint)) + } + + async fn delete_workflow_checkpoint(&self, workflow_id: &str) -> Result<()> { + let path = self.workflow_checkpoint_path(workflow_id); + if path.exists() { + fs::remove_file(&path).await.with_context(|| { + format!( + "Failed to delete workflow checkpoint for {}: {}", + workflow_id, + path.display() + ) + })?; + } + Ok(()) + } + async fn health_check(&self) -> Result<()> { // Verify directory exists and is writable let probe = self.dir.join(".health_check"); diff --git a/core/src/store/memory_store.rs b/core/src/store/memory_store.rs index 5aeb8d8..627b995 100644 --- a/core/src/store/memory_store.rs +++ b/core/src/store/memory_store.rs @@ -1,5 +1,6 @@ use super::{SessionData, SessionStore}; use crate::loop_checkpoint::LoopCheckpoint; +use crate::orchestration::WorkflowCheckpoint; use crate::run::RunRecord; use crate::subagent_task_tracker::SubagentTaskSnapshot; use crate::tools::ArtifactStore; @@ -21,6 +22,7 @@ pub struct MemorySessionStore { verification_reports: tokio::sync::RwLock>>, subagent_tasks: tokio::sync::RwLock>>, loop_checkpoints: tokio::sync::RwLock>, + workflow_checkpoints: tokio::sync::RwLock>, } impl MemorySessionStore { @@ -33,6 +35,7 @@ impl MemorySessionStore { verification_reports: tokio::sync::RwLock::new(HashMap::new()), subagent_tasks: tokio::sync::RwLock::new(HashMap::new()), loop_checkpoints: tokio::sync::RwLock::new(HashMap::new()), + workflow_checkpoints: tokio::sync::RwLock::new(HashMap::new()), } } } @@ -170,6 +173,42 @@ impl SessionStore for MemorySessionStore { Ok(()) } + async fn save_workflow_checkpoint( + &self, + workflow_id: &str, + checkpoint: &WorkflowCheckpoint, + ) -> Result<()> { + self.workflow_checkpoints + .write() + .await + .insert(workflow_id.to_string(), checkpoint.clone()); + Ok(()) + } + + async fn load_workflow_checkpoint( + &self, + workflow_id: &str, + ) -> Result> { + match self + .workflow_checkpoints + .read() + .await + .get(workflow_id) + .cloned() + { + Some(cp) => { + cp.ensure_loadable()?; + Ok(Some(cp)) + } + None => Ok(None), + } + } + + async fn delete_workflow_checkpoint(&self, workflow_id: &str) -> Result<()> { + self.workflow_checkpoints.write().await.remove(workflow_id); + Ok(()) + } + fn backend_name(&self) -> &str { "memory" } diff --git a/core/src/store/mod.rs b/core/src/store/mod.rs index 553ae9d..e80a76e 100644 --- a/core/src/store/mod.rs +++ b/core/src/store/mod.rs @@ -170,6 +170,34 @@ pub trait SessionStore: Send + Sync { Ok(()) } + /// Persist a workflow checkpoint, overwriting any earlier one for the same + /// `workflow_id`. The resumable orchestration combinators call this at each + /// step boundary so an interrupted workflow resumes from the last + /// completed step (here or, after migration, on another node). + async fn save_workflow_checkpoint( + &self, + _workflow_id: &str, + _checkpoint: &crate::orchestration::WorkflowCheckpoint, + ) -> Result<()> { + Ok(()) + } + + /// Load the latest workflow checkpoint for `workflow_id`. + async fn load_workflow_checkpoint( + &self, + _workflow_id: &str, + ) -> Result> { + Ok(None) + } + + /// Delete the workflow checkpoint for `workflow_id`, if present. Called + /// when a workflow reaches a terminal state in-process; only a crash should + /// leave one behind for resume. Deleting a non-existent checkpoint is a + /// no-op success. + async fn delete_workflow_checkpoint(&self, _workflow_id: &str) -> Result<()> { + Ok(()) + } + /// Health check — verify the store backend is reachable and operational async fn health_check(&self) -> Result<()> { Ok(()) diff --git a/core/tests/test_orchestration_real_llm.rs b/core/tests/test_orchestration_real_llm.rs index 0d2c8b8..bcde5f8 100644 --- a/core/tests/test_orchestration_real_llm.rs +++ b/core/tests/test_orchestration_real_llm.rs @@ -13,8 +13,10 @@ use std::sync::Arc; use a3s_code_core::config::CodeConfig; use a3s_code_core::llm::create_client_with_config; use a3s_code_core::orchestration::{ - execute_pipeline, AgentExecutor, AgentStepSpec, PipelineStage, StepOutcome, + execute_pipeline, execute_steps_parallel_resumable, AgentExecutor, AgentStepSpec, + PipelineStage, StepOutcome, }; +use a3s_code_core::store::{MemorySessionStore, SessionStore}; use a3s_code_core::subagent::AgentRegistry; use a3s_code_core::tools::TaskExecutor; @@ -149,3 +151,36 @@ async fn real_pipeline_chains_two_agent_stages() { "stage 2 produced output derived from stage 1" ); } + +/// Phase 4: a resumable workflow runs real steps, journals progress to a +/// store, and clears its checkpoint on full success. +#[tokio::test(flavor = "multi_thread")] +#[ignore = "requires real provider credentials and network access"] +async fn real_resumable_workflow_runs_and_clears_checkpoint() { + let (executor, _workspace) = local_executor(); + let exec: Arc = Arc::new(executor); + let store: Arc = Arc::new(MemorySessionStore::new()); + + let specs = vec![ + AgentStepSpec::new("rw-1", "general", "q1", "Reply with one word: ready").with_max_steps(2), + AgentStepSpec::new("rw-2", "general", "q2", "Reply with one word: go").with_max_steps(2), + ]; + + let out = + execute_steps_parallel_resumable(exec, specs, "real-wf", Arc::clone(&store), None).await; + + assert_eq!(out.len(), 2); + assert!( + out.iter().all(|o| o.success), + "both steps succeed: {:?}", + out.iter().map(|o| &o.output).collect::>() + ); + assert!( + store + .load_workflow_checkpoint("real-wf") + .await + .unwrap() + .is_none(), + "a fully-succeeded workflow clears its checkpoint" + ); +}