diff --git a/catalog/src/manifest.rs b/catalog/src/manifest.rs
index b3ddf92..aec0083 100644
--- a/catalog/src/manifest.rs
+++ b/catalog/src/manifest.rs
@@ -281,6 +281,38 @@ impl Manifest {
         .unwrap();
     }
 
+    /// Compare-and-set on a segment's recorded size: rewrite it to
+    /// `corrected_size` only while the row still records `observed_size`.
+    /// Returns true if a row was rewritten and false if nothing matched.
+    ///
+    /// Because this is a guarded `UPDATE`, it cannot insert a row the way
+    /// [`update`](Self::update) can. A segment that retention has deleted, or
+    /// whose size has changed since it was observed, is left untouched, so a
+    /// stale reconciliation snapshot can be applied while writers are active.
+    pub(crate) async fn update_size_if_unchanged(
+        &self,
+        id: &PartitionId,
+        index: SegmentIndex,
+        observed_size: usize,
+        corrected_size: usize,
+    ) -> bool {
+        let new_size = i64::try_from(corrected_size).unwrap();
+        let expected_size = i64::try_from(observed_size).unwrap();
+        let statement = sqlx::query(
+            "
+            UPDATE segments SET size = ?4
+            WHERE topic = ?1 AND partition = ?2 AND segment_index = ?3 AND size = ?5
+            ",
+        )
+        .bind(id.topic())
+        .bind(id.partition())
+        .bind(index.to_row())
+        .bind(new_size)
+        .bind(expected_size);
+        let outcome = statement.execute(&self.pool).await.unwrap();
+        outcome.rows_affected() != 0
+    }
+
     /// Inverse of upsert, gets any previously stored data for a segment.
pub async fn get_segment_data(&self, id: SegmentId<&PartitionId>) -> Option { sqlx::query( @@ -849,6 +881,58 @@ mod test { assert_eq!(state.get_partition_range(&id).await, None); } + #[tokio::test] + async fn test_update_size_if_unchanged() { + let id = PartitionId::new("topic", "cas"); + let root = tempdir().unwrap(); + let path = root.path().join("testing.sqlite"); + let state = Manifest::attach(path).await; + + let time = Utc.timestamp_opt(0, 0).unwrap()..=Utc.timestamp_opt(0, 0).unwrap(); + state + .update( + &id, + &SegmentData { + index: SegmentIndex(0), + time, + records: RecordIndex(0)..RecordIndex(10), + size: 12, + version: SEGMENT_FORMAT_VERSION, + }, + ) + .await; + assert_eq!(state.get_size(Scope::Partition(&id)).await, 12); + + // Stale observation: the row no longer records the size we think it does, + // so the conditional update must not apply. + assert!( + !state + .update_size_if_unchanged(&id, SegmentIndex(0), 999, 34) + .await + ); + assert_eq!(state.get_size(Scope::Partition(&id)).await, 12); + + // Matching observation: the fix applies. + assert!( + state + .update_size_if_unchanged(&id, SegmentIndex(0), 12, 34) + .await + ); + assert_eq!(state.get_size(Scope::Partition(&id)).await, 34); + + // Simulate retention removing the segment, then a stale fix racing in. The + // update must not resurrect the deleted row. 
+ state.remove_segment(SegmentIndex(0).to_id(&id)).await; + assert_eq!(state.get_min_segment(&id).await, None); + assert!( + !state + .update_size_if_unchanged(&id, SegmentIndex(0), 34, 56) + .await + ); + assert_eq!(state.get_min_segment(&id).await, None); + assert_eq!(state.get_size(Scope::Partition(&id)).await, 0); + } + #[tokio::test] async fn test_time_query() { let topic = "topic"; diff --git a/catalog/src/reconcile.rs b/catalog/src/reconcile.rs index e17a05a..4ec629d 100644 --- a/catalog/src/reconcile.rs +++ b/catalog/src/reconcile.rs @@ -512,15 +512,29 @@ impl ReconcileJob { .fixes .contains(&ReconcileFix::UpdateManifestSizes) { - info!("Fixing size mismatch for segment {}", segment_file_name); - let mut corrected_segment = segment.clone(); - corrected_segment.size = total_actual_size; - - // Update the manifest with the correct size - self.catalog + // Apply the fix conditionally on the size we observed. If + // retention removed this segment (or another writer changed + // it) since we snapshotted the manifest, the update is a + // no-op rather than re-creating a stale row. This keeps the + // fix safe to run while writes and retention are in flight. + let applied = self + .catalog .manifest() - .update(&partition_id, &corrected_segment) + .update_size_if_unchanged( + &partition_id, + segment.index, + segment.size, + total_actual_size, + ) .await; + if applied { + info!("Fixed size mismatch for segment {}", segment_file_name); + } else { + info!( + "Skipped stale size fix for segment {} (concurrently removed or changed)", + segment_file_name + ); + } } } else { debug!(