Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 180 additions & 2 deletions core/src/orchestration/combinators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,23 @@ pub async fn execute_steps_parallel_resumable(
store: Arc<dyn SessionStore>,
event_tx: Option<broadcast::Sender<AgentEvent>>,
) -> Vec<StepOutcome> {
// Prior progress (the store's load rejects a future-version checkpoint).
// Prior progress. An unreadable checkpoint — e.g. one written by a newer,
// incompatible schema version, which the store rejects via
// `ensure_loadable` — is treated as *no* prior progress: the workflow
// re-runs from scratch rather than resuming from state it can't interpret.
// That's a fail-safe (do the work), but surface it rather than swallow it.
let done: HashMap<String, StepOutcome> = match store.load_workflow_checkpoint(workflow_id).await
{
Ok(Some(cp)) => cp.completed(),
_ => HashMap::new(),
Ok(None) => HashMap::new(),
Err(e) => {
tracing::warn!(
workflow_id = %workflow_id,
error = %e,
"workflow checkpoint unreadable; re-running the workflow from scratch"
);
HashMap::new()
}
};

let pending: Vec<AgentStepSpec> = specs
Expand Down Expand Up @@ -486,4 +498,170 @@ mod tests {
"failed step is NOT recorded → it retries on resume"
);
}

/// Test executor that completes every step successfully while advertising a
/// concurrency hint of zero.
struct ZeroHintExecutor;

#[async_trait]
impl AgentExecutor for ZeroHintExecutor {
    /// Returns a successful outcome with the fixed output `"ok"`, carrying the
    /// spec's task id and agent and a session id derived from the task id.
    async fn execute_step(
        &self,
        spec: AgentStepSpec,
        _event_tx: Option<broadcast::Sender<AgentEvent>>,
    ) -> StepOutcome {
        let task_id = spec.task_id.clone();
        let session_id = format!("task-run-{}", spec.task_id);
        let agent = spec.agent.clone();
        StepOutcome {
            task_id,
            session_id,
            agent,
            output: String::from("ok"),
            success: true,
            structured: None,
        }
    }

    /// Always reports a concurrency hint of zero.
    fn concurrency_hint(&self) -> usize {
        0
    }
}

/// Both combinators must hand back an empty result when given no work.
#[tokio::test]
async fn empty_inputs_return_empty() {
    let executor: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());

    // Parallel execution over zero specs produces zero outcomes.
    let parallel_out =
        crate::orchestration::execute_steps_parallel(Arc::clone(&executor), vec![], None).await;
    assert!(parallel_out.is_empty());

    // A pipeline with one stage but zero items produces zero outcomes.
    let stages: Vec<PipelineStage<&str>> =
        vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
            Some(AgentStepSpec::new("s", "explore", "d", *item))
        })];
    let pipeline_out = execute_pipeline(executor, Vec::<&str>::new(), stages, None).await;
    assert!(pipeline_out.is_empty());
}

#[tokio::test]
async fn zero_concurrency_hint_still_makes_progress() {
// The .max(1) clamp in run_ordered_parallel_with_limit keeps a 0-hint
// executor serialized-but-live instead of deadlocking on 0 permits.
let exec: Arc<dyn AgentExecutor> = Arc::new(ZeroHintExecutor);
let specs = vec![
AgentStepSpec::new("a", "explore", "d", "p"),
AgentStepSpec::new("b", "explore", "d", "p"),
AgentStepSpec::new("c", "explore", "d", "p"),
];
let out = crate::orchestration::execute_steps_parallel(exec, specs, None).await;
assert_eq!(
out.iter().map(|o| o.task_id.as_str()).collect::<Vec<_>>(),
vec!["a", "b", "c"]
);
assert!(out.iter().all(|o| o.success));
}

/// When the first stage declines an item (returns `None`), that item's slot in
/// the pipeline output is `None`; other items still run.
#[tokio::test]
async fn pipeline_first_stage_none_yields_none_outcome() {
    let executor: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());

    // The single stage skips the literal item "skip" and builds a spec for
    // anything else.
    let stages: Vec<PipelineStage<&str>> =
        vec![stage(|_p: Option<&StepOutcome>, item: &&str| match *item {
            "skip" => None,
            other => Some(AgentStepSpec::new("s", "explore", "d", other)),
        })];

    let items = vec!["skip", "run"];
    let outcomes = execute_pipeline(executor, items, stages, None).await;

    assert!(
        outcomes[0].is_none(),
        "a first-stage None yields a None outcome (chain never started)"
    );
    let second = outcomes[1].as_ref().unwrap();
    assert!(second.success);
}

