Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions catalog/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,38 @@ impl Manifest {
.unwrap();
}

/// Compare-and-set for a segment's recorded size.
///
/// Writes `corrected_size` into the row identified by `id` and `index`, but
/// only when that row is still present and its stored size still equals
/// `observed_size`. Returns `true` if a row was modified, `false` otherwise.
///
/// This is a plain conditional `UPDATE`, so, in contrast to
/// [`update`](Self::update), it can never bring a row back: when retention has
/// already dropped the segment, or the size has changed since it was observed,
/// the `WHERE` clause matches nothing and the manifest is left as it was. A
/// reconciliation pass can therefore apply size fixes from a snapshot that
/// went stale while writes and retention were running.
///
/// # Panics
///
/// Panics if either size does not fit in an `i64` or if the query fails.
pub(crate) async fn update_size_if_unchanged(
    &self,
    id: &PartitionId,
    index: SegmentIndex,
    observed_size: usize,
    corrected_size: usize,
) -> bool {
    // Placeholders are numbered, so the bind order below is what matters:
    // ?1..?3 locate the row, ?4 is the new size, ?5 is the size the row must
    // still hold for the update to apply.
    let statement = sqlx::query(
        "
UPDATE segments SET size = ?4
WHERE topic = ?1 AND partition = ?2 AND segment_index = ?3 AND size = ?5
",
    )
    .bind(id.topic())
    .bind(id.partition())
    .bind(index.to_row())
    .bind(i64::try_from(corrected_size).unwrap())
    .bind(i64::try_from(observed_size).unwrap());

    let outcome = statement.execute(&self.pool).await.unwrap();

    // Zero affected rows means the row was gone or no longer matched.
    outcome.rows_affected() != 0
}

/// Inverse of upsert, gets any previously stored data for a segment.
pub async fn get_segment_data(&self, id: SegmentId<&PartitionId>) -> Option<SegmentData> {
sqlx::query(
Expand Down Expand Up @@ -849,6 +881,58 @@ mod test {
assert_eq!(state.get_partition_range(&id).await, None);
}

/// Exercises the three outcomes of `update_size_if_unchanged`: a mismatched
/// observation is rejected, a matching observation is applied, and a fix for
/// a segment that has been removed does not bring the row back.
#[tokio::test]
async fn test_update_size_if_unchanged() {
    let root = tempdir().unwrap();
    let manifest = Manifest::attach(root.path().join("testing.sqlite")).await;
    let id = PartitionId::new("topic", "cas");

    // Seed a single segment whose recorded size is 12.
    let epoch = Utc.timestamp_opt(0, 0).unwrap();
    let seed = SegmentData {
        index: SegmentIndex(0),
        time: epoch..=epoch,
        records: RecordIndex(0)..RecordIndex(10),
        size: 12,
        version: SEGMENT_FORMAT_VERSION,
    };
    manifest.update(&id, &seed).await;
    assert_eq!(manifest.get_size(Scope::Partition(&id)).await, 12);

    // The caller believes the size is 999, but the row says 12: the
    // observation is stale, so nothing may be written.
    let applied_stale = manifest
        .update_size_if_unchanged(&id, SegmentIndex(0), 999, 34)
        .await;
    assert!(!applied_stale);
    assert_eq!(manifest.get_size(Scope::Partition(&id)).await, 12);

    // The observation agrees with the row, so the corrected size lands.
    let applied_matching = manifest
        .update_size_if_unchanged(&id, SegmentIndex(0), 12, 34)
        .await;
    assert!(applied_matching);
    assert_eq!(manifest.get_size(Scope::Partition(&id)).await, 34);

    // Retention drops the segment and a late fix arrives afterwards. The
    // deleted row must stay deleted.
    manifest.remove_segment(SegmentIndex(0).to_id(&id)).await;
    assert_eq!(manifest.get_min_segment(&id).await, None);
    let applied_after_removal = manifest
        .update_size_if_unchanged(&id, SegmentIndex(0), 34, 56)
        .await;
    assert!(!applied_after_removal);
    assert_eq!(manifest.get_min_segment(&id).await, None);
    assert_eq!(manifest.get_size(Scope::Partition(&id)).await, 0);
}

#[tokio::test]
async fn test_time_query() {
let topic = "topic";
Expand Down
28 changes: 21 additions & 7 deletions catalog/src/reconcile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,15 +512,29 @@ impl ReconcileJob {
.fixes
.contains(&ReconcileFix::UpdateManifestSizes)
{
info!("Fixing size mismatch for segment {}", segment_file_name);
let mut corrected_segment = segment.clone();
corrected_segment.size = total_actual_size;

// Update the manifest with the correct size
self.catalog
// Apply the fix conditionally on the size we observed. If
// retention removed this segment (or another writer changed
// it) since we snapshotted the manifest, the update is a
// no-op rather than re-creating a stale row. This keeps the
// fix safe to run while writes and retention are in flight.
let applied = self
.catalog
.manifest()
.update(&partition_id, &corrected_segment)
.update_size_if_unchanged(
&partition_id,
segment.index,
segment.size,
total_actual_size,
)
.await;
if applied {
info!("Fixed size mismatch for segment {}", segment_file_name);
} else {
info!(
"Skipped stale size fix for segment {} (concurrently removed or changed)",
segment_file_name
);
}
}
} else {
debug!(
Expand Down
Loading