From 5ab73fd126e84ade7249b840c8275896e9cf987c Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 18:17:48 +0000 Subject: [PATCH 1/2] Add ReconcileReport, split sealed vs active segments in reconciliation Reconciliation previously ran every segment through the strict size-diff pipeline, which flagged the currently active (writeable) tail segment of every partition on every pass: writes accumulate on disk before the manifest flush, so the active segment legitimately diverges. That noise blocks running reconcile continuously alongside writes. Split the output into two buckets keyed off each partition's sealed_ix watermark, read once per partition pass: - sealed bucket: segments at or below sealed_ix go through the existing strict-diff + CAS-fix pipeline, unchanged. - active bucket: segments above sealed_ix (or all segments when the partition has never sealed one) are recorded informationally as { topic, partition, manifest_size, disk_size, delta } and never fixed. Introduce ReconcileReport { sealed: ReconcileStats, active: Vec<..> } and replace ReconcileJob::stats() with report(). The /info handler now also returns the active-segment bucket (additive, serde-default field). last_manifest_update_age is left as an optional None field with a TODO: there is no durable-update timestamp hook in the write path yet, and this PR deliberately does not add one. 
--- catalog/src/lib.rs | 4 +- catalog/src/reconcile.rs | 699 +++++++++++++++++++++++++++++++-------- server/src/http.rs | 38 ++- server/tests/server.rs | 15 + transport/src/lib.rs | 28 ++ 5 files changed, 633 insertions(+), 151 deletions(-) diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index 838423d..1b4f489 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -32,4 +32,6 @@ pub use plateau_transport as transport; pub use plateau_test as test; pub use catalog::{Catalog, Config}; -pub use reconcile::{ReconcileConfig, ReconcileJob, ReconcileStats}; +pub use reconcile::{ + ActiveSegmentReport, ReconcileConfig, ReconcileJob, ReconcileReport, ReconcileStats, +}; diff --git a/catalog/src/reconcile.rs b/catalog/src/reconcile.rs index aa07de7..9fd078b 100644 --- a/catalog/src/reconcile.rs +++ b/catalog/src/reconcile.rs @@ -12,6 +12,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::slog::SegmentIndex; + use tokio::fs; use crate::data::segment::Segment; @@ -28,6 +30,15 @@ use crate::partition::Partition; use crate::slog::Slog; use crate::topic::Topic; +/// Whether a segment is sealed relative to a partition's `sealed_ix` watermark. +/// +/// A segment is sealed only when the partition has a watermark (`Some`) and the +/// segment index is at or below it. When the watermark is `None` (no segment has +/// sealed durably yet) every segment is treated as active. 
+fn is_sealed(sealed_ix: Option<SegmentIndex>, index: SegmentIndex) -> bool { + sealed_ix.is_some_and(|watermark| index <= watermark) +} + /// Configuration for the reconciliation job #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(default)] @@ -77,17 +88,17 @@ struct ReconcileState { topics: Option>, /// Accumulator for all segments in the current topic topic_segments: BTreeSet, - /// Statistics from the reconciliation - stats: ReconcileStats, + /// Report accumulated during the reconciliation (sealed + active buckets) + report: ReconcileReport, } impl ReconcileState { pub fn new(track_files: bool) -> Self { Self { - stats: if track_files { - ReconcileStats::with_path_tracking() + report: if track_files { + ReconcileReport::with_path_tracking() } else { - ReconcileStats::default() + ReconcileReport::default() }, ..Default::default() } @@ -199,6 +210,72 @@ impl ReconcileStats { } } +/// An informational report for the currently *active* (writeable) tail segment +/// of a single partition. +/// +/// The active segment is deliberately excluded from the strict sealed-diff +/// pipeline. Writes accumulate in it before the manifest is flushed, so its +/// on-disk size routinely runs ahead of the size recorded in the manifest. +/// Treating that as a "size mismatch" would flag the active segment of every +/// partition on every pass. This bucket exists to surface that drift without +/// ever acting on it — reconciliation fixes never touch the active segment. +/// +/// How to read the signals: +/// - `delta > 0` is *expected*: disk is ahead of the manifest between flushes. +/// - `delta < 0` is alert-worthy: the manifest claims more bytes than exist on +/// disk, which points at corruption or an accounting bug. +/// - `delta` growing unboundedly across consecutive scans suggests manifest +/// updates are stuck or falling behind. +/// - `last_manifest_update_age` exceeding a threshold suggests the write path is +/// wedged.
(See the field's TODO; currently always `None`.) +#[derive(Debug, Clone)] +pub struct ActiveSegmentReport { + /// Topic the active segment belongs to. + pub topic: String, + /// Partition the active segment belongs to. + pub partition: String, + /// Size the manifest currently records for the active segment. + pub manifest_size: usize, + /// Size observed on disk (main segment file plus any parts). + pub disk_size: usize, + /// `disk_size as i64 - manifest_size as i64`. Signed; negative is an alert + /// signal (manifest claims more bytes than disk has). + pub delta: i64, + /// Time since this partition's most recent manifest update landed durably. + /// + /// TODO: there is no hook in the write path that records when the last + /// manifest update became durable, so this is always `None` for now. Wire + /// it up once the write path exposes that timestamp; per this PR's scope we + /// deliberately do not add new plumbing in the write path for it. + pub last_manifest_update_age: Option<Duration>, +} + +/// The full output of a reconciliation pass, split into two buckets. +/// +/// `sealed` is the strict pipeline: every segment at or below a partition's +/// `sealed_ix` watermark is diffed against the manifest and, when configured, +/// fixed. This is the historical [ReconcileStats] behaviour, unchanged. +/// +/// `active` is informational only: one entry per partition whose active tail +/// segment was observed during the pass. Reconciliation fixes never touch the +/// active bucket. See [ActiveSegmentReport] for how to interpret its entries. +#[derive(Debug, Clone, Default)] +pub struct ReconcileReport { + /// Strict sealed-segment diffs (the historical [ReconcileStats] shape). + pub sealed: ReconcileStats, + /// Informational per-partition active-segment reports.
+ pub active: Vec<ActiveSegmentReport>, +} + +impl ReconcileReport { + fn with_path_tracking() -> Self { + Self { + sealed: ReconcileStats::with_path_tracking(), + active: Vec::new(), + } + } +} + impl ReconcileJob { /// Create a new reconciliation job for the given catalog with default configuration pub fn new(catalog: Arc<Catalog>) -> Self { @@ -235,8 +312,8 @@ impl ReconcileJob { // If we're done, return true if done { - let stats = self.stats(); - info!("Reconciliation complete: {:?}", stats); + let report = self.report(); + info!("Reconciliation complete: {:?}", report); return Ok(true); } @@ -339,7 +416,8 @@ impl ReconcileJob { // Add the partition files to our stats self.state - .stats + .report + .sealed .files_checked .add_paths(partition_files.clone(), partition_bytes); debug!( @@ -357,7 +435,8 @@ impl ReconcileJob { .map(|m| m.len() as usize) .unwrap_or(0); self.state - .stats + .report + .sealed .untracked_files .add_path(file_path.clone(), file_size); } else { @@ -384,6 +463,18 @@ impl ReconcileJob { topic_name, partition_name, topic_path ); + // Read the sealed watermark once for this partition pass. Segments at or + // below it are durable and run through the strict sealed-diff pipeline; + // anything above it (or every segment, if the partition has not sealed + // one yet) is the currently active tail and goes to the informational + // active bucket instead. The watermark never moves backward, so this + // single read is a stable basis for the whole pass.
+ let sealed_ix = { + let topic = self.catalog.get_topic(topic_name).await; + let partition = topic.get_partition(partition_name).await; + partition.sealed_ix() + }; + // Create sets to track files let mut tracked_files = BTreeSet::new(); @@ -397,10 +488,11 @@ impl ReconcileJob { let segments: Vec = segments_stream.collect().await; if let Some((start, end)) = segments.first().zip(segments.last()) { debug!( - "Fetched {} segments: {} ..= {}", + "Fetched {} segments: {} ..= {} (sealed_ix={:?})", segments.len(), start.index.0, - end.index.0 + end.index.0, + sealed_ix, ); } else { debug!("Found no segments") @@ -410,143 +502,235 @@ impl ReconcileJob { for segment in segments { debug!("Validating segment: {:?}", segment.index); - let partition_id = PartitionId { - topic: topic_name.into(), - partition: partition_name.into(), - }; - let slog_name = Partition::slog_name(&partition_id); let segment_file_name = format!("{}-{}", slog_name, segment.index.0); let segment_path = Slog::segment_path(&topic_path, &slog_name, segment.index); debug!("Checking segment file: {} at {:?}", slog_name, segment_path); - // Mark this file as tracked (we may want to consider .arrows extension depending on actual requirements) + // Mark this file as tracked regardless of bucket so the orphan + // detection phase does not false-positive on active segment files. tracked_files.insert(segment_path.clone()); - // Check if the file exists - if !segment_path.exists() { - warn!("Missing file {:?}", segment_path); - // Add the missing path to our stats - self.state - .stats - .missing_files - .add_path(segment_path.clone(), segment.size); + // A segment is "sealed" only when the partition has a watermark and + // this segment's index is at or below it. Everything else is the + // active tail (this includes the all-segments-active case when the + // partition has never sealed a segment, i.e. sealed_ix is None). 
+ if is_sealed(sealed_ix, segment.index) { + self.process_sealed_segment( + &partition_id, + &segment, + &segment_file_name, + segment_path, + &mut tracked_files, + ) + .await; } else { - // Check file size including recovery files - let mut total_actual_size = 0; - - // Check main segment file - match fs::metadata(&segment_path).await { - Ok(metadata) => { - total_actual_size += metadata.len() as usize; - debug!( - "Segment {} file size: {}", - segment_file_name, - metadata.len() - ); - } - Err(e) => { - warn!( - "Error getting metadata for segment {}: {:?}", - segment_file_name, e - ); - } - } + self.process_active_segment( + topic_name, + partition_name, + &segment, + &segment_file_name, + segment_path, + &mut tracked_files, + ) + .await; + } + } - // Check for associated parts and add their size - let segment_file = Segment::at(segment_path); - for part_path in segment_file - .parts() - .chain(iter::once(segment_file.cache_path())) - { - if part_path.exists() { - tracked_files.insert(part_path.clone()); - if part_path != segment_file.cache_path() { - match fs::metadata(&part_path).await { - Ok(metadata) => { - total_actual_size += metadata.len() as usize; - debug!("Part {:?} size: {}", part_path, metadata.len()); - } - Err(e) => { - warn!( - "Error getting metadata for part {:?}: {:?}", - part_path, e - ); - } - } - } - } else { - debug!("Part {:?} does not exist", part_path); - } + // We've completed processing this partition + Ok(tracked_files) + } + + /// Strict diff (and optional CAS-fix) for a sealed segment. This preserves + /// the historical reconciliation behaviour and only ever touches the sealed + /// bucket of the report. 
+ async fn process_sealed_segment( + &mut self, + partition_id: &PartitionId, + segment: &SegmentData, + segment_file_name: &str, + segment_path: PathBuf, + tracked_files: &mut BTreeSet, + ) { + // Check if the file exists + if !segment_path.exists() { + warn!("Missing file {:?}", segment_path); + // Add the missing path to our stats + self.state + .report + .sealed + .missing_files + .add_path(segment_path.clone(), segment.size); + return; + } + + let total_actual_size = self + .segment_disk_size(&segment_path, segment_file_name, tracked_files) + .await; + + let expected_size = ByteSize(segment.size as u64); + let actual_size = ByteSize(total_actual_size as u64); + self.state.report.sealed.expected_size = ByteSize( + self.state.report.sealed.expected_size.as_u64() + expected_size.as_u64(), + ); + self.state.report.sealed.actual_size = + ByteSize(self.state.report.sealed.actual_size.as_u64() + actual_size.as_u64()); + + // Compare total size with expected size + debug!( + "Comparing sizes - total_actual_size={}, segment.size={}, diff={}", + total_actual_size, + segment.size, + total_actual_size.abs_diff(segment.size) + ); + if total_actual_size.abs_diff(segment.size) > 0 { + warn!( + "Size mismatch for segment {}. Expected {}, actual {}", + segment_file_name, expected_size, actual_size + ); + // Add the mismatched path to our stats + self.state.report.sealed.size_mismatches.add_path( + segment_path.clone(), + // NOTE: this is probably not ideal as it can "overcount" the total difference + total_actual_size.abs_diff(segment.size), + ); + + if self + .config + .fixes + .contains(&ReconcileFix::UpdateManifestSizes) + { + // Apply the fix conditionally on the size we observed. If + // retention removed this segment (or another writer changed + // it) since we snapshotted the manifest, the update is a + // no-op rather than re-creating a stale row. This keeps the + // fix safe to run while writes and retention are in flight. 
+ let applied = self + .catalog + .manifest() + .update_size_if_unchanged( + partition_id, + segment.index, + segment.size, + total_actual_size, + ) + .await; + if applied { + info!("Fixed size mismatch for segment {}", segment_file_name); + } else { + info!( + "Skipped stale size fix for segment {} (concurrently removed or changed)", + segment_file_name + ); } + } + } else { + debug!( + "Segment {} size ok. Expected: {}, actual: {}", + segment_file_name, segment.size, total_actual_size + ); + } + } + + /// Record an informational active-segment report. This never contributes a + /// size mismatch to the sealed bucket and never applies a fix — the active + /// tail segment is, by definition, not sealed, so its on-disk size legally + /// runs ahead of the manifest between flushes. + async fn process_active_segment( + &mut self, + topic_name: &str, + partition_name: &str, + segment: &SegmentData, + segment_file_name: &str, + segment_path: PathBuf, + tracked_files: &mut BTreeSet, + ) { + let disk_size = if segment_path.exists() { + self.segment_disk_size(&segment_path, segment_file_name, tracked_files) + .await + } else { + // The active segment may not have been flushed to disk yet. A zero + // disk size against a non-zero manifest size yields a negative delta, + // which is exactly the alert signal we want to surface. 
+ debug!("Active segment file {:?} not present on disk", segment_path); + 0 + }; - let expected_size = ByteSize(segment.size as u64); - let actual_size = ByteSize(total_actual_size as u64); - self.state.stats.expected_size = - ByteSize(self.state.stats.expected_size.as_u64() + expected_size.as_u64()); - self.state.stats.actual_size = - ByteSize(self.state.stats.actual_size.as_u64() + actual_size.as_u64()); + let manifest_size = segment.size; + let delta = disk_size as i64 - manifest_size as i64; + debug!( + "Active segment {} for {}/{}: manifest_size={}, disk_size={}, delta={}", + segment_file_name, topic_name, partition_name, manifest_size, disk_size, delta + ); - // Compare total size with expected size + self.state.report.active.push(ActiveSegmentReport { + topic: topic_name.to_string(), + partition: partition_name.to_string(), + manifest_size, + disk_size, + delta, + // TODO: no durable-update timestamp hook exists in the write path + // yet; see ActiveSegmentReport::last_manifest_update_age. + last_manifest_update_age: None, + }); + } + + /// Compute the on-disk size of a segment: the main file plus any parts. + /// Every part path encountered is inserted into `tracked_files` so the + /// orphan-detection phase does not flag it. The caller is responsible for + /// confirming the main segment file exists before calling. + async fn segment_disk_size( + &self, + segment_path: &Path, + segment_file_name: &str, + tracked_files: &mut BTreeSet, + ) -> usize { + let mut total_actual_size = 0; + + // Check main segment file + match fs::metadata(segment_path).await { + Ok(metadata) => { + total_actual_size += metadata.len() as usize; debug!( - "Comparing sizes - total_actual_size={}, segment.size={}, diff={}", - total_actual_size, - segment.size, - total_actual_size.abs_diff(segment.size) + "Segment {} file size: {}", + segment_file_name, + metadata.len() ); - if total_actual_size.abs_diff(segment.size) > 0 { - warn!( - "Size mismatch for segment {}. 
Expected {}, actual {}", - segment_file_name, expected_size, actual_size - ); - // Add the mismatched path to our stats - self.state.stats.size_mismatches.add_path( - segment_file.path().clone(), - // NOTE: this is probably not ideal as it can "overcount" the total difference - total_actual_size.abs_diff(segment.size), - ); + } + Err(e) => { + warn!( + "Error getting metadata for segment {}: {:?}", + segment_file_name, e + ); + } + } - if self - .config - .fixes - .contains(&ReconcileFix::UpdateManifestSizes) - { - // Apply the fix conditionally on the size we observed. If - // retention removed this segment (or another writer changed - // it) since we snapshotted the manifest, the update is a - // no-op rather than re-creating a stale row. This keeps the - // fix safe to run while writes and retention are in flight. - let applied = self - .catalog - .manifest() - .update_size_if_unchanged( - &partition_id, - segment.index, - segment.size, - total_actual_size, - ) - .await; - if applied { - info!("Fixed size mismatch for segment {}", segment_file_name); - } else { - info!( - "Skipped stale size fix for segment {} (concurrently removed or changed)", - segment_file_name - ); + // Check for associated parts and add their size + let segment_file = Segment::at(segment_path.to_path_buf()); + for part_path in segment_file + .parts() + .chain(iter::once(segment_file.cache_path())) + { + if part_path.exists() { + tracked_files.insert(part_path.clone()); + if part_path != segment_file.cache_path() { + match fs::metadata(&part_path).await { + Ok(metadata) => { + total_actual_size += metadata.len() as usize; + debug!("Part {:?} size: {}", part_path, metadata.len()); + } + Err(e) => { + warn!("Error getting metadata for part {:?}: {:?}", part_path, e); } } - } else { - debug!( - "Segment {} size ok. 
Expected: {}, actual: {}", - segment_file_name, segment.size, total_actual_size - ); } + } else { + debug!("Part {:?} does not exist", part_path); } } - // We've completed processing this partition - Ok(tracked_files) + total_actual_size } /// List all segment-related files in a partition directory @@ -567,9 +751,9 @@ impl ReconcileJob { Ok(files) } - /// Get current reconciliation statistics - pub fn stats(&self) -> &ReconcileStats { - &self.state.stats + /// Get the current reconciliation report (sealed + active buckets) + pub fn report(&self) -> &ReconcileReport { + &self.state.report } /// Reset the reconciliation job to start from the beginning @@ -577,10 +761,10 @@ impl ReconcileJob { self.state.current_topic_index = 0; self.state.current_partition_index = 0; self.state.topics = None; - self.state.stats = if self.config.track_files { - ReconcileStats::with_path_tracking() + self.state.report = if self.config.track_files { + ReconcileReport::with_path_tracking() } else { - ReconcileStats::default() + ReconcileReport::default() }; } } @@ -604,6 +788,40 @@ mod tests { (dir, Arc::new(catalog)) } + /// Create a catalog whose partitions roll after `max_rows` rows, so tests + /// can deterministically force a segment to seal. + async fn create_test_catalog_rolling(max_rows: usize) -> (TempDir, Arc) { + let dir = TempDir::new().unwrap(); + let root = PathBuf::from(dir.path()); + let mut config = Config::default(); + config.partition.roll.max_rows = max_rows; + let catalog = Catalog::attach(root, config).await.unwrap(); + (dir, Arc::new(catalog)) + } + + fn test_records(messages: &[&str]) -> Vec { + messages + .iter() + .map(|message| Record { + time: Utc::now(), + message: message.bytes().collect(), + }) + .collect() + } + + /// Look up the size the manifest currently records for a single segment. 
+ async fn manifest_segment_size( + catalog: &Catalog, + partition_id: &PartitionId, + index: SegmentIndex, + ) -> Option { + catalog + .manifest() + .get_segment_data(index.to_id(partition_id)) + .await + .map(|data| data.size) + } + // run reconcile with tracking for all of these tests and verify the associated // path(s) end up in the path stats. @@ -621,7 +839,7 @@ mod tests { let done = reconciler.run(Some(100)).await?; assert!(done); - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert_eq!(stats.files_checked.len(), 0); assert_eq!(stats.untracked_files.len(), 0); assert_eq!(stats.size_mismatches.len(), 0); @@ -664,7 +882,7 @@ mod tests { assert!(done); // Should have validated some segments - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert!(!stats.files_checked.is_empty()); assert_eq!(stats.untracked_files.paths.len(), 0); assert_eq!(stats.missing_files.paths.len(), 0); @@ -712,7 +930,7 @@ mod tests { assert!(done3); // Should be done now // Stats should show work was done - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert_eq!(stats.files_checked.len(), 3); Ok(()) @@ -747,7 +965,7 @@ mod tests { assert!(done); // Should be done // Stats should show work was done - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert_eq!(stats.files_checked.len(), 1); assert_eq!(stats.missing_files.len(), 0); assert_eq!(stats.size_mismatches.len(), 0); @@ -810,7 +1028,7 @@ mod tests { assert!(done); // Should have found one untracked file - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert_eq!(stats.untracked_files.len(), 1); assert_eq!(stats.files_checked.len(), 2); @@ -898,7 +1116,7 @@ mod tests { assert!(done); // We should check that there's at least one segment validated - let stats = reconciler.stats(); + let stats = &reconciler.report().sealed; assert_eq!(stats.files_checked.len(), 1); // We should have found 
exactly one size mismatch @@ -929,7 +1147,7 @@ mod tests { // After fixing, we should still have validated segments but no size mismatches in stats // NOTE: The stats tracking the mismatches that were already found won't be cleared, // but the actual size comparison should now match - let fix_stats = fix_reconciler.stats(); + let fix_stats = &fix_reconciler.report().sealed; assert_eq!(fix_stats.files_checked.len(), 1); // Now run another reconciliation to verify there are no errors @@ -938,11 +1156,212 @@ mod tests { assert!(done); // Verify no size mismatches are found after the fix - let verify_stats = verify_reconciler.stats(); + let verify_stats = &verify_reconciler.report().sealed; assert_eq!(verify_stats.size_mismatches.len(), 0, "Should detect no size mismatches after fix. Got: files_checked.len()={}, size_mismatches.len()={}, missing_files.len()={}", verify_stats.files_checked.len(), verify_stats.size_mismatches.len(), verify_stats.missing_files.len()); Ok(()) } + + /// Collect the active-bucket entries for a single partition. + fn active_entries<'a>( + report: &'a ReconcileReport, + topic: &str, + partition: &str, + ) -> Vec<&'a ActiveSegmentReport> { + report + .active + .iter() + .filter(|a| a.topic == topic && a.partition == partition) + .collect() + } + + /// A partition that has data but has never rolled (sealed_ix == None) keeps + /// its only segment in the active bucket; it must never appear as a sealed + /// size mismatch. + #[test_log::test(tokio::test)] + async fn test_active_segment_not_sealed_mismatch() -> Result<()> { + // A high roll threshold keeps a single batch in the active segment, so + // no roll happens and the watermark stays None. 
+ let (_tmpdir, catalog) = create_test_catalog_rolling(1000).await; + + let topic_name = "active-only"; + let partition_name = "p0"; + + let topic = catalog.get_topic(topic_name).await; + topic + .extend_records(partition_name, &test_records(&["a", "b", "c"])) + .await?; + drop(topic); + + // Make the active segment durable in the manifest without sealing it. + catalog.checkpoint().await; + catalog + .get_topic(topic_name) + .await + .ensure_index(partition_name, RecordIndex(3)) + .await?; + + // Sanity: the partition has not sealed a segment yet. + { + let topic = catalog.get_topic(topic_name).await; + let partition = topic.get_partition(partition_name).await; + assert_eq!(partition.sealed_ix(), None); + } + + let config = ReconcileConfig { + track_files: true, + ..Default::default() + }; + let mut reconciler = ReconcileJob::with_config(catalog.clone(), config); + assert!(reconciler.run(Some(100)).await?); + + let report = reconciler.report(); + // The active segment must not surface as a sealed-bucket problem. + assert_eq!(report.sealed.size_mismatches.len(), 0); + assert_eq!(report.sealed.missing_files.len(), 0); + + // It must show up exactly once in the active bucket, with real sizes. + let active = active_entries(report, topic_name, partition_name); + assert_eq!(active.len(), 1); + assert!(active[0].manifest_size > 0); + assert!(active[0].disk_size > 0); + + Ok(()) + } + + /// After a roll, the just-sealed segment runs through the strict sealed + /// pipeline while the new active segment lands in the active bucket. A + /// consistent system reports no problems in either bucket. + #[test_log::test(tokio::test)] + async fn test_just_rolled_segment_in_sealed_bucket() -> Result<()> { + // Roll after 2 rows so the second batch seals segment 0. 
+ let (_tmpdir, catalog) = create_test_catalog_rolling(2).await; + let topic_name = "rolled"; + let partition_name = "p0"; + + let topic = catalog.get_topic(topic_name).await; + // First batch fills the active segment past the roll threshold. + topic + .extend_records(partition_name, &test_records(&["a", "b", "c"])) + .await?; + // Second batch rolls (and seals) segment 0, then lands in segment 1. + // The roll waits for segment 0's seal to be durable. + topic + .extend_records(partition_name, &test_records(&["d", "e", "f"])) + .await?; + drop(topic); + + // Make the active segment (1) durable so it appears in the manifest, + // and finalize segment 0's file on disk. + catalog.checkpoint().await; + catalog + .get_topic(topic_name) + .await + .ensure_index(partition_name, RecordIndex(6)) + .await?; + catalog.checkpoint().await; + + // Segment 0 sealed; segment 1 is the active tail. + { + let topic = catalog.get_topic(topic_name).await; + let partition = topic.get_partition(partition_name).await; + assert_eq!(partition.sealed_ix(), Some(SegmentIndex(0))); + } + + let config = ReconcileConfig { + track_files: true, + ..Default::default() + }; + let mut reconciler = ReconcileJob::with_config(catalog.clone(), config); + assert!(reconciler.run(Some(100)).await?); + + let report = reconciler.report(); + // The rolled segment was checked in the sealed bucket. + assert!(report.sealed.files_checked.len() >= 1); + // Healthy system: no problems in either bucket. + assert_eq!(report.sealed.size_mismatches.len(), 0); + assert_eq!(report.sealed.missing_files.len(), 0); + + // The still-active segment shows up in the active bucket. + let active = active_entries(report, topic_name, partition_name); + assert_eq!(active.len(), 1); + + Ok(()) + } + + /// The UpdateManifestSizes fix must never apply to an active segment, even + /// when its on-disk size is corrupted. The drift is reported (non-zero + /// delta) but the manifest is left untouched. 
+ #[test_log::test(tokio::test)] + async fn test_fix_never_targets_active_segment() -> Result<()> { + use tokio::io::AsyncWriteExt; + + // High roll threshold keeps the single segment active (sealed_ix None). + let (_tmpdir, catalog) = create_test_catalog_rolling(1000).await; + let topic_name = "active-fix"; + let partition_name = "p0"; + + let topic = catalog.get_topic(topic_name).await; + topic + .extend_records(partition_name, &test_records(&["a", "b", "c"])) + .await?; + drop(topic); + catalog.checkpoint().await; + catalog + .get_topic(topic_name) + .await + .ensure_index(partition_name, RecordIndex(3)) + .await?; + + let partition_id = PartitionId::new(topic_name, partition_name); + // Record the manifest's stored size for the (active) segment 0. + let original_size = manifest_segment_size(&catalog, &partition_id, SegmentIndex(0)) + .await + .expect("segment 0 should be in the manifest"); + + // Corrupt the on-disk size by appending extra bytes to the active + // segment file before any roll seals it. + let topic_path = Topic::partition_root(catalog.topic_root(), topic_name); + let slog_name = Partition::slog_name(&partition_id); + let segment_path = Slog::segment_path(&topic_path, &slog_name, SegmentIndex(0)); + { + let mut f = fs::OpenOptions::new().append(true).open(&segment_path).await?; + f.write_all(&vec![0u8; 1000]).await?; + f.flush().await?; + } + + let fix_config = ReconcileConfig { + track_files: true, + fixes: BTreeSet::from([ReconcileFix::UpdateManifestSizes]), + ..Default::default() + }; + let mut reconciler = ReconcileJob::with_config(catalog.clone(), fix_config); + assert!(reconciler.run(Some(100)).await?); + + // The fix must not have touched the active segment's manifest size. 
+ let after_size = manifest_segment_size(&catalog, &partition_id, SegmentIndex(0)) + .await + .expect("segment 0 should still be in the manifest"); + assert_eq!( + after_size, original_size, + "active-segment manifest size must be unchanged by the fix" + ); + + let report = reconciler.report(); + // The corruption is never a sealed-bucket mismatch for an active segment. + assert_eq!(report.sealed.size_mismatches.len(), 0); + + // The active bucket reports the (positive) drift we introduced. + let active = active_entries(report, topic_name, partition_name); + assert_eq!(active.len(), 1); + assert!( + active[0].delta > 0, + "expected positive delta, got {}", + active[0].delta + ); + + Ok(()) + } } diff --git a/server/src/http.rs b/server/src/http.rs index c3025f1..1e5e7fb 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -27,9 +27,9 @@ use utoipa_swagger_ui::SwaggerUi; use crate::config::PlateauConfig; use crate::transport::{ - DataFocus, InfoResponse, Inserted, PartitionInfo, Partitions, ReconcileStats, RecordQuery, - RecordStatus, Span, Topic, TopicInfo, TopicIterationOrder, TopicIterationQuery, - TopicIterationStatus, TopicIterator, Topics, + ActiveSegmentReport, DataFocus, InfoResponse, Inserted, PartitionInfo, Partitions, + ReconcileStats, RecordQuery, RecordStatus, Span, Topic, TopicInfo, TopicIterationOrder, + TopicIterationQuery, TopicIterationStatus, TopicIterator, Topics, }; pub use crate::axum_util::{query::Query, Response}; @@ -547,19 +547,36 @@ async fn get_info( .await .map_err(|_| ErrorReply::Unknown)?; - let reconcile_stats = reconciler.stats(); + let report = reconciler.report(); + let sealed = &report.sealed; let retention_stats = ReconcileStats { - files_checked: reconcile_stats.files_checked.len(), - untracked_files: reconcile_stats.untracked_files.len(), - size_mismatches: reconcile_stats.size_mismatches.len(), - missing_files: reconcile_stats.missing_files.len(), - expected_size: reconcile_stats.expected_size.as_u64() as usize, - 
actual_size: reconcile_stats.actual_size.as_u64() as usize, + files_checked: sealed.files_checked.len(), + untracked_files: sealed.untracked_files.len(), + size_mismatches: sealed.size_mismatches.len(), + missing_files: sealed.missing_files.len(), + expected_size: sealed.expected_size.as_u64() as usize, + actual_size: sealed.actual_size.as_u64() as usize, }; + let active_segments = report + .active + .iter() + .map(|a| ActiveSegmentReport { + topic: a.topic.clone(), + partition: a.partition.clone(), + manifest_size: a.manifest_size, + disk_size: a.disk_size, + delta: a.delta, + last_manifest_update_age_secs: a + .last_manifest_update_age + .map(|d| d.as_secs_f64()), + }) + .collect(); + Ok(Response::ok(InfoResponse { topics, retention_stats, + active_segments, })) } @@ -590,6 +607,7 @@ async fn get_info( TopicInfo, PartitionInfo, ReconcileStats, + ActiveSegmentReport, ) ), tags( diff --git a/server/tests/server.rs b/server/tests/server.rs index 4f9033a..1a7a79a 100644 --- a/server/tests/server.rs +++ b/server/tests/server.rs @@ -1056,5 +1056,20 @@ async fn info_endpoint() -> Result<()> { assert!(retention_stats["expected_size"].is_number()); assert!(retention_stats["actual_size"].is_number()); + // The active-segment bucket is part of the wire format. The freshly written + // partitions have not rolled, so their active tails should be reported here. 
+ let active_segments = info_response["active_segments"].as_array().unwrap(); + assert!( + !active_segments.is_empty(), + "expected at least one active-segment report" + ); + for entry in active_segments { + assert!(entry["topic"].is_string()); + assert!(entry["partition"].is_string()); + assert!(entry["manifest_size"].is_number()); + assert!(entry["disk_size"].is_number()); + assert!(entry["delta"].is_number()); + } + Ok(()) } diff --git a/transport/src/lib.rs b/transport/src/lib.rs index d04e62c..9e4d103 100644 --- a/transport/src/lib.rs +++ b/transport/src/lib.rs @@ -455,7 +455,15 @@ pub struct TopicInfo { #[cfg_attr(feature = "rweb", derive(Schema))] pub struct InfoResponse { pub topics: Vec<TopicInfo>, + /// Strict sealed-segment reconciliation stats. pub retention_stats: ReconcileStats, + /// Informational per-partition reports for the currently active (writeable) + /// tail segments. These are never "fixed" by reconciliation; a non-zero + /// `delta` is expected while writes accumulate before a manifest flush. + /// + /// Defaulted on deserialization so older clients and payloads remain valid. + #[serde(default)] + pub active_segments: Vec<ActiveSegmentReport>, } #[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)] @@ -469,6 +477,26 @@ pub struct ReconcileStats { pub actual_size: usize, } +/// Wire form of a reconciliation report for the active (writeable) tail segment +/// of a single partition. +/// +/// `delta` is `disk_size - manifest_size`, signed: a negative value means the +/// manifest claims more bytes than exist on disk, which is an alert signal +/// (corruption or an accounting bug). A positive value is normal — writes +/// accumulate on disk before the manifest is flushed.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)] +#[cfg_attr(feature = "rweb", derive(Schema))] +pub struct ActiveSegmentReport { + pub topic: String, + pub partition: String, + pub manifest_size: usize, + pub disk_size: usize, + pub delta: i64, + /// Seconds since this partition's most recent manifest update landed + /// durably, if known. Currently always `None` (no write-path hook yet). + pub last_manifest_update_age_secs: Option<f64>, +} + #[derive(Debug, Clone, Deserialize, Serialize)] #[cfg_attr(feature = "rweb", derive(Schema))] pub struct ErrorMessage { From 8e220e248b1c1b6425d83f548e6e97b280a00a26 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 21:09:36 +0000 Subject: [PATCH 2/2] Drop last_manifest_update_age from active-segment report No plans to populate it in the near term; can be re-added if/when the write path grows a durable-update timestamp hook. --- catalog/src/reconcile.rs | 24 +++++++----------------- server/src/http.rs | 3 --- transport/src/lib.rs | 3 --- 3 files changed, 7 insertions(+), 23 deletions(-) diff --git a/catalog/src/reconcile.rs b/catalog/src/reconcile.rs index 9fd078b..45e690a 100644 --- a/catalog/src/reconcile.rs +++ b/catalog/src/reconcile.rs @@ -226,8 +226,6 @@ impl ReconcileStats { /// disk, which points at corruption or an accounting bug. /// - `delta` growing unboundedly across consecutive scans suggests manifest /// updates are stuck or falling behind. -/// - `last_manifest_update_age` exceeding a threshold suggests the write path is -/// wedged. (See the field's TODO; currently always `None`.) #[derive(Debug, Clone)] pub struct ActiveSegmentReport { /// Topic the active segment belongs to. @@ -241,13 +239,6 @@ pub struct ActiveSegmentReport { /// `disk_size as i64 - manifest_size as i64`. Signed; negative is an alert /// signal (manifest claims more bytes than disk has). pub delta: i64, - /// Time since this partition's most recent manifest update landed durably.
- /// - /// TODO: there is no hook in the write path that records when the last - /// manifest update became durable, so this is always `None` for now. Wire - /// it up once the write path exposes that timestamp; per this PR's scope we - /// deliberately do not add new plumbing in the write path for it. - pub last_manifest_update_age: Option<Duration>, } /// The full output of a reconciliation pass, split into two buckets. @@ -571,9 +562,8 @@ impl ReconcileJob { let expected_size = ByteSize(segment.size as u64); let actual_size = ByteSize(total_actual_size as u64); - self.state.report.sealed.expected_size = ByteSize( - self.state.report.sealed.expected_size.as_u64() + expected_size.as_u64(), - ); + self.state.report.sealed.expected_size = + ByteSize(self.state.report.sealed.expected_size.as_u64() + expected_size.as_u64()); self.state.report.sealed.actual_size = ByteSize(self.state.report.sealed.actual_size.as_u64() + actual_size.as_u64()); @@ -670,9 +660,6 @@ impl ReconcileJob { manifest_size, disk_size, delta, - // TODO: no durable-update timestamp hook exists in the write path - // yet; see ActiveSegmentReport::last_manifest_update_age. - last_manifest_update_age: None, }); } @@ -1279,7 +1266,7 @@ mod tests { let report = reconciler.report(); // The rolled segment was checked in the sealed bucket. - assert!(report.sealed.files_checked.len() >= 1); + assert!(!report.sealed.files_checked.is_empty()); // Healthy system: no problems in either bucket.
assert_eq!(report.sealed.size_mismatches.len(), 0); assert_eq!(report.sealed.missing_files.len(), 0); @@ -1327,7 +1314,10 @@ mod tests { let slog_name = Partition::slog_name(&partition_id); let segment_path = Slog::segment_path(&topic_path, &slog_name, SegmentIndex(0)); { - let mut f = fs::OpenOptions::new().append(true).open(&segment_path).await?; + let mut f = fs::OpenOptions::new() + .append(true) + .open(&segment_path) + .await?; f.write_all(&vec![0u8; 1000]).await?; f.flush().await?; } diff --git a/server/src/http.rs b/server/src/http.rs index 1e5e7fb..0573643 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -567,9 +567,6 @@ async fn get_info( manifest_size: a.manifest_size, disk_size: a.disk_size, delta: a.delta, - last_manifest_update_age_secs: a - .last_manifest_update_age - .map(|d| d.as_secs_f64()), }) .collect(); diff --git a/transport/src/lib.rs b/transport/src/lib.rs index 9e4d103..b64cc89 100644 --- a/transport/src/lib.rs +++ b/transport/src/lib.rs @@ -492,9 +492,6 @@ pub struct ActiveSegmentReport { pub manifest_size: usize, pub disk_size: usize, pub delta: i64, - /// Seconds since this partition's most recent manifest update landed - /// durably, if known. Currently always `None` (no write-path hook yet). - pub last_manifest_update_age_secs: Option<f64>, } #[derive(Debug, Clone, Deserialize, Serialize)]