/// Builds a successful `StepOutcome` fixture for `task_id`, as if it had been
/// completed by `agent` with the given `output`.
///
/// The session id is `task-run-<task_id>` and no structured payload is set.
fn cached(task_id: &str, agent: &str, output: &str) -> StepOutcome {
    let session_id = format!("task-run-{}", task_id);
    StepOutcome {
        task_id: String::from(task_id),
        session_id,
        agent: String::from(agent),
        output: String::from(output),
        success: true,
        structured: None,
    }
}

/// A checkpoint the store refuses to load must be treated as "no prior
/// progress": every step runs again.
#[tokio::test]
async fn resumable_reruns_all_when_checkpoint_load_errors() {
    use crate::store::MemorySessionStore;

    let workflow_id = "wf-err";
    let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

    // Persist a checkpoint stamped with a *newer*, incompatible schema
    // version. The store rejects it on load, and the resumable combinator has
    // to fall back to re-running everything (fail-safe) instead of panicking
    // or silently resuming from state it cannot read.
    let prior = std::collections::HashMap::from([(
        "a".to_string(),
        cached("a", "explore", "old"),
    )]);
    let mut checkpoint = WorkflowCheckpoint::from_completed(workflow_id, &prior, 1);
    checkpoint.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1;
    store
        .save_workflow_checkpoint(workflow_id, &checkpoint)
        .await
        .unwrap();

    // The recording executor notes which task ids were actually executed.
    let recorder = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let executor: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
        ran: Arc::clone(&recorder),
    });
    let specs = vec![
        AgentStepSpec::new("a", "explore", "d", "pa"),
        AgentStepSpec::new("b", "review", "d", "pb"),
    ];

    let outcomes = execute_steps_parallel_resumable(
        executor,
        specs,
        workflow_id,
        Arc::clone(&store),
        None,
    )
    .await;

    // Execution order is not asserted, so compare the sorted ids.
    let mut ran_ids = recorder.lock().await.clone();
    ran_ids.sort();
    assert_eq!(
        ran_ids,
        vec!["a".to_string(), "b".to_string()],
        "an unreadable (future-version) checkpoint is ignored → all steps re-run"
    );
    assert_eq!(outcomes.len(), 2);
    assert!(outcomes.iter().all(|o| o.success));
}

/// Resuming with a changed spec list reuses cached steps that are still
/// requested, drops cached steps that are not, and runs only the new ones.
#[tokio::test]
async fn resumable_ignores_checkpointed_steps_absent_from_new_specs() {
    use crate::store::MemorySessionStore;

    let workflow_id = "wf-x";
    let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());

    // The earlier run finished {a, b}. The new run drops "a", reorders, and
    // introduces "c": output must follow the NEW specs, "b" comes from the
    // checkpoint, the stale "a" is absent, and only "c" executes.
    let prior = std::collections::HashMap::from([
        ("a".to_string(), cached("a", "explore", "cached-a")),
        ("b".to_string(), cached("b", "review", "cached-b")),
    ]);
    let checkpoint = WorkflowCheckpoint::from_completed(workflow_id, &prior, 1);
    store
        .save_workflow_checkpoint(workflow_id, &checkpoint)
        .await
        .unwrap();

    // The recording executor notes which task ids were actually executed.
    let recorder = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let executor: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
        ran: Arc::clone(&recorder),
    });
    let specs = vec![
        AgentStepSpec::new("b", "review", "d", "pb"),
        AgentStepSpec::new("c", "plan", "d", "pc"),
    ];

    let outcomes = execute_steps_parallel_resumable(
        executor,
        specs,
        workflow_id,
        Arc::clone(&store),
        None,
    )
    .await;

    let ran_ids = recorder.lock().await.clone();
    assert_eq!(
        ran_ids,
        vec!["c".to_string()],
        "cached b reused, stale a dropped, only new c runs"
    );

    assert_eq!(outcomes.len(), 2);
    let (first, second) = (&outcomes[0], &outcomes[1]);
    assert_eq!(first.task_id, "b");
    assert_eq!(first.output, "cached-b");
    assert_eq!(second.task_id, "c");
    assert!(outcomes.iter().all(|o| o.success));
}
}
119 changes: 119 additions & 0 deletions core/src/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,3 +808,122 @@ async fn test_file_store_checkpoint_write_is_atomic_no_temp_leftovers() {
"the final checkpoint file must exist, got: {names:?}"
);
}

/// Builds a current-schema workflow checkpoint fixture for `wf_id` holding two
/// successful steps: "a" (no structured payload) and "b" (structured
/// `{"k": 1}`), with a fixed checkpoint timestamp.
fn sample_workflow_checkpoint(wf_id: &str) -> crate::orchestration::WorkflowCheckpoint {
    use crate::orchestration::{
        StepOutcome, WorkflowCheckpoint, WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION,
    };

    // Successful "explore" outcome whose session id and output derive from `id`.
    let make_outcome = |id: &str, structured| StepOutcome {
        task_id: id.to_owned(),
        session_id: format!("task-run-{}", id),
        agent: String::from("explore"),
        output: format!("out-{}", id),
        success: true,
        structured,
    };

    let step_a = WorkflowStepRecord {
        task_id: "a".into(),
        outcome: make_outcome("a", None),
    };
    let step_b = WorkflowStepRecord {
        task_id: "b".into(),
        outcome: make_outcome("b", Some(serde_json::json!({ "k": 1 }))),
    };

    WorkflowCheckpoint {
        schema_version: WORKFLOW_CHECKPOINT_SCHEMA_VERSION,
        workflow_id: wf_id.to_owned(),
        steps: vec![step_a, step_b],
        checkpoint_ms: 1_700_000_000_000,
    }
}

