From 2ee963b1da94572bf5fbb38670dbdc4e1a2dc3b2 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 5 Jun 2025 13:01:21 +0200 Subject: [PATCH] Fix bug in cleaner with overflows. --- crates/worker/src/partition/cleaner.rs | 91 ++++++++++++++++---------- 1 file changed, 56 insertions(+), 35 deletions(-) diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 55a407715e..57c8e95dba 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -130,34 +130,32 @@ where // thus it will be cleaned up with the old timer. continue; }; - let Some(status_expiration_time) = SystemTime::from(completed_time) - .checked_add(completed_invocation.completion_retention_duration) - else { - // If sum overflow, then the cleanup time lies far enough in the future - continue; - }; let now = SystemTime::now(); - if now >= status_expiration_time { - restate_bifrost::append_to_bifrost( - bifrost, - Arc::new(Envelope { - header: Header { - source: bifrost_envelope_source.clone(), - dest: Destination::Processor { - partition_key: invocation_id.partition_key(), - dedup: None, + if let Some(status_expiration_time) = SystemTime::from(completed_time) + .checked_add(completed_invocation.completion_retention_duration) + { + if now >= status_expiration_time { + restate_bifrost::append_to_bifrost( + bifrost, + Arc::new(Envelope { + header: Header { + source: bifrost_envelope_source.clone(), + dest: Destination::Processor { + partition_key: invocation_id.partition_key(), + dedup: None, + }, }, - }, - command: Command::PurgeInvocation(PurgeInvocationRequest { - invocation_id, - response_sink: None, + command: Command::PurgeInvocation(PurgeInvocationRequest { + invocation_id, + response_sink: None, + }), }), - }), - ) - .await - .context("Cannot append to bifrost purge invocation")?; - continue; + ) + .await + .context("Cannot append to bifrost purge invocation")?; + continue; + } } // We don't cleanup the status yet, let's check if there's a journal to cleanup @@ -209,7 +207,7 @@ mod tests { use restate_storage_api::StorageError; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, - InvokedInvocationStatusLite, + InvokedInvocationStatusLite, JournalMetadata, }; use restate_types::Version; use restate_types::identifiers::{InvocationId, InvocationUuid}; @@ -266,6 +264,8 @@ mod tests { let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); + let expired_journal = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); let not_expired_invocation_1 = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); let not_expired_invocation_2 = @@ -281,6 +281,19 @@ mod tests { ..CompletedInvocation::mock_neo() }), ), + ( + expired_journal, + InvocationStatus::Completed(CompletedInvocation { + completion_retention_duration: Duration::MAX, + journal_retention_duration: Duration::ZERO, + journal_metadata: JournalMetadata { + length: 2, + commands: 2, + span_context: Default::default(), + }, + ..CompletedInvocation::mock_neo() + }), + ), ( not_expired_invocation_1, InvocationStatus::Completed(CompletedInvocation { @@ -324,19 +337,27 @@ mod tests { }) .unwrap(); - let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap(); - let bifrost_message = log_entries - .remove(0) - .try_decode::() + let log_entries: Vec<_> = bifrost + .read_all(partition_id.into()) + .await .unwrap() - .unwrap(); + .into_iter() + .map(|e| e.try_decode::().unwrap().unwrap().command) + .collect(); assert_that!( - bifrost_message.command, - pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest { - invocation_id: eq(expired_invocation) - }))) + log_entries, + all!( + len(eq(2)), + contains(pat!(Command::PurgeInvocation(pat!( + PurgeInvocationRequest { + invocation_id: eq(expired_invocation), + } + )))), + contains(pat!(Command::PurgeJournal(pat!(PurgeInvocationRequest { + invocation_id: eq(expired_journal), + })))), + ) ); - assert_that!(log_entries, empty()); } }