Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/src/loop_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ pub struct LoopCheckpoint {
pub checkpoint_ms: u64,
}

impl LoopCheckpoint {
/// Reject a checkpoint written by a *newer*, incompatible schema
/// version than this build understands — honoring the contract on
/// [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
///
/// Field *additions* are absorbed transparently by `#[serde(default)]`,
/// so an older checkpoint (lower `schema_version`, including a pre-v1
/// `0`) always remains loadable. A *future* version, however, may have
/// changed the meaning of existing fields or the tool-round boundary
/// semantics; resuming from one risks silent corruption (e.g.
/// re-running a non-idempotent tool on the wrong side of the boundary).
///
/// [`SessionStore`](crate::store::SessionStore) impls call this right
/// after deserialization, so both `resume_run` (which surfaces the
/// error to the caller) and the live-run [`LoopCheckpointSink`] (which
/// logs and starts fresh) refuse to act on an unreadable checkpoint.
pub fn ensure_loadable(&self) -> anyhow::Result<()> {
if self.schema_version > LOOP_CHECKPOINT_SCHEMA_VERSION {
anyhow::bail!(
"loop checkpoint for run {} has schema version {} but this build supports at \
most {}; refusing to resume from an incompatible future checkpoint",
self.run_id,
self.schema_version,
LOOP_CHECKPOINT_SCHEMA_VERSION
);
}
Ok(())
}
}

/// Receiver of per-tool-round checkpoints.
///
/// The framework ships one adapter:
Expand Down
6 changes: 5 additions & 1 deletion core/src/store/file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,12 @@ impl SessionStore for FileSessionStore {
let json = fs::read_to_string(&path)
.await
.with_context(|| format!("Failed to read loop checkpoint from {}", path.display()))?;
let checkpoint = serde_json::from_str(&json)
let checkpoint: LoopCheckpoint = serde_json::from_str(&json)
.with_context(|| format!("Failed to parse loop checkpoint from {}", path.display()))?;
// Refuse to hand back a checkpoint written by a future, incompatible
// schema version — its field semantics may differ (see the contract
// on LoopCheckpoint::ensure_loadable).
checkpoint.ensure_loadable()?;
Ok(Some(checkpoint))
}

Expand Down
10 changes: 9 additions & 1 deletion core/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,15 @@ impl SessionStore for MemorySessionStore {
}

async fn load_loop_checkpoint(&self, run_id: &str) -> Result<Option<LoopCheckpoint>> {
Ok(self.loop_checkpoints.read().await.get(run_id).cloned())
match self.loop_checkpoints.read().await.get(run_id).cloned() {
// Enforce the same future-schema rejection as the file store so
// the contract holds uniformly across backends.
Some(cp) => {
cp.ensure_loadable()?;
Ok(Some(cp))
}
None => Ok(None),
}
}

async fn delete_loop_checkpoint(&self, run_id: &str) -> Result<()> {
Expand Down
Loading
Loading