Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions catalog/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ pub struct Partition {
state: RwLock<State>,
config: Config,
manifest: Manifest,
/// High-water mark for the highest [SegmentIndex] whose manifest update has
/// been confirmed durable for a *sealed* (no longer active) segment, or
/// `None` until the first segment is sealed durably.
///
/// Monotonically non-decreasing: retention removing segments never moves it
/// backward, and the currently active (writeable) segment is never included.
sealed_ix: watch::Receiver<Option<SegmentIndex>>,
}

impl std::fmt::Debug for Partition {
Expand Down Expand Up @@ -104,10 +111,23 @@ impl Partition {
let (commit_writer, commits) = watch::channel(record);
let commit_manifest = manifest.clone();
let commit_id = id.clone();
let (sealed_tx, sealed_ix) = watch::channel(None);
tokio::spawn(async move {
while let Some(r) = writes.recv().await {
trace!("{} checkpoint: {:?}", commit_id, &r);
commit_manifest.update(&commit_id, &r.data).await;
// the manifest update above is now durable. if it sealed the
// segment, advance the watermark, holding it monotonic against
// any out-of-order update.
if r.sealed {
sealed_tx.send_if_modified(|current| match current {
Some(ix) if *ix >= r.data.index => false,
_ => {
*current = Some(r.data.index);
true
}
});
}
// ok if no receivers, that means nothing is awaiting a commit
commit_writer.send(r.data.records.end).ok();
}
Expand All @@ -127,6 +147,7 @@ impl Partition {
state: RwLock::new(state),
id,
config,
sealed_ix,
}
}

Expand Down Expand Up @@ -170,6 +191,18 @@ impl Partition {
&self.id
}

/// Returns the sealed-segment high-water mark for this partition.
///
/// The value is the greatest [`SegmentIndex`] belonging to a sealed (no
/// longer active) segment whose manifest update has been confirmed durable.
/// It is `None` while no segment has been durably sealed.
///
/// Because this is a high-water mark it only ever advances: retention
/// deleting the segment it refers to does not pull it back. The active
/// (writeable) segment is never reported. The call is cheap and is meant to
/// be made once per partition on each reconcile pass.
pub fn sealed_ix(&self) -> Option<SegmentIndex> {
    // copy the current value out of the watch channel; the borrow guard is
    // released when this function returns.
    let watermark = self.sealed_ix.borrow();
    *watermark
}

/// Builds the slog name for `id`: its topic and its partition joined by `-`.
pub(crate) fn slog_name(id: &PartitionId) -> String {
    let topic = id.topic();
    let partition = id.partition();
    format!("{}-{}", topic, partition)
}
Expand Down Expand Up @@ -1713,4 +1746,66 @@ pub mod test {

Ok(())
}

#[tokio::test]
async fn test_sealed_ix_fresh_is_none() -> Result<()> {
    // a newly created partition has had no writes, so the watermark is unset.
    let (_tmp, part) = partition(segment_3s()).await?;

    let watermark = part.sealed_ix();
    assert_eq!(watermark, None);

    part.close().await;
    Ok(())
}

#[tokio::test]
async fn test_sealed_ix_active_not_sealed() -> Result<()> {
    // with segment_3s a roll needs more than two rows, so both writes land in
    // the still-active segment. checkpointing makes that data durable but
    // does not seal the segment, which leaves the watermark unset.
    let (_tmp, part) = partition(segment_3s()).await?;
    init_records(&part, 2).await?;
    part.checkpoint().await;
    part.ensure_index(RecordIndex(2)).await?;

    let watermark = part.sealed_ix();
    assert_eq!(watermark, None);

    part.close().await;
    Ok(())
}

#[tokio::test]
async fn test_sealed_ix_advances_after_roll() -> Result<()> {
    // writing a fourth record rolls segment 0 (records 0..3). after that seal
    // is durable the watermark names segment 0, not the active segment 1.
    let (_tmp, part) = partition(segment_3s()).await?;
    init_records(&part, 4).await?;
    part.ensure_index(RecordIndex(3)).await?;

    let watermark = part.sealed_ix();
    assert_eq!(watermark, Some(SegmentIndex(0)));

    part.close().await;
    Ok(())
}

#[tokio::test]
async fn test_sealed_ix_tracks_most_recent_sealed() -> Result<()> {
    // ten records seal segment 0 (0..3), segment 1 (3..6) and segment 2
    // (6..9), with segment 3 left active. the watermark must name the newest
    // *sealed* segment, which is segment 2.
    let (_tmp, part) = partition(segment_3s()).await?;
    init_records(&part, 10).await?;
    part.ensure_index(RecordIndex(9)).await?;

    let watermark = part.sealed_ix();
    assert_eq!(watermark, Some(SegmentIndex(2)));

    part.close().await;
    Ok(())
}

#[tokio::test]
async fn test_sealed_ix_monotonic_under_retention() -> Result<()> {
    let (_tmp, part) = partition(segment_3s()).await?;
    init_records(&part, 10).await?;
    part.ensure_index(RecordIndex(9)).await?;

    let before_retention = part.sealed_ix();
    assert_eq!(before_retention, Some(SegmentIndex(2)));

    // dropping the oldest segment through retention must leave the watermark
    // untouched: it may not advance and, above all, may not move backward.
    part.remove_oldest().await;
    let after_retention = part.sealed_ix();
    assert_eq!(after_retention, Some(SegmentIndex(2)));

    part.close().await;
    Ok(())
}
}
4 changes: 4 additions & 0 deletions catalog/src/slog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ struct AppendRequest {
/// Outcome of a single slog write: the data for the segment that was written
/// and whether that write sealed the segment.
///
/// NOTE(review): presumably this is the value sent on the commits channel by
/// the slog thread and consumed by the partition's commit task; confirm
/// against the sender and receiver.
#[derive(Debug)]
pub(crate) struct WriteResult {
/// Segment data produced by this write.
pub(crate) data: SegmentData,
/// True when this write sealed (finalized) the segment, i.e. the segment is
/// now immutable and no longer the active, writeable one.
pub(crate) sealed: bool,
}

impl Slog {
Expand Down Expand Up @@ -732,6 +735,7 @@ fn spawn_slog_thread(
size,
version: crate::manifest::SEGMENT_FORMAT_VERSION,
},
sealed: seal,
};
trace!("{}: commit {:?}/{:?} send", name, segment, records.end);
tx_commits.blocking_send(response).expect("channel closed");
Expand Down
Loading