From a9dbb79bf4d2a2cfdb28e16f33b5013d73a6a63d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 22:12:18 +0000 Subject: [PATCH 1/2] Add sealed_ix watermark to Partition Track the highest SegmentIndex whose manifest update has been confirmed durable for a sealed (no longer active) segment. This high-water mark lets reconciliation distinguish an active tail segment (writer noise, never a real mismatch) from a sealed, immutable segment. The watermark is sourced from the existing manifest-durability signal: the slog now reports whether a write sealed its segment, and the partition's commit task advances an AtomicU64 right after the durable manifest update lands. It is monotonically non-decreasing, so retention removing old segments never moves it backward. The currently active (writeable) segment is never included. Co-Authored-By: Claude --- catalog/src/partition.rs | 95 ++++++++++++++++++++++++++++++++++++++++ catalog/src/slog.rs | 4 ++ 2 files changed, 99 insertions(+) diff --git a/catalog/src/partition.rs b/catalog/src/partition.rs index 0b87d2b..08f7274 100644 --- a/catalog/src/partition.rs +++ b/catalog/src/partition.rs @@ -21,6 +21,8 @@ use std::fs; use std::ops::{Range, RangeInclusive}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::sync::Arc; use std::time::Instant; use crate::data::{ @@ -56,6 +58,13 @@ pub struct Partition { state: RwLock, config: Config, manifest: Manifest, + /// High-water mark for the highest [SegmentIndex] whose manifest update has + /// been confirmed durable for a *sealed* (no longer active) segment. + /// + /// Encoded as `index + 1` so that `0` can represent "no sealed segment yet"; + /// see [Partition::sealed_ix]. Monotonically non-decreasing: retention + /// removing segments never moves it backward. 
+ sealed_ix: Arc<AtomicU64>, } impl std::fmt::Debug for Partition { @@ -104,10 +113,18 @@ impl Partition { let (commit_writer, commits) = watch::channel(record); let commit_manifest = manifest.clone(); let commit_id = id.clone(); + let sealed_ix = Arc::new(AtomicU64::new(0)); + let commit_sealed_ix = sealed_ix.clone(); tokio::spawn(async move { while let Some(r) = writes.recv().await { trace!("{} checkpoint: {:?}", commit_id, &r); commit_manifest.update(&commit_id, &r.data).await; + // the manifest update above is now durable. if it sealed the + // segment, advance the watermark. fetch_max keeps it monotonic + // regardless of the order updates land in. + if r.sealed { + commit_sealed_ix.fetch_max(r.data.index.0 as u64 + 1, AtomicOrdering::Relaxed); + } // ok if no receivers, that means nothing is awaiting a commit commit_writer.send(r.data.records.end).ok(); } @@ -127,6 +144,7 @@ impl Partition { state: RwLock::new(state), id, config, + sealed_ix, } } @@ -170,6 +188,21 @@ impl Partition { &self.id } + /// The highest [SegmentIndex] whose manifest update has been confirmed + /// durable for a sealed (no longer active) segment, or `None` if no segment + /// has been sealed durably yet. + /// + /// This is a high-water mark: it never moves backward, even after retention + /// removes the segment it points at. The currently active (writeable) + /// segment is never included. Cheap to call; intended to be read once per + /// reconcile pass per partition. 
+ pub fn sealed_ix(&self) -> Option<SegmentIndex> { + match self.sealed_ix.load(AtomicOrdering::Relaxed) { + 0 => None, + encoded => Some(SegmentIndex((encoded - 1) as usize)), + } + } + pub(crate) fn slog_name(id: &PartitionId) -> String { format!("{}-{}", id.topic(), id.partition()) } @@ -1713,4 +1746,66 @@ pub mod test { Ok(()) } + + #[tokio::test] + async fn test_sealed_ix_fresh_is_none() -> Result<()> { + let (_dir, part) = partition(segment_3s()).await?; + assert_eq!(part.sealed_ix(), None); + part.close().await; + Ok(()) + } + + #[tokio::test] + async fn test_sealed_ix_active_not_sealed() -> Result<()> { + // segment_3s rolls at >2 rows; two writes stay in the active segment. + // checkpoint makes that segment's data durable without sealing it, so + // the watermark stays None. + let (_dir, part) = partition(segment_3s()).await?; + init_records(&part, 2).await?; + part.checkpoint().await; + part.ensure_index(RecordIndex(2)).await?; + assert_eq!(part.sealed_ix(), None); + part.close().await; + Ok(()) + } + + #[tokio::test] + async fn test_sealed_ix_advances_after_roll() -> Result<()> { + // the fourth write rolls segment 0 (records 0..3); once that seal lands + // durably, sealed_ix points at segment 0 (never the active segment 1). + let (_dir, part) = partition(segment_3s()).await?; + init_records(&part, 4).await?; + part.ensure_index(RecordIndex(3)).await?; + assert_eq!(part.sealed_ix(), Some(SegmentIndex(0))); + part.close().await; + Ok(()) + } + + #[tokio::test] + async fn test_sealed_ix_tracks_most_recent_sealed() -> Result<()> { + // ten writes seal segments 0 (0..3), 1 (3..6) and 2 (6..9), leaving an + // active segment 3. sealed_ix tracks the most recent *sealed* segment. 
+ let (_dir, part) = partition(segment_3s()).await?; + init_records(&part, 10).await?; + part.ensure_index(RecordIndex(9)).await?; + assert_eq!(part.sealed_ix(), Some(SegmentIndex(2))); + part.close().await; + Ok(()) + } + + #[tokio::test] + async fn test_sealed_ix_monotonic_under_retention() -> Result<()> { + let (_dir, part) = partition(segment_3s()).await?; + init_records(&part, 10).await?; + part.ensure_index(RecordIndex(9)).await?; + assert_eq!(part.sealed_ix(), Some(SegmentIndex(2))); + + // retention removing the oldest segment is an observation that must not + // advance the watermark, and crucially must not move it backward. + part.remove_oldest().await; + assert_eq!(part.sealed_ix(), Some(SegmentIndex(2))); + + part.close().await; + Ok(()) + } } diff --git a/catalog/src/slog.rs b/catalog/src/slog.rs index f11741d..2268e90 100644 --- a/catalog/src/slog.rs +++ b/catalog/src/slog.rs @@ -300,6 +300,9 @@ struct AppendRequest { #[derive(Debug)] pub(crate) struct WriteResult { pub(crate) data: SegmentData, + /// True when this write sealed (finalized) the segment, i.e. the segment is + /// now immutable and no longer the active, writeable one. + pub(crate) sealed: bool, } impl Slog { @@ -732,6 +735,7 @@ fn spawn_slog_thread( size, version: crate::manifest::SEGMENT_FORMAT_VERSION, }, + sealed: seal, }; trace!("{}: commit {:?}/{:?} send", name, segment, records.end); tx_commits.blocking_send(response).expect("channel closed"); From 0a4db420aff7863dc080c21bbe027395cd6a5aa2 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 19:07:29 +0000 Subject: [PATCH 2/2] Store sealed_ix as Option via watch channel Replace the AtomicU64 and its `index + 1` sentinel encoding with a watch::channel<Option<SegmentIndex>>. This removes the error-prone implicit index arithmetic: the watermark is now a real Option<SegmentIndex>, advanced monotonically in the commit task and read cheaply via borrow() (a short-lived read lock on the channel's value, never held across an await). 
Co-Authored-By: Claude --- catalog/src/partition.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/catalog/src/partition.rs b/catalog/src/partition.rs index 08f7274..4a6c56a 100644 --- a/catalog/src/partition.rs +++ b/catalog/src/partition.rs @@ -21,8 +21,6 @@ use std::fs; use std::ops::{Range, RangeInclusive}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; -use std::sync::Arc; use std::time::Instant; use crate::data::{ @@ -59,12 +57,12 @@ pub struct Partition { config: Config, manifest: Manifest, /// High-water mark for the highest [SegmentIndex] whose manifest update has - /// been confirmed durable for a *sealed* (no longer active) segment. + /// been confirmed durable for a *sealed* (no longer active) segment, or + /// `None` until the first segment is sealed durably. /// - /// Encoded as `index + 1` so that `0` can represent "no sealed segment yet"; - /// see [Partition::sealed_ix]. Monotonically non-decreasing: retention - /// removing segments never moves it backward. - sealed_ix: Arc<AtomicU64>, + /// Monotonically non-decreasing: retention removing segments never moves it + /// backward, and the currently active (writeable) segment is never included. + sealed_ix: watch::Receiver<Option<SegmentIndex>>, } impl std::fmt::Debug for Partition { @@ -113,17 +111,22 @@ impl Partition { let (commit_writer, commits) = watch::channel(record); let commit_manifest = manifest.clone(); let commit_id = id.clone(); - let sealed_ix = Arc::new(AtomicU64::new(0)); - let commit_sealed_ix = sealed_ix.clone(); + let (sealed_tx, sealed_ix) = watch::channel(None); tokio::spawn(async move { while let Some(r) = writes.recv().await { trace!("{} checkpoint: {:?}", commit_id, &r); commit_manifest.update(&commit_id, &r.data).await; // the manifest update above is now durable. if it sealed the - // segment, advance the watermark. fetch_max keeps it monotonic - // regardless of the order updates land in. 
+ // segment, advance the watermark, holding it monotonic against + // any out-of-order update. if r.sealed { - commit_sealed_ix.fetch_max(r.data.index.0 as u64 + 1, AtomicOrdering::Relaxed); + sealed_tx.send_if_modified(|current| match current { + Some(ix) if *ix >= r.data.index => false, + _ => { + *current = Some(r.data.index); + true + } + }); } // ok if no receivers, that means nothing is awaiting a commit commit_writer.send(r.data.records.end).ok(); @@ -197,10 +200,7 @@ impl Partition { /// segment is never included. Cheap to call; intended to be read once per /// reconcile pass per partition. pub fn sealed_ix(&self) -> Option<SegmentIndex> { - match self.sealed_ix.load(AtomicOrdering::Relaxed) { - 0 => None, - encoded => Some(SegmentIndex((encoded - 1) as usize)), - } + *self.sealed_ix.borrow() } pub(crate) fn slog_name(id: &PartitionId) -> String {