/// Save → load → delete round trip for a workflow checkpoint in the file store.
#[tokio::test]
async fn test_file_store_workflow_checkpoint_roundtrip() {
    let workflow_id = "wf-1";
    let dir = tempdir().unwrap();
    let store = FileSessionStore::new(dir.path()).await.unwrap();

    let original = sample_workflow_checkpoint(workflow_id);
    store
        .save_workflow_checkpoint(workflow_id, &original)
        .await
        .unwrap();

    // What comes back must equal what was saved, with both steps intact.
    let found = store.load_workflow_checkpoint(workflow_id).await.unwrap();
    let loaded = found.expect("present");
    assert_eq!(loaded, original);
    assert_eq!(loaded.completed().len(), 2);

    // After deletion the checkpoint is gone.
    store.delete_workflow_checkpoint(workflow_id).await.unwrap();
    let after_delete = store.load_workflow_checkpoint(workflow_id).await.unwrap();
    assert!(after_delete.is_none());

    // Deleting again is a no-op rather than an error (idempotent on a missing file).
    store.delete_workflow_checkpoint(workflow_id).await.unwrap();
}

/// The in-memory store accepts a future-version checkpoint on save but refuses
/// to load it, while a current-version checkpoint loads normally.
#[tokio::test]
async fn test_memory_store_rejects_future_workflow_checkpoint() {
    let store = MemorySessionStore::new();

    // One schema version ahead of what this build understands.
    let too_new = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1;
    let mut future = sample_workflow_checkpoint("wf-future");
    future.schema_version = too_new;
    store
        .save_workflow_checkpoint("wf-future", &future)
        .await
        .unwrap();

    let load_result = store.load_workflow_checkpoint("wf-future").await;
    let err = load_result.unwrap_err();
    assert!(err.to_string().contains("schema version"), "got: {err}");

    // Control case: a current-version checkpoint is still loadable.
    let current = sample_workflow_checkpoint("wf-ok");
    store
        .save_workflow_checkpoint("wf-ok", &current)
        .await
        .unwrap();
    let reloaded = store.load_workflow_checkpoint("wf-ok").await.unwrap();
    assert!(reloaded.is_some());
}

/// The file store accepts a future-version checkpoint on save but refuses to
/// load it, reporting a schema-version error.
#[tokio::test]
async fn test_file_store_rejects_future_workflow_checkpoint() {
    let dir = tempdir().unwrap();
    let store = FileSessionStore::new(dir.path()).await.unwrap();

    // One schema version ahead of what this build understands.
    let too_new = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1;
    let mut future = sample_workflow_checkpoint("wf-f");
    future.schema_version = too_new;
    store
        .save_workflow_checkpoint("wf-f", &future)
        .await
        .unwrap();

    let load_result = store.load_workflow_checkpoint("wf-f").await;
    let err = load_result.unwrap_err();
    assert!(err.to_string().contains("schema version"), "got: {err}");
}

/// After saving a workflow checkpoint, the checkpoint directory contains the
/// final `wf-z.json` file and no `.tmp` leftovers.
#[tokio::test]
async fn test_file_store_workflow_checkpoint_atomic_no_temp_leftovers() {
    let dir = tempdir().unwrap();
    let store = FileSessionStore::new(dir.path()).await.unwrap();
    let checkpoint = sample_workflow_checkpoint("wf-z");
    store
        .save_workflow_checkpoint("wf-z", &checkpoint)
        .await
        .unwrap();

    // Collect every file name present in the workflow checkpoint directory.
    let ckpt_dir = dir.path().join("workflow_checkpoints");
    let mut listing = tokio::fs::read_dir(&ckpt_dir).await.unwrap();
    let mut names = Vec::new();
    loop {
        match listing.next_entry().await.unwrap() {
            Some(entry) => names.push(entry.file_name().to_string_lossy().into_owned()),
            None => break,
        }
    }

    assert!(
        !names.iter().any(|n| n.contains(".tmp")),
        "no temp leftovers after atomic write, got: {names:?}"
    );
    assert!(
        names.iter().any(|n| n.as_str() == "wf-z.json"),
        "the final workflow checkpoint file must exist, got: {names:?}"
    );
}
Loading
Loading