diff --git a/catalog/src/partition.rs b/catalog/src/partition.rs
index 0b87d2b..4a6c56a 100644
--- a/catalog/src/partition.rs
+++ b/catalog/src/partition.rs
@@ -56,6 +56,13 @@ pub struct Partition {
     state: RwLock,
     config: Config,
     manifest: Manifest,
+    /// High-water mark for the highest [SegmentIndex] whose manifest update has
+    /// been confirmed durable for a *sealed* (no longer active) segment, or
+    /// `None` until the first segment is sealed durably.
+    ///
+    /// Monotonically non-decreasing: retention removing segments never moves it
+    /// backward, and the currently active (writeable) segment is never included.
+    sealed_ix: watch::Receiver<Option<SegmentIndex>>,
 }
 
 impl std::fmt::Debug for Partition {
@@ -104,10 +111,23 @@ impl Partition {
         let (commit_writer, commits) = watch::channel(record);
         let commit_manifest = manifest.clone();
         let commit_id = id.clone();
+        let (sealed_tx, sealed_ix) = watch::channel(None);
         tokio::spawn(async move {
             while let Some(r) = writes.recv().await {
                 trace!("{} checkpoint: {:?}", commit_id, &r);
                 commit_manifest.update(&commit_id, &r.data).await;
+                // the manifest update above is now durable. if it sealed the
+                // segment, advance the watermark, holding it monotonic against
+                // any out-of-order update.
+                if r.sealed {
+                    sealed_tx.send_if_modified(|current| match current {
+                        Some(ix) if *ix >= r.data.index => false,
+                        _ => {
+                            *current = Some(r.data.index);
+                            true
+                        }
+                    });
+                }
                 // ok if no receivers, that means nothing is awaiting a commit
                 commit_writer.send(r.data.records.end).ok();
             }
@@ -127,6 +147,7 @@ impl Partition {
             state: RwLock::new(state),
             id,
             config,
+            sealed_ix,
         }
     }
 
@@ -170,6 +191,18 @@ impl Partition {
         &self.id
     }
 
+    /// The highest [SegmentIndex] whose manifest update has been confirmed
+    /// durable for a sealed (no longer active) segment, or `None` if no segment
+    /// has been sealed durably yet.
+    ///
+    /// This is a high-water mark: it never moves backward, even after retention
+    /// removes the segment it points at. The currently active (writeable)
+    /// segment is never included. Cheap to call; intended to be read once per
+    /// reconcile pass per partition.
+    pub fn sealed_ix(&self) -> Option<SegmentIndex> {
+        *self.sealed_ix.borrow()
+    }
+
     pub(crate) fn slog_name(id: &PartitionId) -> String {
         format!("{}-{}", id.topic(), id.partition())
     }
@@ -1713,4 +1746,66 @@ pub mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_sealed_ix_fresh_is_none() -> Result<()> {
+        let (_dir, part) = partition(segment_3s()).await?;
+        assert_eq!(part.sealed_ix(), None);
+        part.close().await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sealed_ix_active_not_sealed() -> Result<()> {
+        // segment_3s rolls at >2 rows; two writes stay in the active segment.
+        // checkpoint makes that segment's data durable without sealing it, so
+        // the watermark stays None.
+        let (_dir, part) = partition(segment_3s()).await?;
+        init_records(&part, 2).await?;
+        part.checkpoint().await;
+        part.ensure_index(RecordIndex(2)).await?;
+        assert_eq!(part.sealed_ix(), None);
+        part.close().await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sealed_ix_advances_after_roll() -> Result<()> {
+        // the fourth write rolls segment 0 (records 0..3); once that seal lands
+        // durably, sealed_ix points at segment 0 (never the active segment 1).
+        let (_dir, part) = partition(segment_3s()).await?;
+        init_records(&part, 4).await?;
+        part.ensure_index(RecordIndex(3)).await?;
+        assert_eq!(part.sealed_ix(), Some(SegmentIndex(0)));
+        part.close().await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sealed_ix_tracks_most_recent_sealed() -> Result<()> {
+        // ten writes seal segments 0 (0..3), 1 (3..6) and 2 (6..9), leaving an
+        // active segment 3. sealed_ix tracks the most recent *sealed* segment.
+        let (_dir, part) = partition(segment_3s()).await?;
+        init_records(&part, 10).await?;
+        part.ensure_index(RecordIndex(9)).await?;
+        assert_eq!(part.sealed_ix(), Some(SegmentIndex(2)));
+        part.close().await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sealed_ix_monotonic_under_retention() -> Result<()> {
+        let (_dir, part) = partition(segment_3s()).await?;
+        init_records(&part, 10).await?;
+        part.ensure_index(RecordIndex(9)).await?;
+        assert_eq!(part.sealed_ix(), Some(SegmentIndex(2)));
+
+        // retention removing the oldest segment is an observation that must not
+        // advance the watermark, and crucially must not move it backward.
+        part.remove_oldest().await;
+        assert_eq!(part.sealed_ix(), Some(SegmentIndex(2)));
+
+        part.close().await;
+        Ok(())
+    }
 }
diff --git a/catalog/src/slog.rs b/catalog/src/slog.rs
index f11741d..2268e90 100644
--- a/catalog/src/slog.rs
+++ b/catalog/src/slog.rs
@@ -300,6 +300,9 @@ struct AppendRequest {
 #[derive(Debug)]
 pub(crate) struct WriteResult {
     pub(crate) data: SegmentData,
+    /// True when this write sealed (finalized) the segment, i.e. the segment is
+    /// now immutable and no longer the active, writeable one.
+    pub(crate) sealed: bool,
 }
 
 impl Slog {
@@ -732,6 +735,7 @@ fn spawn_slog_thread(
                         size,
                         version: crate::manifest::SEGMENT_FORMAT_VERSION,
                     },
+                    sealed: seal,
                 };
                 trace!("{}: commit {:?}/{:?} send", name, segment, records.end);
                 tx_commits.blocking_send(response).expect("channel closed");