From a5a0868713c3a81ed7f8a4b6a173d71aa2f54a45 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 14 May 2026 09:24:12 +0200 Subject: [PATCH 1/7] Introduce new behavior for run completion proposals: in protocol v7 we now send back an ad-hoc message indicating an ack for the completion proposal, instead than sending back the full completion. --- crates/invoker-impl/src/input_command.rs | 19 ++- .../src/invocation_state_machine.rs | 35 ++++- .../service_protocol_runner.rs | 4 +- .../service_protocol_runner_v4.rs | 9 +- crates/invoker-impl/src/lib.rs | 23 +++- crates/invoker-impl/src/test_util.rs | 10 +- crates/service-protocol-v4/src/lib.rs | 1 + .../src/message_codec/mod.rs | 5 + crates/worker-api/src/invoker/effects.rs | 5 +- crates/worker-api/src/invoker/handle.rs | 8 +- .../src/partition/leadership/leader_state.rs | 8 ++ .../src/partition/state_machine/actions.rs | 6 +- .../state_machine/entries/notification.rs | 130 ++++++++++++++++-- .../worker/src/partition/state_machine/mod.rs | 14 ++ .../partition/state_machine/tests/matchers.rs | 10 ++ .../dev/restate/service/protocol.proto | 11 ++ 16 files changed, 274 insertions(+), 24 deletions(-) diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index af26344596..0a35dffcf7 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -14,7 +14,7 @@ use restate_errors::NotRunningError; use restate_types::LimitKey; use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::InvocationTarget; -use restate_types::journal_v2::{CommandIndex, NotificationId}; +use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; use restate_types::sharding::KeyRange; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -58,6 +58,10 @@ pub(crate) enum InputCommand { invocation_id: InvocationId, command_index: CommandIndex, }, + StoredNotificationProposalAck { + 
invocation_id: InvocationId, + completion_id: CompletionId, + }, /// Abort specific invocation id Abort { @@ -161,6 +165,19 @@ impl restate_worker_api::invoker::InvokerHandle for InvokerHandle { .map_err(|_| NotRunningError) } + fn notify_stored_notification_proposal_ack( + &mut self, + invocation_id: InvocationId, + completion_id: CompletionId, + ) -> Result<(), NotRunningError> { + self.input + .send(InputCommand::StoredNotificationProposalAck { + invocation_id, + completion_id, + }) + .map_err(|_| NotRunningError) + } + fn abort_all(&mut self) -> Result<(), NotRunningError> { self.input .send(InputCommand::AbortAll) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index 85a2d75de7..9f71b59b40 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -395,7 +395,10 @@ impl InvocationStateMachine { .. } => { if command_acks_to_propagate.remove(&command_index) { - Self::try_send_notification(notifications_tx, Notification::Ack(command_index)); + Self::try_send_notification( + notifications_tx, + Notification::CommandAck(command_index), + ); } journal_tracker.notify_acked_command_from_partition_processor(command_index); } @@ -408,6 +411,32 @@ impl InvocationStateMachine { } } + pub(super) fn notify_stored_notification_proposal_ack(&mut self, completion_id: CompletionId) { + match &mut self.invocation_state { + AttemptState::InFlight { + journal_tracker, + notifications_tx, + .. + } => { + journal_tracker.notify_acked_notification_from_partition_processor( + NotificationId::CompletionId(completion_id), + ); + Self::try_send_notification( + notifications_tx, + Notification::ProposeRunCompletionAck(completion_id), + ); + } + AttemptState::WaitingRetry { + journal_tracker, .. 
+ } => { + journal_tracker.notify_acked_notification_from_partition_processor( + NotificationId::CompletionId(completion_id), + ); + } + _ => {} + } + } + pub(super) fn notify_completion(&mut self, entry_index: EntryIndex) { if let AttemptState::InFlight { notifications_tx, .. @@ -726,9 +755,9 @@ mod tests { // Check notification was sent for ack 1 and 3 let notification = rx.recv().await; - assert_that!(notification, some(pat!(Notification::Ack(eq(1))))); + assert_that!(notification, some(pat!(Notification::CommandAck(eq(1))))); let notification = rx.recv().await; - assert_that!(notification, some(pat!(Notification::Ack(eq(3))))); + assert_that!(notification, some(pat!(Notification::CommandAck(eq(3))))); // Channel should be empty let try_recv = rx.try_recv(); diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index 0e24fe2b8d..d8753b9013 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -433,11 +433,11 @@ where trace!("Sending the completion to the wire"); crate::shortcircuit!(self.write_with_lease(&mut http_stream_tx, completion.into(), Some(lease))); }, - Some(Notification::Ack(entry_index)) => { + Some(Notification::CommandAck(entry_index)) => { trace!("Sending the ack to the wire"); crate::shortcircuit!(self.write(&mut http_stream_tx, ProtocolMessage::new_entry_ack(entry_index))); }, - Some(Notification::Entry { .. }) => { + Some(Notification::Entry { .. 
}) | Some(Notification::ProposeRunCompletionAck(_)) => { panic!("We don't expect to receive journal_v2 entries, this is an invoker bug.") }, None => { diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 3fd1ed024f..5f26837dbf 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -604,10 +604,14 @@ where Some(Notification::Completion(_)) => { panic!("We don't expect to receive Notification::Completion in v4+, this is an invoker bug.") }, - Some(Notification::Ack(entry_index)) => { + Some(Notification::CommandAck(entry_index)) => { trace!("Sending the ack to the wire"); shortcircuit!(self.write(&mut http_stream_tx, Message::new_command_ack(entry_index))); }, + Some(Notification::ProposeRunCompletionAck(completion_id)) => { + trace!("Sending ProposeRunCompletionAck to the wire"); + shortcircuit!(self.write(&mut http_stream_tx, Message::new_propose_run_completion_ack(completion_id))); + }, None => { // Completion channel is closed, // the invoker main loop won't send completions anymore. 
@@ -933,6 +937,9 @@ where Message::CommandAck(_) => TerminalLoopState::Failed(InvokerError::UnexpectedMessageV4( MessageType::CommandAck, )), + Message::ProposeRunCompletionAck(_) => TerminalLoopState::Failed( + InvokerError::UnexpectedMessageV4(MessageType::ProposeRunCompletionAck), + ), Message::Suspension(suspension) => self.handle_suspension_message(suspension), Message::AwaitingOn(awaiting_on) => self.handle_awaiting_on_message(awaiting_on), Message::Error(e) => self.handle_error_message(e), diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 7d6d19ef79..86e5ed697f 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -53,7 +53,9 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent}; use restate_types::journal_v2::raw::{RawCommand, RawNotification}; -use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId, UnresolvedFuture}; +use restate_types::journal_v2::{ + CommandIndex, CompletionId, EntryMetadata, NotificationId, UnresolvedFuture, +}; use restate_types::live::{Live, LiveLoad}; use restate_types::schema::deployment::DeploymentResolver; use restate_types::schema::invocation_target::InvocationTargetResolver; @@ -94,7 +96,9 @@ pub(crate) enum Notification { /// V2 notification signal: entry index. Entry(EntryIndex), /// V2 command ack: already signal-only. - Ack(CommandIndex), + CommandAck(CommandIndex), + /// Propose run completion ack (protocol >= v7). 
+ ProposeRunCompletionAck(CompletionId), } // -- InvocationTask factory: we use this to mock the state machine in tests @@ -521,6 +525,9 @@ where InputCommand::StoredCommandAck { invocation_id, command_index } => { self.handle_stored_command_ack(options, invocation_id, command_index); } + InputCommand::StoredNotificationProposalAck { invocation_id, completion_id } => { + self.handle_stored_notification_proposal_ack(options, invocation_id, completion_id); + } } }, Some(invoke_input_command) = segmented_input_queue.next(), if !segmented_input_queue.inner().is_empty() && self.quota.is_slot_available() && self.pending_memory_lease.is_some() => { @@ -758,6 +765,18 @@ where }); } + fn handle_stored_notification_proposal_ack( + &mut self, + options: &InvokerOptions, + invocation_id: InvocationId, + completion_id: CompletionId, + ) { + trace!("Received a new stored notification proposal acknowledgement"); + self.handle_retry_event(options, invocation_id, |sm| { + sm.notify_stored_notification_proposal_ack(completion_id) + }); + } + #[instrument( level = "trace", skip_all, diff --git a/crates/invoker-impl/src/test_util.rs b/crates/invoker-impl/src/test_util.rs index 640e74da8f..fc4c9ebfc1 100644 --- a/crates/invoker-impl/src/test_util.rs +++ b/crates/invoker-impl/src/test_util.rs @@ -22,7 +22,7 @@ use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, }; use restate_types::journal::raw::{PlainEntryHeader, PlainRawEntry, RawEntry}; -use restate_types::journal_v2::CommandIndex; +use restate_types::journal_v2::{CommandIndex, CompletionId}; use restate_types::time::MillisSinceEpoch; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -175,6 +175,14 @@ impl InvokerHandle for MockInvokerHandle { Ok(()) } + fn notify_stored_notification_proposal_ack( + &mut self, + _invocation_id: InvocationId, + _completion_id: CompletionId, + ) -> Result<(), NotRunningError> { + Ok(()) + } + fn 
abort_all(&mut self) -> Result<(), NotRunningError> { Ok(()) } diff --git a/crates/service-protocol-v4/src/lib.rs b/crates/service-protocol-v4/src/lib.rs index 6641bd23a4..0b6bc23b51 100644 --- a/crates/service-protocol-v4/src/lib.rs +++ b/crates/service-protocol-v4/src/lib.rs @@ -34,6 +34,7 @@ pub mod proto { EndMessage, CommandAckMessage, ProposeRunCompletionMessage, + ProposeRunCompletionAckMessage, CallCommandMessage, OneWayCallCommandMessage, AwaitingOnMessage, diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index ac330932c2..23997665d5 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -336,6 +336,7 @@ gen_message!( CommandAck Control = 0x0004, ProposeRunCompletion Control = 0x0005, AwaitingOn Control = 0x0006, + ProposeRunCompletionAck Control = 0x0007, Input Command noparse allows_ack = 0x0400, Output Command noparse allows_ack = 0x0401, @@ -420,4 +421,8 @@ impl Message { pub fn new_command_ack(command_index: CommandIndex) -> Self { Self::CommandAck(proto::CommandAckMessage { command_index }) } + + pub fn new_propose_run_completion_ack(completion_id: u32) -> Self { + Self::ProposeRunCompletionAck(proto::ProposeRunCompletionAckMessage { completion_id }) + } } diff --git a/crates/worker-api/src/invoker/effects.rs b/crates/worker-api/src/invoker/effects.rs index c15e17ede2..dda06fd6b8 100644 --- a/crates/worker-api/src/invoker/effects.rs +++ b/crates/worker-api/src/invoker/effects.rs @@ -18,9 +18,8 @@ use restate_types::identifiers::InvocationId; use restate_types::journal::EntryIndex; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; -use restate_types::journal_v2::CommandIndex; use restate_types::journal_v2::raw::RawEntry; -use restate_types::journal_v2::{self, UnresolvedFuture}; +use restate_types::journal_v2::{self, CommandIndex, UnresolvedFuture}; use 
restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; @@ -71,7 +70,7 @@ pub enum EffectKind { // Introduced in Restate v1.7. With the new service-protocol v7 SuspendedV3 { /// Future tree describing the notifications this invocation is waiting on. - /// Introduced in Restate v1.7 (protocol version V7). `None` for older invocations. + /// Introduced in Restate v1.7 (protocol version V7). awaiting_on: UnresolvedFuture, }, Paused { diff --git a/crates/worker-api/src/invoker/handle.rs b/crates/worker-api/src/invoker/handle.rs index 30a6d0dc3a..8c73863a4a 100644 --- a/crates/worker-api/src/invoker/handle.rs +++ b/crates/worker-api/src/invoker/handle.rs @@ -13,7 +13,7 @@ use restate_errors::NotRunningError; use restate_types::LimitKey; use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::InvocationTarget; -use restate_types::journal_v2::{CommandIndex, NotificationId}; +use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -57,6 +57,12 @@ pub trait InvokerHandle { command_index: CommandIndex, ) -> Result<(), NotRunningError>; + fn notify_stored_notification_proposal_ack( + &mut self, + invocation_id: InvocationId, + completion_id: CompletionId, + ) -> Result<(), NotRunningError>; + fn abort_all(&mut self) -> Result<(), NotRunningError>; /// *Note*: When aborting an invocation, and restarting it, the `invocation_epoch` MUST be bumped. 
diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 69d47a4724..a8c3373ab2 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -622,6 +622,14 @@ impl LeaderState { .notify_stored_command_ack(invocation_id, command_index) .map_err(Error::Invoker)?; } + Action::AckStoredNotificationProposal { + invocation_id, + completion_id, + } => { + self.invoker_handle + .notify_stored_notification_proposal_ack(invocation_id, completion_id) + .map_err(Error::Invoker)?; + } Action::ForwardCompletion { invocation_id, entry_index, diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 1e625e51d6..7e419918b3 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -18,7 +18,7 @@ use restate_types::invocation::client::{ CancelInvocationResponse, InvocationOutputResponse, KillInvocationResponse, PurgeInvocationResponse, RestartAsNewInvocationResponse, ResumeInvocationResponse, }; -use restate_types::journal_v2::{CommandIndex, NotificationId}; +use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; use restate_types::message::MessageIndex; use restate_types::time::MillisSinceEpoch; use restate_util_string::ReString; @@ -56,6 +56,10 @@ pub enum Action { invocation_id: InvocationId, command_index: CommandIndex, }, + AckStoredNotificationProposal { + invocation_id: InvocationId, + completion_id: CompletionId, + }, ForwardCompletion { invocation_id: InvocationId, entry_index: EntryIndex, diff --git a/crates/worker/src/partition/state_machine/entries/notification.rs b/crates/worker/src/partition/state_machine/entries/notification.rs index eae64e6520..8829cdf679 100644 --- a/crates/worker/src/partition/state_machine/entries/notification.rs +++ 
b/crates/worker/src/partition/state_machine/entries/notification.rs @@ -1,3 +1,4 @@ +use assert2::let_assert; use restate_storage_api::lock_table::WriteLockTable; // Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. // All rights reserved. @@ -11,14 +12,16 @@ use restate_storage_api::lock_table::WriteLockTable; use tracing::debug; +use crate::partition::state_machine::lifecycle::ResumeInvocationCommand; +use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; use restate_storage_api::invocation_status_table::InvocationStatus; use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable}; use restate_types::identifiers::{EntryIndex, InvocationId}; -use restate_types::journal_v2::CANCEL_NOTIFICATION_ID; use restate_types::journal_v2::raw::RawNotification; - -use crate::partition::state_machine::lifecycle::ResumeInvocationCommand; -use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; +use restate_types::journal_v2::{ + CANCEL_NOTIFICATION_ID, CompletionType, NotificationId, NotificationType, +}; +use restate_types::service_protocol::ServiceProtocolVersion; pub(super) struct ApplyNotificationCommand<'e> { pub(super) invocation_id: InvocationId, @@ -51,9 +54,20 @@ where .await?; } } - InvocationStatus::Invoked(_) => { - // Just forward the notification if we're invoked - ctx.forward_notification(self.invocation_id, self.entry_index, self.entry.id()); + InvocationStatus::Invoked(in_flight_invocation_metadata) => { + if self.entry.ty() == NotificationType::Completion(CompletionType::Run) + && in_flight_invocation_metadata + .pinned_deployment + .as_ref() + .is_some_and(|pd| pd.service_protocol_version >= ServiceProtocolVersion::V7) + { + let_assert!(NotificationId::CompletionId(completion_id) = self.entry.id()); + ctx.forward_completion_ack(self.invocation_id, completion_id); + } else { + ctx.forward_notification(self.invocation_id, self.entry_index, self.entry.id()); + } + // if 
completion_id_to_ack is Some, the caller emits AckStoredNotificationProposal; + // no need to forward the full notification to the SDK. } InvocationStatus::Paused(_) => { // If we're paused, resume only if the notification was a cancellation signal. @@ -81,6 +95,8 @@ where #[cfg(test)] mod tests { + use super::*; + use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use bytes::Bytes; use bytestring::ByteString; @@ -93,11 +109,11 @@ mod tests { use restate_types::invocation::{ InvocationTermination, NotifySignalRequest, TerminationFlavor, }; - use restate_types::journal_v2::EntryMetadata; use restate_types::journal_v2::{ - BuiltInSignal, CommandType, Entry, EntryType, Failure, FailureMetadata, NotificationId, + BuiltInSignal, CommandType, Entry, EntryType, Failure, FailureMetadata, RunCompletion, Signal, SignalId, SignalResult, SleepCommand, SleepCompletion, }; + use restate_types::journal_v2::{EntryMetadata, RunCommand, RunResult}; use restate_types::time::MillisSinceEpoch; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; use restate_wal_protocol::timer::TimerKeyValue; @@ -315,4 +331,100 @@ mod tests { test_env.shutdown().await; } + + #[restate_core::test] + async fn run_completion_proposal_protocol_v7_acks_instead_of_forwarding() { + let mut test_env = TestEnv::create().await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + fixtures::mock_pinned_deployment_v7(&mut test_env, invocation_id).await; + + let completion_id = 1; + // Command must precede notification + let _ = test_env + .apply(fixtures::invoker_entry_effect( + invocation_id, + RunCommand { + completion_id, + name: Default::default(), + }, + )) + .await; + + let actions = test_env + .apply(fixtures::invoker_entry_effect( + invocation_id, + RunCompletion { + completion_id, + result: RunResult::Success(Bytes::new()), + }, + )) + .await; + + // Protocol v7: compact ack sent back, NOT the full notification + assert_that!( + actions, + 
all!( + not(contains(matchers::actions::forward_notification( + invocation_id, + 2, + NotificationId::CompletionId(completion_id), + ))), + contains(matchers::actions::ack_stored_notification_proposal( + invocation_id, + completion_id, + )) + ) + ); + + test_env.shutdown().await; + } + + #[restate_core::test] + async fn run_completion_protocol_v5_forwards_notification() { + let mut test_env = TestEnv::create().await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await; + + let completion_id = 1; + // Command must precede notification + let _ = test_env + .apply(fixtures::invoker_entry_effect( + invocation_id, + RunCommand { + completion_id, + name: Default::default(), + }, + )) + .await; + + let actions = test_env + .apply(fixtures::invoker_entry_effect( + invocation_id, + RunCompletion { + completion_id, + result: RunResult::Success(Bytes::new()), + }, + )) + .await; + + // Protocol <= v6: full notification forwarded, no ack + assert_that!( + actions, + all!( + contains(matchers::actions::forward_notification( + invocation_id, + 2, + NotificationId::CompletionId(completion_id), + )), + not(contains( + matchers::actions::ack_stored_notification_proposal( + invocation_id, + completion_id, + ) + )) + ) + ); + + test_env.shutdown().await; + } } diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 666e8a3e17..e520224294 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -454,6 +454,20 @@ impl StateMachineApplyContext<'_, S> { }); } + fn forward_completion_ack(&mut self, invocation_id: InvocationId, completion_id: CompletionId) { + debug_if_leader!( + self.is_leader, + restate.journal.completion.id = completion_id, + "Forward completion ack to deployment", + ); + + self.action_collector + .push(Action::AckStoredNotificationProposal { + 
invocation_id, + completion_id, + }); + } + fn send_abort_invocation_to_invoker(&mut self, invocation_id: InvocationId) { debug_if_leader!( self.is_leader, diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index f1034d04e5..2983224e96 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -215,6 +215,16 @@ pub mod actions { }) } + pub fn ack_stored_notification_proposal( + invocation_id: InvocationId, + completion_id: restate_types::journal_v2::CompletionId, + ) -> impl Matcher { + pat!(Action::AckStoredNotificationProposal { + invocation_id: eq(invocation_id), + completion_id: eq(completion_id), + }) + } + pub fn invocation_response_to_partition_processor( caller_invocation_id: InvocationId, caller_entry_index: EntryIndex, diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 8851b4af6d..45670c17ab 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -44,6 +44,7 @@ enum ServiceProtocolVersion { // * WorkflowTarget.scope // * IdempotentRequestTarget.scope // * StartMessage.scope, StartMessage.limit_key and StartMessage.idempotency_key + // * Semantic changes to Run proposal response, introduced ProposeRunCompletionAckMessage // * ErrorMessage.should_pause V7 = 7; } @@ -215,6 +216,16 @@ message AwaitingOnMessage { bool executing_side_effects = 2; } +// Type: 0x0000 + 7 +// +// This is a message sent as response to ProposeRunCompletionMessage to acknowledge the proposal was correctly stored and replicated. +// Its ordering is considered to be "relative" to the ordering of notifications. +// +// This message will only ever be sent as response to ProposeRunCompletionMessage, and will be sent only in the PROCESSING phase of the protocol, never during REPLAY. 
+message ProposeRunCompletionAckMessage { + uint32 completion_id = 1; +} + // --- Commands and Notifications --- // The Journal is modelled as commands and notifications. From 5324dcbcecb0563ea503af950d18076f1210f6f1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 14 May 2026 12:23:27 +0200 Subject: [PATCH 2/7] Name everything ending with -test-report there so the script in the e2e repo detects them --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87e81cc154..931b32c2e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -159,7 +159,7 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} serviceImage: "ghcr.io/restatedev/test-services-java:main" - testArtifactOutput: sdk-java-integration-test-journal-table-v2 + testArtifactOutput: sdk-java-integration-journal-table-v2-test-report envVars: | RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0-dev @@ -178,7 +178,7 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} serviceImage: "ghcr.io/restatedev/test-services-java:main" - testArtifactOutput: sdk-java-integration-test-vqueues + testArtifactOutput: sdk-java-integration-vqueues-test-report envVars: | RESTATE_EXPERIMENTAL_ENABLE_VQUEUES=true # we don't expect all tests to pass, that's why we continue on error @@ -215,7 +215,7 @@ jobs: serviceImage: "ghcr.io/restatedev/test-services-python:main" envVars: | RESTATE_experimental_enable_protocol_v7=true - testArtifactOutput: sdk-python-protocol-v7 + testArtifactOutput: sdk-python-protocol-v7-test-report sdk-go: name: Run SDK-Go integration tests @@ -261,7 +261,7 @@ jobs: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} envVars: | RESTATE_experimental_enable_protocol_v7=true - testArtifactOutput: sdk-typescript-protocol-v7 + testArtifactOutput: sdk-typescript-protocol-v7-test-report sdk-rust: name: 
Run SDK-Rust integration tests From ef38b37984b1100f8b26fa89518f51068abb33fc Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 18 May 2026 16:01:28 +0200 Subject: [PATCH 3/7] Revert approach based on PP knowing about protocol v7. We now do everything inside the invoker. --- crates/invoker-impl/src/input_command.rs | 19 +-- .../src/invocation_state_machine.rs | 126 +++++++++++++---- crates/invoker-impl/src/lib.rs | 15 -- crates/invoker-impl/src/test_util.rs | 10 +- crates/types/src/journal_v2/notification.rs | 2 +- crates/worker-api/src/invoker/effects.rs | 5 +- crates/worker-api/src/invoker/handle.rs | 8 +- .../src/partition/leadership/leader_state.rs | 8 -- .../src/partition/state_machine/actions.rs | 6 +- .../state_machine/entries/notification.rs | 130 ++---------------- .../worker/src/partition/state_machine/mod.rs | 14 -- .../partition/state_machine/tests/matchers.rs | 10 -- 12 files changed, 114 insertions(+), 239 deletions(-) diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 0a35dffcf7..af26344596 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -14,7 +14,7 @@ use restate_errors::NotRunningError; use restate_types::LimitKey; use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::InvocationTarget; -use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; +use restate_types::journal_v2::{CommandIndex, NotificationId}; use restate_types::sharding::KeyRange; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -58,10 +58,6 @@ pub(crate) enum InputCommand { invocation_id: InvocationId, command_index: CommandIndex, }, - StoredNotificationProposalAck { - invocation_id: InvocationId, - completion_id: CompletionId, - }, /// Abort specific invocation id Abort { @@ -165,19 +161,6 @@ impl restate_worker_api::invoker::InvokerHandle for InvokerHandle { .map_err(|_| NotRunningError) 
} - fn notify_stored_notification_proposal_ack( - &mut self, - invocation_id: InvocationId, - completion_id: CompletionId, - ) -> Result<(), NotRunningError> { - self.input - .send(InputCommand::StoredNotificationProposalAck { - invocation_id, - completion_id, - }) - .map_err(|_| NotRunningError) - } - fn abort_all(&mut self) -> Result<(), NotRunningError> { self.input .send(InputCommand::AbortAll) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index 9f71b59b40..cfe87f6d8c 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -18,6 +18,7 @@ use restate_memory::LocalMemoryPool; use restate_types::identifiers::EntryIndex; use restate_types::retries; use restate_types::schema::invocation_target::OnMaxAttempts; +use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::vqueues::VQueueId; use restate_worker_api::resources::ReservedResources; @@ -147,6 +148,12 @@ enum AttemptState { // Acks that should be propagated back to the SDK command_acks_to_propagate: HashSet, + // Run completions the SDK proposed during this attempt. When the partition + // processor echoes the stored notification back via [`Self::notify_entry`], + // we swap [`Notification::Entry`] for [`Notification::ProposeRunCompletionAck`] + // so the SDK gets the compact ack message on the wire (protocol >= v7). 
+ run_completion_proposals_to_ack: HashSet, + // Deployment being used during this attempt using_deployment: Option, // If true, we need to notify the deployment id to the partition processor @@ -253,6 +260,7 @@ impl InvocationStateMachine { journal_tracker: Default::default(), abort_handle, command_acks_to_propagate: Default::default(), + run_completion_proposals_to_ack: Default::default(), using_deployment: None, should_notify_pinned_deployment: false, }; @@ -379,9 +387,22 @@ impl InvocationStateMachine { )); if let AttemptState::InFlight { - journal_tracker, .. + journal_tracker, + run_completion_proposals_to_ack, + using_deployment, + .. } = &mut self.invocation_state { + // We track the run completion proposals to ack only if we're using protocol v7 + if let Some(pinned_deployment) = using_deployment + && pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7 + { + run_completion_proposals_to_ack.insert( + *notification_id + .try_as_completion_id_ref() + .expect("run proposals notification id must be completion id."), + ); + } journal_tracker.notify_notification_proposed_to_partition_processor(notification_id); } } @@ -411,32 +432,6 @@ impl InvocationStateMachine { } } - pub(super) fn notify_stored_notification_proposal_ack(&mut self, completion_id: CompletionId) { - match &mut self.invocation_state { - AttemptState::InFlight { - journal_tracker, - notifications_tx, - .. - } => { - journal_tracker.notify_acked_notification_from_partition_processor( - NotificationId::CompletionId(completion_id), - ); - Self::try_send_notification( - notifications_tx, - Notification::ProposeRunCompletionAck(completion_id), - ); - } - AttemptState::WaitingRetry { - journal_tracker, .. - } => { - journal_tracker.notify_acked_notification_from_partition_processor( - NotificationId::CompletionId(completion_id), - ); - } - _ => {} - } - } - pub(super) fn notify_completion(&mut self, entry_index: EntryIndex) { if let AttemptState::InFlight { notifications_tx, .. 
@@ -455,11 +450,20 @@ impl InvocationStateMachine { AttemptState::InFlight { journal_tracker, notifications_tx, + run_completion_proposals_to_ack, .. } => { + let to_send = match &notification_id { + // We send RunCompletionAck only if we're tracking this specific run completion. + NotificationId::CompletionId(c) + if run_completion_proposals_to_ack.remove(c) => + { + Notification::ProposeRunCompletionAck(*c) + } + _ => Notification::Entry(entry_index), + }; journal_tracker.notify_acked_notification_from_partition_processor(notification_id); - - Self::try_send_notification(notifications_tx, Notification::Entry(entry_index)); + Self::try_send_notification(notifications_tx, to_send); } AttemptState::WaitingRetry { journal_tracker, .. @@ -764,6 +768,70 @@ mod tests { assert_that!(try_recv, err(eq(TryRecvError::Empty))); } + fn start_with_protocol( + ism: &mut InvocationStateMachine, + version: ServiceProtocolVersion, + ) -> mpsc::UnboundedReceiver { + let abort_handle = tokio::spawn(async {}).abort_handle(); + let (tx, rx) = mpsc::unbounded_channel(); + ism.start(abort_handle, tx); + ism.notify_pinned_deployment( + PinnedDeployment::new(DeploymentId::default(), version), + true, + ); + rx + } + + #[test(tokio::test)] + async fn notify_entry_swaps_proposed_run_completion_to_ack_on_v7() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // Track a proposal for CompletionId 5 + ism.notify_new_notification_proposal(NotificationId::CompletionId(5)); + + // PP echoes back the stored notification — the ISM must swap to the ack + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!( + rx.recv().await, + some(pat!(Notification::ProposeRunCompletionAck(eq(5)))) + ); + + // The proposal was consumed: a second notify_entry for the same id falls + // back to the regular Entry path.
+ ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + + #[test(tokio::test)] + async fn notify_entry_does_not_swap_on_v6_old_path_preserved() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V6); + + // Proposal is recorded in the journal tracker (for retry safety) but NOT + // tracked for swapping, because the deployment is on protocol v6. + ism.notify_new_notification_proposal(NotificationId::CompletionId(5)); + + // PP echoes back the stored notification — the ISM forwards the full Entry, + // exactly like before protocol v7 existed. + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + + #[test(tokio::test)] + async fn notify_entry_on_v7_without_proposal_falls_through_to_entry() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // No prior proposal: a completion notification flows through as Entry. + ism.notify_entry(3, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(3))))); + + // Signals are also never swapped — only completion ids tracked at propose time qualify. 
+ ism.notify_entry(4, NotificationId::SignalIndex(17)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(4))))); + } + #[test(tokio::test)] async fn handle_error_counts_attempts_on_same_entry() { let mut invocation_state_machine = create_test_invocation_state_machine(); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 86e5ed697f..6455e12d33 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -525,9 +525,6 @@ where InputCommand::StoredCommandAck { invocation_id, command_index } => { self.handle_stored_command_ack(options, invocation_id, command_index); } - InputCommand::StoredNotificationProposalAck { invocation_id, completion_id } => { - self.handle_stored_notification_proposal_ack(options, invocation_id, completion_id); - } } }, Some(invoke_input_command) = segmented_input_queue.next(), if !segmented_input_queue.inner().is_empty() && self.quota.is_slot_available() && self.pending_memory_lease.is_some() => { @@ -765,18 +762,6 @@ where }); } - fn handle_stored_notification_proposal_ack( - &mut self, - options: &InvokerOptions, - invocation_id: InvocationId, - completion_id: CompletionId, - ) { - trace!("Received a new stored notification proposal acknowledgement"); - self.handle_retry_event(options, invocation_id, |sm| { - sm.notify_stored_notification_proposal_ack(completion_id) - }); - } - #[instrument( level = "trace", skip_all, diff --git a/crates/invoker-impl/src/test_util.rs b/crates/invoker-impl/src/test_util.rs index fc4c9ebfc1..640e74da8f 100644 --- a/crates/invoker-impl/src/test_util.rs +++ b/crates/invoker-impl/src/test_util.rs @@ -22,7 +22,7 @@ use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, }; use restate_types::journal::raw::{PlainEntryHeader, PlainRawEntry, RawEntry}; -use restate_types::journal_v2::{CommandIndex, CompletionId}; +use restate_types::journal_v2::CommandIndex; use 
restate_types::time::MillisSinceEpoch; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -175,14 +175,6 @@ impl InvokerHandle for MockInvokerHandle { Ok(()) } - fn notify_stored_notification_proposal_ack( - &mut self, - _invocation_id: InvocationId, - _completion_id: CompletionId, - ) -> Result<(), NotRunningError> { - Ok(()) - } - fn abort_all(&mut self) -> Result<(), NotRunningError> { Ok(()) } diff --git a/crates/types/src/journal_v2/notification.rs b/crates/types/src/journal_v2/notification.rs index 5789d20043..79ee0e2ff1 100644 --- a/crates/types/src/journal_v2/notification.rs +++ b/crates/types/src/journal_v2/notification.rs @@ -21,7 +21,7 @@ use crate::journal_v2::{ }; /// See [`Notification`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::EnumTryAs)] pub enum NotificationId { CompletionId(CompletionId), SignalIndex(SignalIndex), diff --git a/crates/worker-api/src/invoker/effects.rs b/crates/worker-api/src/invoker/effects.rs index dda06fd6b8..c15e17ede2 100644 --- a/crates/worker-api/src/invoker/effects.rs +++ b/crates/worker-api/src/invoker/effects.rs @@ -18,8 +18,9 @@ use restate_types::identifiers::InvocationId; use restate_types::journal::EntryIndex; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; +use restate_types::journal_v2::CommandIndex; use restate_types::journal_v2::raw::RawEntry; -use restate_types::journal_v2::{self, CommandIndex, UnresolvedFuture}; +use restate_types::journal_v2::{self, UnresolvedFuture}; use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; @@ -70,7 +71,7 @@ pub enum EffectKind { // Introduced in Restate v1.7. With the new service-protocol v7 SuspendedV3 { /// Future tree describing the notifications this invocation is waiting on. - /// Introduced in Restate v1.7 (protocol version V7). 
+ /// Introduced in Restate v1.7 (protocol version V7). `None` for older invocations. awaiting_on: UnresolvedFuture, }, Paused { diff --git a/crates/worker-api/src/invoker/handle.rs b/crates/worker-api/src/invoker/handle.rs index 8c73863a4a..30a6d0dc3a 100644 --- a/crates/worker-api/src/invoker/handle.rs +++ b/crates/worker-api/src/invoker/handle.rs @@ -13,7 +13,7 @@ use restate_errors::NotRunningError; use restate_types::LimitKey; use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::InvocationTarget; -use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; +use restate_types::journal_v2::{CommandIndex, NotificationId}; use restate_types::vqueues::VQueueId; use restate_util_string::ReString; @@ -57,12 +57,6 @@ pub trait InvokerHandle { command_index: CommandIndex, ) -> Result<(), NotRunningError>; - fn notify_stored_notification_proposal_ack( - &mut self, - invocation_id: InvocationId, - completion_id: CompletionId, - ) -> Result<(), NotRunningError>; - fn abort_all(&mut self) -> Result<(), NotRunningError>; /// *Note*: When aborting an invocation, and restarting it, the `invocation_epoch` MUST be bumped. 
diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index a8c3373ab2..69d47a4724 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -622,14 +622,6 @@ impl LeaderState { .notify_stored_command_ack(invocation_id, command_index) .map_err(Error::Invoker)?; } - Action::AckStoredNotificationProposal { - invocation_id, - completion_id, - } => { - self.invoker_handle - .notify_stored_notification_proposal_ack(invocation_id, completion_id) - .map_err(Error::Invoker)?; - } Action::ForwardCompletion { invocation_id, entry_index, diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 7e419918b3..1e625e51d6 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -18,7 +18,7 @@ use restate_types::invocation::client::{ CancelInvocationResponse, InvocationOutputResponse, KillInvocationResponse, PurgeInvocationResponse, RestartAsNewInvocationResponse, ResumeInvocationResponse, }; -use restate_types::journal_v2::{CommandIndex, CompletionId, NotificationId}; +use restate_types::journal_v2::{CommandIndex, NotificationId}; use restate_types::message::MessageIndex; use restate_types::time::MillisSinceEpoch; use restate_util_string::ReString; @@ -56,10 +56,6 @@ pub enum Action { invocation_id: InvocationId, command_index: CommandIndex, }, - AckStoredNotificationProposal { - invocation_id: InvocationId, - completion_id: CompletionId, - }, ForwardCompletion { invocation_id: InvocationId, entry_index: EntryIndex, diff --git a/crates/worker/src/partition/state_machine/entries/notification.rs b/crates/worker/src/partition/state_machine/entries/notification.rs index 8829cdf679..eae64e6520 100644 --- a/crates/worker/src/partition/state_machine/entries/notification.rs +++ 
b/crates/worker/src/partition/state_machine/entries/notification.rs @@ -1,4 +1,3 @@ -use assert2::let_assert; use restate_storage_api::lock_table::WriteLockTable; // Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. // All rights reserved. @@ -12,16 +11,14 @@ use restate_storage_api::lock_table::WriteLockTable; use tracing::debug; -use crate::partition::state_machine::lifecycle::ResumeInvocationCommand; -use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; use restate_storage_api::invocation_status_table::InvocationStatus; use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable}; use restate_types::identifiers::{EntryIndex, InvocationId}; +use restate_types::journal_v2::CANCEL_NOTIFICATION_ID; use restate_types::journal_v2::raw::RawNotification; -use restate_types::journal_v2::{ - CANCEL_NOTIFICATION_ID, CompletionType, NotificationId, NotificationType, -}; -use restate_types::service_protocol::ServiceProtocolVersion; + +use crate::partition::state_machine::lifecycle::ResumeInvocationCommand; +use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; pub(super) struct ApplyNotificationCommand<'e> { pub(super) invocation_id: InvocationId, @@ -54,20 +51,9 @@ where .await?; } } - InvocationStatus::Invoked(in_flight_invocation_metadata) => { - if self.entry.ty() == NotificationType::Completion(CompletionType::Run) - && in_flight_invocation_metadata - .pinned_deployment - .as_ref() - .is_some_and(|pd| pd.service_protocol_version >= ServiceProtocolVersion::V7) - { - let_assert!(NotificationId::CompletionId(completion_id) = self.entry.id()); - ctx.forward_completion_ack(self.invocation_id, completion_id); - } else { - ctx.forward_notification(self.invocation_id, self.entry_index, self.entry.id()); - } - // if completion_id_to_ack is Some, the caller emits AckStoredNotificationProposal; - // no need to forward the full notification to the SDK. 
+ InvocationStatus::Invoked(_) => { + // Just forward the notification if we're invoked + ctx.forward_notification(self.invocation_id, self.entry_index, self.entry.id()); } InvocationStatus::Paused(_) => { // If we're paused, resume only if the notification was a cancellation signal. @@ -95,8 +81,6 @@ where #[cfg(test)] mod tests { - use super::*; - use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers}; use bytes::Bytes; use bytestring::ByteString; @@ -109,11 +93,11 @@ mod tests { use restate_types::invocation::{ InvocationTermination, NotifySignalRequest, TerminationFlavor, }; + use restate_types::journal_v2::EntryMetadata; use restate_types::journal_v2::{ - BuiltInSignal, CommandType, Entry, EntryType, Failure, FailureMetadata, RunCompletion, + BuiltInSignal, CommandType, Entry, EntryType, Failure, FailureMetadata, NotificationId, Signal, SignalId, SignalResult, SleepCommand, SleepCompletion, }; - use restate_types::journal_v2::{EntryMetadata, RunCommand, RunResult}; use restate_types::time::MillisSinceEpoch; use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion}; use restate_wal_protocol::timer::TimerKeyValue; @@ -331,100 +315,4 @@ mod tests { test_env.shutdown().await; } - - #[restate_core::test] - async fn run_completion_proposal_protocol_v7_acks_instead_of_forwarding() { - let mut test_env = TestEnv::create().await; - let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; - fixtures::mock_pinned_deployment_v7(&mut test_env, invocation_id).await; - - let completion_id = 1; - // Command must precede notification - let _ = test_env - .apply(fixtures::invoker_entry_effect( - invocation_id, - RunCommand { - completion_id, - name: Default::default(), - }, - )) - .await; - - let actions = test_env - .apply(fixtures::invoker_entry_effect( - invocation_id, - RunCompletion { - completion_id, - result: RunResult::Success(Bytes::new()), - }, - )) - .await; - - // Protocol v7: compact ack sent back, NOT the full 
notification - assert_that!( - actions, - all!( - not(contains(matchers::actions::forward_notification( - invocation_id, - 2, - NotificationId::CompletionId(completion_id), - ))), - contains(matchers::actions::ack_stored_notification_proposal( - invocation_id, - completion_id, - )) - ) - ); - - test_env.shutdown().await; - } - - #[restate_core::test] - async fn run_completion_protocol_v5_forwards_notification() { - let mut test_env = TestEnv::create().await; - let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; - fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await; - - let completion_id = 1; - // Command must precede notification - let _ = test_env - .apply(fixtures::invoker_entry_effect( - invocation_id, - RunCommand { - completion_id, - name: Default::default(), - }, - )) - .await; - - let actions = test_env - .apply(fixtures::invoker_entry_effect( - invocation_id, - RunCompletion { - completion_id, - result: RunResult::Success(Bytes::new()), - }, - )) - .await; - - // Protocol <= v6: full notification forwarded, no ack - assert_that!( - actions, - all!( - contains(matchers::actions::forward_notification( - invocation_id, - 2, - NotificationId::CompletionId(completion_id), - )), - not(contains( - matchers::actions::ack_stored_notification_proposal( - invocation_id, - completion_id, - ) - )) - ) - ); - - test_env.shutdown().await; - } } diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index e520224294..666e8a3e17 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -454,20 +454,6 @@ impl StateMachineApplyContext<'_, S> { }); } - fn forward_completion_ack(&mut self, invocation_id: InvocationId, completion_id: CompletionId) { - debug_if_leader!( - self.is_leader, - restate.journal.completion.id = completion_id, - "Forward completion ack to deployment", - ); - - self.action_collector - 
.push(Action::AckStoredNotificationProposal { - invocation_id, - completion_id, - }); - } - fn send_abort_invocation_to_invoker(&mut self, invocation_id: InvocationId) { debug_if_leader!( self.is_leader, diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index 2983224e96..f1034d04e5 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -215,16 +215,6 @@ pub mod actions { }) } - pub fn ack_stored_notification_proposal( - invocation_id: InvocationId, - completion_id: restate_types::journal_v2::CompletionId, - ) -> impl Matcher { - pat!(Action::AckStoredNotificationProposal { - invocation_id: eq(invocation_id), - completion_id: eq(completion_id), - }) - } - pub fn invocation_response_to_partition_processor( caller_invocation_id: InvocationId, caller_entry_index: EntryIndex, From bbd9c500b93122668b6bd07f29653ec7bd317c8b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 19 May 2026 13:46:47 +0200 Subject: [PATCH 4/7] Feedback --- .../src/invocation_state_machine.rs | 50 ++++++++++++++----- crates/invoker-impl/src/lib.rs | 10 ++-- 2 files changed, 45 insertions(+), 15 deletions(-) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index cfe87f6d8c..04a17535da 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -380,12 +380,29 @@ impl InvocationStateMachine { } } - pub(super) fn notify_new_notification_proposal(&mut self, notification_id: NotificationId) { + pub(super) fn notify_new_notification_proposal( + &mut self, + notification_type: NotificationType, + notification_id: NotificationId, + ) { debug_assert!(matches!( &self.invocation_state, AttemptState::InFlight { .. 
} )); + // The only notification proposal currently defined in the protocol is + // ProposeRunCompletionMessage. We assert the invariant explicitly so that + // if/when new proposal-like messages are added, this code is forced to be + // revisited rather than silently mistreating them as run completions. + assert_eq!( + notification_type, + NotificationType::Completion(CompletionType::Run), + "the only notification proposal currently defined in the protocol is ProposeRunCompletionMessage", + ); + let completion_id = *notification_id + .try_as_completion_id_ref() + .expect("RunCompletion notification id must be a CompletionId"); + if let AttemptState::InFlight { journal_tracker, run_completion_proposals_to_ack, @@ -397,11 +414,7 @@ impl InvocationStateMachine { if let Some(pinned_deployment) = using_deployment && pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7 { - run_completion_proposals_to_ack.insert( - *notification_id - .try_as_completion_id_ref() - .expect("run proposals notification id must be completion id."), - ); + run_completion_proposals_to_ack.insert(completion_id); } journal_tracker.notify_notification_proposed_to_partition_processor(notification_id); } @@ -788,7 +801,10 @@ mod tests { let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); // Track a proposal for CompletionId 5 - ism.notify_new_notification_proposal(NotificationId::CompletionId(5)); + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + ); // PP echoes back the stored notification — the ISM must swap to the ack ism.notify_entry(7, NotificationId::CompletionId(5)); @@ -810,7 +826,10 @@ mod tests { // Proposal is recorded in the journal tracker (for retry safety) but NOT // tracked for swapping, because the deployment is on protocol v6. 
- ism.notify_new_notification_proposal(NotificationId::CompletionId(5)); + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + ); // PP echoes back the stored notification — the ISM forwards the full Entry, // exactly like before protocol v7 existed. @@ -922,8 +941,15 @@ mod tests { let (tx, _rx) = mpsc::unbounded_channel(); invocation_state_machine.start(abort_handle, tx); - invocation_state_machine.notify_new_notification_proposal(NotificationId::SignalIndex(18)); - invocation_state_machine.notify_new_notification_proposal(NotificationId::CompletionId(1)); + // Only RunCompletion notifications are valid proposals today; the ISM asserts this. + invocation_state_machine.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(18), + ); + invocation_state_machine.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(1), + ); let_assert!( OnTaskError::Retrying(_) = invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) @@ -933,8 +959,8 @@ mod tests { assert!(!invocation_state_machine.is_ready_to_retry()); assert!(let AttemptState::WaitingRetry { .. 
} = invocation_state_machine.invocation_state); - // Got signal 18 - invocation_state_machine.notify_entry(0, NotificationId::SignalIndex(18)); + // Got completion 18 + invocation_state_machine.notify_entry(0, NotificationId::CompletionId(18)); // Retry timer fired invocation_state_machine.notify_retry_timer_fired(0); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 6455e12d33..480bedc4f3 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -54,7 +54,8 @@ use restate_types::journal_events::raw::RawEvent; use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent}; use restate_types::journal_v2::raw::{RawCommand, RawNotification}; use restate_types::journal_v2::{ - CommandIndex, CompletionId, EntryMetadata, NotificationId, UnresolvedFuture, + CommandIndex, CompletionId, CompletionType, EntryMetadata, NotificationId, NotificationType, + UnresolvedFuture, }; use restate_types::live::{Live, LiveLoad}; use restate_types::schema::deployment::DeploymentResolver; @@ -891,7 +892,7 @@ where .invocation_state_machine_manager .resolve_invocation(&invocation_id) { - ism.notify_new_notification_proposal(notification.id()); + ism.notify_new_notification_proposal(notification.ty(), notification.id()); trace!( restate.invocation.target = %ism.invocation_target, "Received a new notification. Invocation state: {:?}", @@ -2431,7 +2432,10 @@ mod tests { ism.start(tokio::spawn(async {}).abort_handle(), tx); // Add a notification proposal - ism.notify_new_notification_proposal(NotificationId::CompletionId(1)); + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(1), + ); // Register the ISM and use handle_invocation_task_failed to put it in WaitingRetry state. // This will register the timer in the real DelayQueue. 
From df68db3ad49b74862dcde331fd30b64698a553e1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 19 May 2026 14:03:53 +0200 Subject: [PATCH 5/7] Feedback --- .../src/invocation_state_machine.rs | 7 ++++++- .../dev/restate/service/protocol.proto | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index 04a17535da..dbf06d97ef 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -151,7 +151,12 @@ enum AttemptState { // Run completions the SDK proposed during this attempt. When the partition // processor echoes the stored notification back via [`Self::notify_entry`], // we swap [`Notification::Entry`] for [`Notification::ProposeRunCompletionAck`] - // so the SDK gets the compact ack message on the wire (protocol >= v7). + // so the SDK gets the ack message on the wire (only protocol >= v7). + // + // The SDK will replace the ack message with the full notification, kept around locally. + // + // This mechanism is used only in PROCESSING, for run completions proposed during the current attempt, + // and not when the invocation is REPLAYING. run_completion_proposals_to_ack: HashSet, // Deployment being used during this attempt diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 45670c17ab..7a03c5fe4a 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -219,7 +219,24 @@ message AwaitingOnMessage { // Type: 0x0000 + 7 // // This is a message sent as response to ProposeRunCompletionMessage to acknowledge the proposal was correctly stored and replicated. +// // Its ordering is considered to be "relative" to the ordering of notifications. 
+// In other words, the SDK expects that on replay the RunCompletionMessage, +// to which the ProposeRunCompletionAckMessage relates, +// is sent in the same order relative to the other notifications. +// +// For example, if, when PROCESSING for the first time, the SDK sees: +// +// -> SleepCompletionMessage id = 1 +// -> ProposeRunCompletionAckMessage id = 3 +// -> SleepCompletionMessage id = 2 +// +// Then on the next replay, the SDK expects to see notifications in the following order: +// +// -> SleepCompletionMessage id = 1 +// -> RunCompletionMessage id = 3 (the actual completion here replaces the ack) +// -> SleepCompletionMessage id = 2 + // // This message will only ever be sent as response to ProposeRunCompletionMessage, and will be sent only in the PROCESSING phase of the protocol, never during REPLAY. message ProposeRunCompletionAckMessage { From 209221eb3b90a0a4a9d49a2149b42712130c5f92 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 19 May 2026 16:01:12 +0200 Subject: [PATCH 6/7] Make the ack system opt-in through the existing flag --- .../src/invocation_state_machine.rs | 39 +++++++++++++++++-- .../invoker-impl/src/invocation_task/mod.rs | 5 +++ .../service_protocol_runner_v4.rs | 3 ++ crates/invoker-impl/src/lib.rs | 13 +++++-- .../src/message_codec/mod.rs | 15 ++++--- 5 files changed, 62 insertions(+), 13 deletions(-) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index dbf06d97ef..d30b2b9b6c 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -389,6 +389,7 @@ impl InvocationStateMachine { &mut self, notification_type: NotificationType, notification_id: NotificationId, + requires_ack: bool, ) { debug_assert!(matches!( &self.invocation_state, @@ -415,8 +416,13 @@ impl InvocationStateMachine { ..
} = &mut self.invocation_state { - // We track the run completion proposals to ack only if we're using protocol v7 - if let Some(pinned_deployment) = using_deployment + // We track the run completion proposal to ack only if the SDK asked for + // an ack (header flag) AND the negotiated protocol supports it (>= v7). + // If either condition is missing, the proposal flows through the normal + // `notify_entry` → `Notification::Entry` path and the SDK receives the + // full `RunCompletionNotificationMessage` like on older protocols. + if requires_ack + && let Some(pinned_deployment) = using_deployment && pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7 { run_completion_proposals_to_ack.insert(completion_id); @@ -805,10 +811,11 @@ mod tests { let mut ism = create_test_invocation_state_machine(); let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); - // Track a proposal for CompletionId 5 + // Track a proposal for CompletionId 5 with requires_ack=true ism.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(5), + true, ); // PP echoes back the stored notification — the ISM must swap to the ack @@ -830,10 +837,12 @@ mod tests { let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V6); // Proposal is recorded in the journal tracker (for retry safety) but NOT - // tracked for swapping, because the deployment is on protocol v6. + // tracked for swapping, because the deployment is on protocol v6 — even + // if the SDK had set requires_ack, the runtime caps the behaviour at v7. 
ism.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(5), + true, ); // PP echoes back the stored notification — the ISM forwards the full Entry, @@ -842,6 +851,24 @@ mod tests { assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); } + #[test(tokio::test)] + async fn notify_entry_on_v7_without_requires_ack_falls_through_to_entry() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // V7 deployment but SDK did NOT set the requires_ack header flag — the + // proposal is tracked in the journal tracker for retry safety, but no + // swap happens, and the SDK gets the full notification back. + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + false, + ); + + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + #[test(tokio::test)] async fn notify_entry_on_v7_without_proposal_falls_through_to_entry() { let mut ism = create_test_invocation_state_machine(); @@ -947,13 +974,17 @@ mod tests { invocation_state_machine.start(abort_handle, tx); // Only RunCompletion notifications are valid proposals today; the ISM asserts this. + // requires_ack=false because this test is about journal-tracker accounting for + // retry safety, not about the v7 ack swap. 
invocation_state_machine.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(18), + false, ); invocation_state_machine.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(1), + false, ); let_assert!( OnTaskError::Retrying(_) = diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 08a6af335d..6b579abae1 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -179,6 +179,11 @@ pub(super) enum InvocationTaskOutputInner { }, NewNotificationProposal { notification: RawNotification, + /// If true, the SDK requested to be notified when the proposed notification + /// is durably stored — the runtime will send back a compact + /// `ProposeRunCompletionAckMessage` instead of forwarding the full + /// notification. Only honoured on protocol >= v7. + requires_ack: bool, }, AwaitingOn { unresolved_future: UnresolvedFuture, diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 5f26837dbf..68267cfdea 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -979,6 +979,9 @@ where self.invocation_task.send_invoker_tx( InvocationTaskOutputInner::NewNotificationProposal { notification: raw_notification, + requires_ack: mh + .requires_ack() + .expect("ProposeRunCompletion message supports requires_ack"), }, ); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 480bedc4f3..5769bf8548 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -563,10 +563,11 @@ where requires_ack ).await }, - InvocationTaskOutputInner::NewNotificationProposal { notification } => { + 
InvocationTaskOutputInner::NewNotificationProposal { notification, requires_ack } => { self.handle_new_notification_proposal( invocation_id, - notification + notification, + requires_ack, ).await }, InvocationTaskOutputInner::AwaitingOn { unresolved_future } => { @@ -887,12 +888,17 @@ where &mut self, invocation_id: InvocationId, notification: RawNotification, + requires_ack: bool, ) { if let Some((output_tx, ism)) = self .invocation_state_machine_manager .resolve_invocation(&invocation_id) { - ism.notify_new_notification_proposal(notification.ty(), notification.id()); + ism.notify_new_notification_proposal( + notification.ty(), + notification.id(), + requires_ack, + ); trace!( restate.invocation.target = %ism.invocation_target, "Received a new notification. Invocation state: {:?}", @@ -2435,6 +2441,7 @@ mod tests { ism.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(1), + false, ); // Register the ISM and use handle_invocation_task_failed to put it in WaitingRetry state. diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index 23997665d5..8d4df018bd 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -55,7 +55,7 @@ macro_rules! gen_message { Custom(u16, bytes::Bytes) } }; - (@gen_message_enum [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum [$($tail)*] -> [$variant(proto::[< $variant Message >]), $($body)*]); } }; (@gen_message_enum [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -75,7 +75,7 @@ macro_rules! 
gen_message { } } }; - (@gen_message_enum_encoded_len [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_encoded_len [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum_encoded_len [$($tail)*] -> [(Message::$variant(msg), service_protocol_version) => ] as $crate::message_codec::encoding::ServiceWireEncoder>::encoded_len(msg, service_protocol_version), $($body)*]); } }; (@gen_message_enum_encoded_len [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -113,7 +113,7 @@ macro_rules! gen_message { } } }; - (@gen_message_enum_encode [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_encode [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum_encode [$($tail)*] -> [(Message::$variant(msg), service_protocol_version, buf) => ] as $crate::message_codec::encoding::ServiceWireEncoder>::encode(msg, buf, service_protocol_version)?, $($body)*]); } }; (@gen_message_enum_encode [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -133,7 +133,7 @@ macro_rules! gen_message { } } }; - (@gen_message_enum_proto_debug [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_proto_debug [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { gen_message!(@gen_message_enum_proto_debug [$($tail)*] -> [Message::$variant(msg) => format!("{msg:?}"), $($body)*]); }; (@gen_message_enum_proto_debug [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -167,6 +167,9 @@ macro_rules! 
gen_message { } } }; + (@gen_message_type_enum_allows_ack [$variant:ident Control allows_ack = $id:literal, $($tail:tt)*] -> [$($variants:tt)*]) => { + gen_message!(@gen_message_type_enum_allows_ack [$($tail)*] -> [MessageType::$variant | $($variants)*]); + }; (@gen_message_type_enum_allows_ack [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($variants:tt)*]) => { gen_message!(@gen_message_type_enum_allows_ack [$($tail)*] -> [$($variants)*]); }; @@ -190,7 +193,7 @@ macro_rules! gen_message { } } }; - (@gen_message_type_enum_decode [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_type_enum_decode [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_type_enum_decode [$($tail)*] -> [(MessageType::$variant, buf, service_protocol_version) => Ok(Message::$variant(] as $crate::message_codec::encoding::ServiceWireDecoder>::decode(buf, service_protocol_version)?)), $($body)*]); } }; (@gen_message_type_enum_decode [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -334,7 +337,7 @@ gen_message!( Error Control = 0x0002, End Control = 0x0003, CommandAck Control = 0x0004, - ProposeRunCompletion Control = 0x0005, + ProposeRunCompletion Control allows_ack = 0x0005, AwaitingOn Control = 0x0006, ProposeRunCompletionAck Control = 0x0007, From 4ca572bd505c017112761de45637af1ccf4e1d0a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 19 May 2026 16:40:39 +0200 Subject: [PATCH 7/7] Renamed requires_ack to requested_ack, to better convey the new meaning. Update documentation on the field. 
--- .../src/invocation_state_machine.rs | 20 +++++------ .../invoker-impl/src/invocation_task/mod.rs | 11 +++--- .../service_protocol_runner_v4.rs | 12 +++---- crates/invoker-impl/src/lib.rs | 16 ++++----- .../src/message_codec/header.rs | 36 +++++++++++-------- .../dev/restate/service/protocol.proto | 6 +++- .../dissector.lua | 6 ++-- .../src/lib.rs | 4 +-- 8 files changed, 61 insertions(+), 50 deletions(-) diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index d30b2b9b6c..dc5d2ed579 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -362,7 +362,7 @@ impl InvocationStateMachine { } } - pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requires_ack: bool) { + pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requested_ack: bool) { debug_assert!(matches!( &self.invocation_state, AttemptState::InFlight { .. } @@ -378,7 +378,7 @@ impl InvocationStateMachine { .. } = &mut self.invocation_state { - if requires_ack { + if requested_ack { entries_to_ack.insert(command_index); } journal_tracker.notify_command_sent_to_partition_processor(command_index); @@ -389,7 +389,7 @@ impl InvocationStateMachine { &mut self, notification_type: NotificationType, notification_id: NotificationId, - requires_ack: bool, + requested_ack: bool, ) { debug_assert!(matches!( &self.invocation_state, @@ -421,7 +421,7 @@ impl InvocationStateMachine { // If either condition is missing, the proposal flows through the normal // `notify_entry` → `Notification::Entry` path and the SDK receives the // full `RunCompletionNotificationMessage` like on older protocols. 
- if requires_ack + if requested_ack && let Some(pinned_deployment) = using_deployment && pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7 { @@ -766,7 +766,7 @@ mod tests { } #[test(tokio::test)] - async fn handle_requires_ack() { + async fn handle_requested_ack() { let mut invocation_state_machine = create_test_invocation_state_machine(); let abort_handle = tokio::spawn(async {}).abort_handle(); @@ -811,7 +811,7 @@ mod tests { let mut ism = create_test_invocation_state_machine(); let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); - // Track a proposal for CompletionId 5 with requires_ack=true + // Track a proposal for CompletionId 5 with requested_ack=true ism.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(5), @@ -838,7 +838,7 @@ mod tests { // Proposal is recorded in the journal tracker (for retry safety) but NOT // tracked for swapping, because the deployment is on protocol v6 — even - // if the SDK had set requires_ack, the runtime caps the behaviour at v7. + // if the SDK had set requested_ack, the runtime caps the behaviour at v7. ism.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), NotificationId::CompletionId(5), @@ -852,11 +852,11 @@ mod tests { } #[test(tokio::test)] - async fn notify_entry_on_v7_without_requires_ack_falls_through_to_entry() { + async fn notify_entry_on_v7_without_requested_ack_falls_through_to_entry() { let mut ism = create_test_invocation_state_machine(); let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); - // V7 deployment but SDK did NOT set the requires_ack header flag — the + // V7 deployment but SDK did NOT set the requested_ack header flag — the // proposal is tracked in the journal tracker for retry safety, but no // swap happens, and the SDK gets the full notification back. 
ism.notify_new_notification_proposal( @@ -974,7 +974,7 @@ mod tests { invocation_state_machine.start(abort_handle, tx); // Only RunCompletion notifications are valid proposals today; the ISM asserts this. - // requires_ack=false because this test is about journal-tracker accounting for + // requested_ack=false because this test is about journal-tracker accounting for // retry safety, not about the v7 ack swap. invocation_state_machine.notify_new_notification_proposal( NotificationType::Completion(CompletionType::Run), diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 6b579abae1..f49549ce82 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -175,15 +175,16 @@ pub(super) enum InvocationTaskOutputInner { /// When reading the entry from the storage this flag will always be false, as we never need to send acks for entries sent during a journal replay. /// /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#acknowledgment-of-stored-entries - requires_ack: bool, + requested_ack: bool, }, NewNotificationProposal { notification: RawNotification, /// If true, the SDK requested to be notified when the proposed notification - /// is durably stored — the runtime will send back a compact - /// `ProposeRunCompletionAckMessage` instead of forwarding the full - /// notification. Only honoured on protocol >= v7. - requires_ack: bool, + /// is durably stored. + /// + /// The runtime will send back `ProposeRunCompletionAckMessage` instead of the full `RunCompletionNotificationMessage`. + /// Only honoured on protocol >= v7.
+ requested_ack: bool, }, AwaitingOn { unresolved_future: UnresolvedFuture, diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 68267cfdea..5096c29e17 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -910,9 +910,9 @@ where self.invocation_task .send_invoker_tx(InvocationTaskOutputInner::NewCommand { command_index: self.command_index, - requires_ack: mh - .requires_ack() - .expect("All command messages support requires_ack"), + requested_ack: mh + .requested_ack() + .expect("All command messages support requested_ack"), command, }); self.command_index += 1; @@ -979,9 +979,9 @@ where self.invocation_task.send_invoker_tx( InvocationTaskOutputInner::NewNotificationProposal { notification: raw_notification, - requires_ack: mh - .requires_ack() - .expect("ProposeRunCompletion message supports requires_ack"), + requested_ack: mh + .requested_ack() + .expect("ProposeRunCompletion message supports requested_ack"), }, ); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 5769bf8548..062f1b6762 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -563,11 +563,11 @@ where requires_ack ).await }, - InvocationTaskOutputInner::NewNotificationProposal { notification, requires_ack } => { + InvocationTaskOutputInner::NewNotificationProposal { notification, requested_ack } => { self.handle_new_notification_proposal( invocation_id, notification, - requires_ack, + requested_ack, ).await }, InvocationTaskOutputInner::AwaitingOn { unresolved_future } => { @@ -585,12 +585,12 @@ where InvocationTaskOutputInner::Suspended(indexes) => { self.handle_invocation_task_suspended(invocation_id, indexes).await } - InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { + 
InvocationTaskOutputInner::NewCommand { command, command_index, requested_ack } => { self.handle_new_command( invocation_id, command_index, command, - requires_ack + requested_ack ).await } InvocationTaskOutputInner::SuspendedV2(future) => { @@ -888,7 +888,7 @@ where &mut self, invocation_id: InvocationId, notification: RawNotification, - requires_ack: bool, + requested_ack: bool, ) { if let Some((output_tx, ism)) = self .invocation_state_machine_manager @@ -897,7 +897,7 @@ where ism.notify_new_notification_proposal( notification.ty(), notification.id(), - requires_ack, + requested_ack, ); trace!( restate.invocation.target = %ism.invocation_target, @@ -939,13 +939,13 @@ where invocation_id: InvocationId, command_index: CommandIndex, command: RawCommand, - requires_ack: bool, + requested_ack: bool, ) { if let Some((output_tx, ism)) = self .invocation_state_machine_manager .resolve_invocation(&invocation_id) { - ism.notify_new_command(command_index, requires_ack); + ism.notify_new_command(command_index, requested_ack); trace!( restate.invocation.target = %ism.invocation_target, "Received a new command. 
Invocation state: {:?}", diff --git a/crates/service-protocol-v4/src/message_codec/header.rs b/crates/service-protocol-v4/src/message_codec/header.rs index 536684f036..6002b6423c 100644 --- a/crates/service-protocol-v4/src/message_codec/header.rs +++ b/crates/service-protocol-v4/src/message_codec/header.rs @@ -10,7 +10,7 @@ use super::{MessageType, UnknownMessageType}; -const REQUIRES_ACK_MASK: u64 = 0x8000_0000_0000; +const REQUESTED_ACK_MASK: u64 = 0x8000_0000_0000; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct MessageHeader { @@ -18,7 +18,7 @@ pub struct MessageHeader { length: u32, // --- Flags - requires_ack_flag: Option, + requested_ack_flag: Option, } impl MessageHeader { @@ -28,11 +28,11 @@ impl MessageHeader { } #[inline] - fn _new(ty: MessageType, requires_ack_flag: Option, length: u32) -> Self { + fn _new(ty: MessageType, requested_ack_flag: Option, length: u32) -> Self { MessageHeader { ty, length, - requires_ack_flag, + requested_ack_flag, } } @@ -41,9 +41,11 @@ impl MessageHeader { self.ty } + /// For commands: requested_ack forces the runtime to send back CommandAckMessage + /// For run proposals: requested_ack asks the runtime to send back ProposeRunCompletionAckMessage instead of the whole RunCompletionNotificationMessage; honoured only on protocol >= v7. #[inline] - pub fn requires_ack(&self) -> Option { - self.requires_ack_flag + pub fn requested_ack(&self) -> Option { + self.requested_ack_flag } #[inline] @@ -65,16 +67,14 @@ macro_rules! read_flag_if { impl TryFrom for MessageHeader { type Error = UnknownMessageType; - /// Deserialize the protocol header.
- /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#message-header fn try_from(value: u64) -> Result { let ty_code = (value >> 48) as u16; let ty: MessageType = ty_code.try_into()?; - let requires_ack_flag = read_flag_if!(ty.allows_ack(), value, REQUIRES_ACK_MASK); + let requested_ack_flag = read_flag_if!(ty.allows_ack(), value, REQUESTED_ACK_MASK); let length = value as u32; - Ok(MessageHeader::_new(ty, requires_ack_flag, length)) + Ok(MessageHeader::_new(ty, requested_ack_flag, length)) } } @@ -87,16 +87,14 @@ macro_rules! write_flag { } impl From for u64 { - /// Serialize the protocol header. - /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#message-header fn from(message_header: crate::message_codec::MessageHeader) -> Self { let mut res = ((u16::from(message_header.ty) as u64) << 48) | (message_header.length as u64); write_flag!( - message_header.requires_ack_flag, + message_header.requested_ack_flag, &mut res, - REQUIRES_ACK_MASK + REQUESTED_ACK_MASK ); res @@ -121,7 +119,7 @@ mod tests { let header: MessageHeader = serialized.try_into().unwrap(); assert_eq!(header.message_type(), $ty); - assert_eq!(header.requires_ack(), $requires_ack); + assert_eq!(header.requested_ack(), $requires_ack); assert_eq!(header.frame_length(), $len); } }; @@ -158,6 +156,14 @@ mod tests { requires_ack: true ); + roundtrip_test!( + propose_run_completion, + MessageHeader::_new(ProposeRunCompletion, Some(true), 1), + ProposeRunCompletion, + 1, + requires_ack: true + ); + roundtrip_test!( custom_entry, MessageHeader::new(Custom(0xFC00), 10341), diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 7a03c5fe4a..e9f15b1e51 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -194,10 +194,14 @@ message CommandAckMessage { uint32 command_index = 1; } +// 
Type: 0x0000 + 5 +// // This is a special control message to propose ctx.run completions to the runtime. // This won't be written to the journal immediately, but will appear later as a new notification (meaning the result was stored). // -// Type: 0x0000 + 5 +// In response to this message, the SDK expects the runtime to either: +// * if requested_ack = true -> Send back ProposeRunCompletionAckMessage with the related completion_id +// * if requested_ack = false -> Send back the whole notification just proposed message ProposeRunCompletionMessage { uint32 result_completion_id = 1; oneof result { diff --git a/tools/service-protocol-wireshark-dissector/dissector.lua b/tools/service-protocol-wireshark-dissector/dissector.lua index 7a9673804e..38286b61e6 100644 --- a/tools/service-protocol-wireshark-dissector/dissector.lua +++ b/tools/service-protocol-wireshark-dissector/dissector.lua @@ -16,9 +16,9 @@ local p_service_protocol = Proto("restate_service_protocol", "Restate Service En -- Define the fields local f_ty = ProtoField.uint16("restate_service_protocol.message_type", "Message Type", base.HEX) -local f_requires_ack = ProtoField.bool("restate_service_protocol.requires_ack", "REQUIRES_ACK", base.NONE, { - "Requires ack", - "Doesn't require ack" +local f_requires_ack = ProtoField.bool("restate_service_protocol.requested_ack", "REQUESTED_ACK", base.NONE, { + "Requested ack", + "Ack not requested" }) local f_len = ProtoField.uint16("restate_service_protocol.length", "Length", base.DEC) local f_message = ProtoField.string("restate_service_protocol.message", "Message", base.UNICODE) diff --git a/tools/service-protocol-wireshark-dissector/src/lib.rs b/tools/service-protocol-wireshark-dissector/src/lib.rs index 236c7bea98..ce65c18eb1 100644 --- a/tools/service-protocol-wireshark-dissector/src/lib.rs +++ b/tools/service-protocol-wireshark-dissector/src/lib.rs @@ -58,8 +58,8 @@ fn decode_packages(lua: &Lua, buf_lua: Value) -> LuaResult { ); // Optional flags - if let 
Some(requires_ack) = header.requires_ack() { - set_table_values!(message_table, "requires_ack" => requires_ack); + if let Some(requested_ack) = header.requested_ack() { + set_table_values!(message_table, "requested_ack" => requested_ack); } // For some messages, spit out more stuff