diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87e81cc154..931b32c2e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -159,7 +159,7 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} serviceImage: "ghcr.io/restatedev/test-services-java:main" - testArtifactOutput: sdk-java-integration-test-journal-table-v2 + testArtifactOutput: sdk-java-integration-journal-table-v2-test-report envVars: | RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0-dev @@ -178,7 +178,7 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} serviceImage: "ghcr.io/restatedev/test-services-java:main" - testArtifactOutput: sdk-java-integration-test-vqueues + testArtifactOutput: sdk-java-integration-vqueues-test-report envVars: | RESTATE_EXPERIMENTAL_ENABLE_VQUEUES=true # we don't expect all tests to pass, that's why we continue on error @@ -215,7 +215,7 @@ jobs: serviceImage: "ghcr.io/restatedev/test-services-python:main" envVars: | RESTATE_experimental_enable_protocol_v7=true - testArtifactOutput: sdk-python-protocol-v7 + testArtifactOutput: sdk-python-protocol-v7-test-report sdk-go: name: Run SDK-Go integration tests @@ -261,7 +261,7 @@ jobs: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} envVars: | RESTATE_experimental_enable_protocol_v7=true - testArtifactOutput: sdk-typescript-protocol-v7 + testArtifactOutput: sdk-typescript-protocol-v7-test-report sdk-rust: name: Run SDK-Rust integration tests diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index 85a2d75de7..dc5d2ed579 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -18,6 +18,7 @@ use restate_memory::LocalMemoryPool; use restate_types::identifiers::EntryIndex; use restate_types::retries; use restate_types::schema::invocation_target::OnMaxAttempts; +use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::vqueues::VQueueId; use restate_worker_api::resources::ReservedResources; @@ -147,6 +148,17 @@ enum AttemptState { // Acks that should be propagated back to the SDK command_acks_to_propagate: HashSet, + // Run completions the SDK proposed during this attempt. When the partition + // processor echoes the stored notification back via [`Self::notify_entry`], + // we swap [`Notification::Entry`] for [`Notification::ProposeRunCompletionAck`] + // so the SDK gets the ack message on the wire (only protocol >= v7). + // + // The SDK will replace the ack message with the full notification, kept around locally. + // + // This mechanism is used only in PROCESSING, for run completions proposed during the current attempt, + // and not when the invocation is REPLAYING. + run_completion_proposals_to_ack: HashSet, + // Deployment being used during this attempt using_deployment: Option, // If true, we need to notify the deployment id to the partition processor @@ -253,6 +265,7 @@ impl InvocationStateMachine { journal_tracker: Default::default(), abort_handle, command_acks_to_propagate: Default::default(), + run_completion_proposals_to_ack: Default::default(), using_deployment: None, should_notify_pinned_deployment: false, }; @@ -349,7 +362,7 @@ impl InvocationStateMachine { } } - pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requires_ack: bool) { + pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requested_ack: bool) { debug_assert!(matches!( &self.invocation_state, AttemptState::InFlight { .. } @@ -365,23 +378,55 @@ impl InvocationStateMachine { .. } = &mut self.invocation_state { - if requires_ack { + if requested_ack { entries_to_ack.insert(command_index); } journal_tracker.notify_command_sent_to_partition_processor(command_index); } } - pub(super) fn notify_new_notification_proposal(&mut self, notification_id: NotificationId) { + pub(super) fn notify_new_notification_proposal( + &mut self, + notification_type: NotificationType, + notification_id: NotificationId, + requested_ack: bool, + ) { debug_assert!(matches!( &self.invocation_state, AttemptState::InFlight { .. } )); + // The only notification proposal currently defined in the protocol is + // ProposeRunCompletionMessage. We assert the invariant explicitly so that + // if/when new proposal-like messages are added, this code is forced to be + // revisited rather than silently mistreating them as run completions. + assert_eq!( + notification_type, + NotificationType::Completion(CompletionType::Run), + "the only notification proposal currently defined in the protocol is ProposeRunCompletionMessage", + ); + let completion_id = *notification_id + .try_as_completion_id_ref() + .expect("RunCompletion notification id must be a CompletionId"); + if let AttemptState::InFlight { - journal_tracker, .. + journal_tracker, + run_completion_proposals_to_ack, + using_deployment, + .. } = &mut self.invocation_state { + // We track the run completion proposal to ack only if the SDK asked for + // an ack (header flag) AND the negotiated protocol supports it (>= v7). + // If either condition is missing, the proposal flows through the normal + // `notify_entry` → `Notification::Entry` path and the SDK receives the + // full `RunCompletionNotificationMessage` like on older protocols. + if requested_ack + && let Some(pinned_deployment) = using_deployment + && pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7 + { + run_completion_proposals_to_ack.insert(completion_id); + } journal_tracker.notify_notification_proposed_to_partition_processor(notification_id); } } @@ -395,7 +440,10 @@ impl InvocationStateMachine { .. } => { if command_acks_to_propagate.remove(&command_index) { - Self::try_send_notification(notifications_tx, Notification::Ack(command_index)); + Self::try_send_notification( + notifications_tx, + Notification::CommandAck(command_index), + ); } journal_tracker.notify_acked_command_from_partition_processor(command_index); } @@ -426,11 +474,20 @@ impl InvocationStateMachine { AttemptState::InFlight { journal_tracker, notifications_tx, + run_completion_proposals_to_ack, .. } => { + let to_send = match ¬ification_id { + // We send RunCompletionAck only if we're tracking this specific run completion. + NotificationId::CompletionId(c) + if run_completion_proposals_to_ack.remove(c) => + { + Notification::ProposeRunCompletionAck(*c) + } + _ => Notification::Entry(entry_index), + }; journal_tracker.notify_acked_notification_from_partition_processor(notification_id); - - Self::try_send_notification(notifications_tx, Notification::Entry(entry_index)); + Self::try_send_notification(notifications_tx, to_send); } AttemptState::WaitingRetry { journal_tracker, .. @@ -709,7 +766,7 @@ mod tests { } #[test(tokio::test)] - async fn handle_requires_ack() { + async fn handle_requested_ack() { let mut invocation_state_machine = create_test_invocation_state_machine(); let abort_handle = tokio::spawn(async {}).abort_handle(); @@ -726,15 +783,106 @@ mod tests { // Check notification was sent for ack 1 and 3 let notification = rx.recv().await; - assert_that!(notification, some(pat!(Notification::Ack(eq(1))))); + assert_that!(notification, some(pat!(Notification::CommandAck(eq(1))))); let notification = rx.recv().await; - assert_that!(notification, some(pat!(Notification::Ack(eq(3))))); + assert_that!(notification, some(pat!(Notification::CommandAck(eq(3))))); // Channel should be empty let try_recv = rx.try_recv(); assert_that!(try_recv, err(eq(TryRecvError::Empty))); } + fn start_with_protocol( + ism: &mut InvocationStateMachine, + version: ServiceProtocolVersion, + ) -> mpsc::UnboundedReceiver { + let abort_handle = tokio::spawn(async {}).abort_handle(); + let (tx, rx) = mpsc::unbounded_channel(); + ism.start(abort_handle, tx); + ism.notify_pinned_deployment( + PinnedDeployment::new(DeploymentId::default(), version), + true, + ); + rx + } + + #[test(tokio::test)] + async fn notify_entry_swaps_proposed_run_completion_to_ack_on_v7() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // Track a proposal for CompletionId 5 with requested_ack=true + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + true, + ); + + // PP echoes back the stored notification — the ISM must swap to the ack + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!( + rx.recv().await, + some(pat!(Notification::ProposeRunCompletionAck(eq(5)))) + ); + + // The proposal was consumed: a second notify_entry for the same id falls + // back to the regular Entry path. + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + + #[test(tokio::test)] + async fn notify_entry_does_not_swap_on_v6_old_path_preserved() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V6); + + // Proposal is recorded in the journal tracker (for retry safety) but NOT + // tracked for swapping, because the deployment is on protocol v6 — even + // if the SDK had set requested_ack, the runtime caps the behaviour at v7. + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + true, + ); + + // PP echoes back the stored notification — the ISM forwards the full Entry, + // exactly like before protocol v7 existed. + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + + #[test(tokio::test)] + async fn notify_entry_on_v7_without_requested_ack_falls_through_to_entry() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // V7 deployment but SDK did NOT set the requested_ack header flag — the + // proposal is tracked in the journal tracker for retry safety, but no + // swap happens, and the SDK gets the full notification back. + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(5), + false, + ); + + ism.notify_entry(7, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7))))); + } + + #[test(tokio::test)] + async fn notify_entry_on_v7_without_proposal_falls_through_to_entry() { + let mut ism = create_test_invocation_state_machine(); + let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7); + + // No prior proposal: a completion notification flows through as Entry. + ism.notify_entry(3, NotificationId::CompletionId(5)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(3))))); + + // Signals are also never swapped — only completion ids tracked at propose time qualify. + ism.notify_entry(4, NotificationId::SignalIndex(17)); + assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(4))))); + } + #[test(tokio::test)] async fn handle_error_counts_attempts_on_same_entry() { let mut invocation_state_machine = create_test_invocation_state_machine(); @@ -825,8 +973,19 @@ mod tests { let (tx, _rx) = mpsc::unbounded_channel(); invocation_state_machine.start(abort_handle, tx); - invocation_state_machine.notify_new_notification_proposal(NotificationId::SignalIndex(18)); - invocation_state_machine.notify_new_notification_proposal(NotificationId::CompletionId(1)); + // Only RunCompletion notifications are valid proposals today; the ISM asserts this. + // requested_ack=false because this test is about journal-tracker accounting for + // retry safety, not about the v7 ack swap. + invocation_state_machine.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(18), + false, + ); + invocation_state_machine.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(1), + false, + ); let_assert!( OnTaskError::Retrying(_) = invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) @@ -836,8 +995,8 @@ mod tests { assert!(!invocation_state_machine.is_ready_to_retry()); assert!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state); - // Got signal 18 - invocation_state_machine.notify_entry(0, NotificationId::SignalIndex(18)); + // Got completion 18 + invocation_state_machine.notify_entry(0, NotificationId::CompletionId(18)); // Retry timer fired invocation_state_machine.notify_retry_timer_fired(0); diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 08a6af335d..f49549ce82 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -175,10 +175,16 @@ pub(super) enum InvocationTaskOutputInner { /// When reading the entry from the storage this flag will always be false, as we never need to send acks for entries sent during a journal replay. /// /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#acknowledgment-of-stored-entries - requires_ack: bool, + requested_ack: bool, }, NewNotificationProposal { notification: RawNotification, + /// If true, the SDK requested to be notified when the proposed notification + /// is durably stored. + /// + /// The runtime will send back `ProposeRunCompletionAckMessage` instead of `RunCompletionMessage`. + /// Only protocol >= v7. + requested_ack: bool, }, AwaitingOn { unresolved_future: UnresolvedFuture, diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index 0e24fe2b8d..d8753b9013 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -433,11 +433,11 @@ where trace!("Sending the completion to the wire"); crate::shortcircuit!(self.write_with_lease(&mut http_stream_tx, completion.into(), Some(lease))); }, - Some(Notification::Ack(entry_index)) => { + Some(Notification::CommandAck(entry_index)) => { trace!("Sending the ack to the wire"); crate::shortcircuit!(self.write(&mut http_stream_tx, ProtocolMessage::new_entry_ack(entry_index))); }, - Some(Notification::Entry { .. }) => { + Some(Notification::Entry { .. }) | Some(Notification::ProposeRunCompletionAck(_)) => { panic!("We don't expect to receive journal_v2 entries, this is an invoker bug.") }, None => { diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 3fd1ed024f..5096c29e17 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -604,10 +604,14 @@ where Some(Notification::Completion(_)) => { panic!("We don't expect to receive Notification::Completion in v4+, this is an invoker bug.") }, - Some(Notification::Ack(entry_index)) => { + Some(Notification::CommandAck(entry_index)) => { trace!("Sending the ack to the wire"); shortcircuit!(self.write(&mut http_stream_tx, Message::new_command_ack(entry_index))); }, + Some(Notification::ProposeRunCompletionAck(completion_id)) => { + trace!("Sending ProposeRunCompletionAck to the wire"); + shortcircuit!(self.write(&mut http_stream_tx, Message::new_propose_run_completion_ack(completion_id))); + }, None => { // Completion channel is closed, // the invoker main loop won't send completions anymore. @@ -906,9 +910,9 @@ where self.invocation_task .send_invoker_tx(InvocationTaskOutputInner::NewCommand { command_index: self.command_index, - requires_ack: mh - .requires_ack() - .expect("All command messages support requires_ack"), + requested_ack: mh + .requested_ack() + .expect("All command messages support requested_ack"), command, }); self.command_index += 1; @@ -933,6 +937,9 @@ where Message::CommandAck(_) => TerminalLoopState::Failed(InvokerError::UnexpectedMessageV4( MessageType::CommandAck, )), + Message::ProposeRunCompletionAck(_) => TerminalLoopState::Failed( + InvokerError::UnexpectedMessageV4(MessageType::ProposeRunCompletionAck), + ), Message::Suspension(suspension) => self.handle_suspension_message(suspension), Message::AwaitingOn(awaiting_on) => self.handle_awaiting_on_message(awaiting_on), Message::Error(e) => self.handle_error_message(e), @@ -972,6 +979,9 @@ where self.invocation_task.send_invoker_tx( InvocationTaskOutputInner::NewNotificationProposal { notification: raw_notification, + requested_ack: mh + .requested_ack() + .expect("ProposeRunCompletion message supports requested_ack"), }, ); diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 7d6d19ef79..062f1b6762 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -53,7 +53,10 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent}; use restate_types::journal_v2::raw::{RawCommand, RawNotification}; -use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId, UnresolvedFuture}; +use restate_types::journal_v2::{ + CommandIndex, CompletionId, CompletionType, EntryMetadata, NotificationId, NotificationType, + UnresolvedFuture, +}; use restate_types::live::{Live, LiveLoad}; use restate_types::schema::deployment::DeploymentResolver; use restate_types::schema::invocation_target::InvocationTargetResolver; @@ -94,7 +97,9 @@ pub(crate) enum Notification { /// V2 notification signal: entry index. Entry(EntryIndex), /// V2 command ack: already signal-only. - Ack(CommandIndex), + CommandAck(CommandIndex), + /// Propose run completion ack (protocol >= v7). + ProposeRunCompletionAck(CompletionId), } // -- InvocationTask factory: we use this to mock the state machine in tests @@ -558,10 +563,11 @@ where requires_ack ).await }, - InvocationTaskOutputInner::NewNotificationProposal { notification } => { + InvocationTaskOutputInner::NewNotificationProposal { notification, requested_ack } => { self.handle_new_notification_proposal( invocation_id, - notification + notification, + requested_ack, ).await }, InvocationTaskOutputInner::AwaitingOn { unresolved_future } => { @@ -579,12 +585,12 @@ where InvocationTaskOutputInner::Suspended(indexes) => { self.handle_invocation_task_suspended(invocation_id, indexes).await } - InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { + InvocationTaskOutputInner::NewCommand { command, command_index, requested_ack } => { self.handle_new_command( invocation_id, command_index, command, - requires_ack + requested_ack ).await } InvocationTaskOutputInner::SuspendedV2(future) => { @@ -882,12 +888,17 @@ where &mut self, invocation_id: InvocationId, notification: RawNotification, + requested_ack: bool, ) { if let Some((output_tx, ism)) = self .invocation_state_machine_manager .resolve_invocation(&invocation_id) { - ism.notify_new_notification_proposal(notification.id()); + ism.notify_new_notification_proposal( + notification.ty(), + notification.id(), + requested_ack, + ); trace!( restate.invocation.target = %ism.invocation_target, "Received a new notification. Invocation state: {:?}", @@ -928,13 +939,13 @@ where invocation_id: InvocationId, command_index: CommandIndex, command: RawCommand, - requires_ack: bool, + requested_ack: bool, ) { if let Some((output_tx, ism)) = self .invocation_state_machine_manager .resolve_invocation(&invocation_id) { - ism.notify_new_command(command_index, requires_ack); + ism.notify_new_command(command_index, requested_ack); trace!( restate.invocation.target = %ism.invocation_target, "Received a new command. Invocation state: {:?}", @@ -2427,7 +2438,11 @@ mod tests { ism.start(tokio::spawn(async {}).abort_handle(), tx); // Add a notification proposal - ism.notify_new_notification_proposal(NotificationId::CompletionId(1)); + ism.notify_new_notification_proposal( + NotificationType::Completion(CompletionType::Run), + NotificationId::CompletionId(1), + false, + ); // Register the ISM and use handle_invocation_task_failed to put it in WaitingRetry state. // This will register the timer in the real DelayQueue. diff --git a/crates/service-protocol-v4/src/lib.rs b/crates/service-protocol-v4/src/lib.rs index 6641bd23a4..0b6bc23b51 100644 --- a/crates/service-protocol-v4/src/lib.rs +++ b/crates/service-protocol-v4/src/lib.rs @@ -34,6 +34,7 @@ pub mod proto { EndMessage, CommandAckMessage, ProposeRunCompletionMessage, + ProposeRunCompletionAckMessage, CallCommandMessage, OneWayCallCommandMessage, AwaitingOnMessage, diff --git a/crates/service-protocol-v4/src/message_codec/header.rs b/crates/service-protocol-v4/src/message_codec/header.rs index 536684f036..6002b6423c 100644 --- a/crates/service-protocol-v4/src/message_codec/header.rs +++ b/crates/service-protocol-v4/src/message_codec/header.rs @@ -10,7 +10,7 @@ use super::{MessageType, UnknownMessageType}; -const REQUIRES_ACK_MASK: u64 = 0x8000_0000_0000; +const REQUESTED_ACK_MASK: u64 = 0x8000_0000_0000; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct MessageHeader { @@ -18,7 +18,7 @@ pub struct MessageHeader { length: u32, // --- Flags - requires_ack_flag: Option, + requested_ack_flag: Option, } impl MessageHeader { @@ -28,11 +28,11 @@ impl MessageHeader { } #[inline] - fn _new(ty: MessageType, requires_ack_flag: Option, length: u32) -> Self { + fn _new(ty: MessageType, requested_ack_flag: Option, length: u32) -> Self { MessageHeader { ty, length, - requires_ack_flag, + requested_ack_flag, } } @@ -41,9 +41,11 @@ impl MessageHeader { self.ty } + /// For commands: requested_ack forces the runtime to send back CommandAckMessage + /// For run proposals: requested_ack forces the runtime to send back ProposeRunCompletionAckMessage instead of the whole RunCompletionMessage. #[inline] - pub fn requires_ack(&self) -> Option { - self.requires_ack_flag + pub fn requested_ack(&self) -> Option { + self.requested_ack_flag } #[inline] @@ -65,16 +67,14 @@ macro_rules! read_flag_if { impl TryFrom for MessageHeader { type Error = UnknownMessageType; - /// Deserialize the protocol header. - /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#message-header fn try_from(value: u64) -> Result { let ty_code = (value >> 48) as u16; let ty: MessageType = ty_code.try_into()?; - let requires_ack_flag = read_flag_if!(ty.allows_ack(), value, REQUIRES_ACK_MASK); + let requested_ack_flag = read_flag_if!(ty.allows_ack(), value, REQUESTED_ACK_MASK); let length = value as u32; - Ok(MessageHeader::_new(ty, requires_ack_flag, length)) + Ok(MessageHeader::_new(ty, requested_ack_flag, length)) } } @@ -87,16 +87,14 @@ macro_rules! write_flag { } impl From for u64 { - /// Serialize the protocol header. - /// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#message-header fn from(message_header: crate::message_codec::MessageHeader) -> Self { let mut res = ((u16::from(message_header.ty) as u64) << 48) | (message_header.length as u64); write_flag!( - message_header.requires_ack_flag, + message_header.requested_ack_flag, &mut res, - REQUIRES_ACK_MASK + REQUESTED_ACK_MASK ); res @@ -121,7 +119,7 @@ mod tests { let header: MessageHeader = serialized.try_into().unwrap(); assert_eq!(header.message_type(), $ty); - assert_eq!(header.requires_ack(), $requires_ack); + assert_eq!(header.requested_ack(), $requires_ack); assert_eq!(header.frame_length(), $len); } }; @@ -158,6 +156,14 @@ mod tests { requires_ack: true ); + roundtrip_test!( + propose_run_completion, + MessageHeader::_new(ProposeRunCompletion, Some(true), 1), + ProposeRunCompletion, + 1, + requires_ack: true + ); + roundtrip_test!( custom_entry, MessageHeader::new(Custom(0xFC00), 10341), diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index ac330932c2..8d4df018bd 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -55,7 +55,7 @@ macro_rules! gen_message { Custom(u16, bytes::Bytes) } }; - (@gen_message_enum [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum [$($tail)*] -> [$variant(proto::[< $variant Message >]), $($body)*]); } }; (@gen_message_enum [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -75,7 +75,7 @@ macro_rules! gen_message { } } }; - (@gen_message_enum_encoded_len [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_encoded_len [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum_encoded_len [$($tail)*] -> [(Message::$variant(msg), service_protocol_version) => ] as $crate::message_codec::encoding::ServiceWireEncoder>::encoded_len(msg, service_protocol_version), $($body)*]); } }; (@gen_message_enum_encoded_len [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -113,7 +113,7 @@ macro_rules! gen_message { } } }; - (@gen_message_enum_encode [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_encode [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_enum_encode [$($tail)*] -> [(Message::$variant(msg), service_protocol_version, buf) => ] as $crate::message_codec::encoding::ServiceWireEncoder>::encode(msg, buf, service_protocol_version)?, $($body)*]); } }; (@gen_message_enum_encode [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -133,7 +133,7 @@ macro_rules! gen_message { } } }; - (@gen_message_enum_proto_debug [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_enum_proto_debug [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { gen_message!(@gen_message_enum_proto_debug [$($tail)*] -> [Message::$variant(msg) => format!("{msg:?}"), $($body)*]); }; (@gen_message_enum_proto_debug [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -167,6 +167,9 @@ macro_rules! gen_message { } } }; + (@gen_message_type_enum_allows_ack [$variant:ident Control allows_ack = $id:literal, $($tail:tt)*] -> [$($variants:tt)*]) => { + gen_message!(@gen_message_type_enum_allows_ack [$($tail)*] -> [MessageType::$variant | $($variants)*]); + }; (@gen_message_type_enum_allows_ack [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($variants:tt)*]) => { gen_message!(@gen_message_type_enum_allows_ack [$($tail)*] -> [$($variants)*]); }; @@ -190,7 +193,7 @@ macro_rules! gen_message { } } }; - (@gen_message_type_enum_decode [$variant:ident Control = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { + (@gen_message_type_enum_decode [$variant:ident Control $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { paste::paste! { gen_message!(@gen_message_type_enum_decode [$($tail)*] -> [(MessageType::$variant, buf, service_protocol_version) => Ok(Message::$variant(] as $crate::message_codec::encoding::ServiceWireDecoder>::decode(buf, service_protocol_version)?)), $($body)*]); } }; (@gen_message_type_enum_decode [$variant:ident $ty:ident noparse $($ignore:ident)* = $id:literal, $($tail:tt)*] -> [$($body:tt)*]) => { @@ -334,8 +337,9 @@ gen_message!( Error Control = 0x0002, End Control = 0x0003, CommandAck Control = 0x0004, - ProposeRunCompletion Control = 0x0005, + ProposeRunCompletion Control allows_ack = 0x0005, AwaitingOn Control = 0x0006, + ProposeRunCompletionAck Control = 0x0007, Input Command noparse allows_ack = 0x0400, Output Command noparse allows_ack = 0x0401, @@ -420,4 +424,8 @@ impl Message { pub fn new_command_ack(command_index: CommandIndex) -> Self { Self::CommandAck(proto::CommandAckMessage { command_index }) } + + pub fn new_propose_run_completion_ack(completion_id: u32) -> Self { + Self::ProposeRunCompletionAck(proto::ProposeRunCompletionAckMessage { completion_id }) + } } diff --git a/crates/types/src/journal_v2/notification.rs b/crates/types/src/journal_v2/notification.rs index 5789d20043..79ee0e2ff1 100644 --- a/crates/types/src/journal_v2/notification.rs +++ b/crates/types/src/journal_v2/notification.rs @@ -21,7 +21,7 @@ use crate::journal_v2::{ }; /// See [`Notification`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::EnumTryAs)] pub enum NotificationId { CompletionId(CompletionId), SignalIndex(SignalIndex), diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 8851b4af6d..e9f15b1e51 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -44,6 +44,7 @@ enum ServiceProtocolVersion { // * WorkflowTarget.scope // * IdempotentRequestTarget.scope // * StartMessage.scope, StartMessage.limit_key and StartMessage.idempotency_key + // * Semantic changes to Run proposal response, introduced ProposeRunCompletionAckMessage // * ErrorMessage.should_pause V7 = 7; } @@ -193,10 +194,14 @@ message CommandAckMessage { uint32 command_index = 1; } +// Type: 0x0000 + 5 +// // This is a special control message to propose ctx.run completions to the runtime. // This won't be written to the journal immediately, but will appear later as a new notification (meaning the result was stored). // -// Type: 0x0000 + 5 +// In response to this message, the SDK expects the runtime to either: +// * if requested_ack = true -> Send back ProposeRunCompletionAckMessage with the related completion_id +// * if requested_ack = false -> Send back the whole notification just proposed message ProposeRunCompletionMessage { uint32 result_completion_id = 1; oneof result { @@ -215,6 +220,33 @@ message AwaitingOnMessage { bool executing_side_effects = 2; } +// Type: 0x0000 + 7 +// +// This is a message sent as response to ProposeRunCompletionMessage to acknowledge the proposal was correctly stored and replicated. +// +// Its ordering is considered to be "relative" to the ordering of notifications. +// In other words, the SDK expects that on replay the RunCompletionMessage, +// to which the ProposeRunCompletionAckMessage relates to, +// is sent in the same order relative to the other notifications. +// +// For example, if when PROCESSING the first time the SDK sees: +// +// -> SleepCompletionMessage id = 1 +// -> ProposeRunCompletionAckMessage id = 3 +// -> SleepCompletionMessage id = 2 +// +// Then on the next replay, the SDK expects to see notifications in the following order: +// +// -> SleepCompletionMessage id = 1 +// -> RunCompletionMessage id = 3 (the actual completion here replaces the ack) +// -> SleepCompletionMessage id = 2 + +// +// This message will only ever be sent as response to ProposeRunCompletionMessage, and will be sent only in the PROCESSING phase of the protocol, never during REPLAY. +message ProposeRunCompletionAckMessage { + uint32 completion_id = 1; +} + // --- Commands and Notifications --- // The Journal is modelled as commands and notifications. diff --git a/tools/service-protocol-wireshark-dissector/dissector.lua b/tools/service-protocol-wireshark-dissector/dissector.lua index 7a9673804e..38286b61e6 100644 --- a/tools/service-protocol-wireshark-dissector/dissector.lua +++ b/tools/service-protocol-wireshark-dissector/dissector.lua @@ -16,9 +16,9 @@ local p_service_protocol = Proto("restate_service_protocol", "Restate Service En -- Define the fields local f_ty = ProtoField.uint16("restate_service_protocol.message_type", "Message Type", base.HEX) -local f_requires_ack = ProtoField.bool("restate_service_protocol.requires_ack", "REQUIRES_ACK", base.NONE, { - "Requires ack", - "Doesn't require ack" +local f_requires_ack = ProtoField.bool("restate_service_protocol.requested_ack", "REQUESTED_ACK", base.NONE, { + "Requested ack", + "Ack not requested" }) local f_len = ProtoField.uint16("restate_service_protocol.length", "Length", base.DEC) local f_message = ProtoField.string("restate_service_protocol.message", "Message", base.UNICODE) diff --git a/tools/service-protocol-wireshark-dissector/src/lib.rs b/tools/service-protocol-wireshark-dissector/src/lib.rs index 236c7bea98..ce65c18eb1 100644 --- a/tools/service-protocol-wireshark-dissector/src/lib.rs +++ b/tools/service-protocol-wireshark-dissector/src/lib.rs @@ -58,8 +58,8 @@ fn decode_packages(lua: &Lua, buf_lua: Value) -> LuaResult { ); // Optional flags - if let Some(requires_ack) = header.requires_ack() { - set_table_values!(message_table, "requires_ack" => requires_ack); + if let Some(requested_ack) = header.requested_ack() { + set_table_values!(message_table, "requested_ack" => requested_ack); } // For some messages, spit out more stuff