Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ jobs:
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"
testArtifactOutput: sdk-java-integration-test-journal-table-v2
testArtifactOutput: sdk-java-integration-journal-table-v2-test-report
envVars: |
RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0-dev

Expand All @@ -178,7 +178,7 @@ jobs:
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"
testArtifactOutput: sdk-java-integration-test-vqueues
testArtifactOutput: sdk-java-integration-vqueues-test-report
envVars: |
RESTATE_EXPERIMENTAL_ENABLE_VQUEUES=true
# we don't expect all tests to pass, that's why we continue on error
Expand Down Expand Up @@ -215,7 +215,7 @@ jobs:
serviceImage: "ghcr.io/restatedev/test-services-python:main"
envVars: |
RESTATE_experimental_enable_protocol_v7=true
testArtifactOutput: sdk-python-protocol-v7
testArtifactOutput: sdk-python-protocol-v7-test-report

sdk-go:
name: Run SDK-Go integration tests
Expand Down Expand Up @@ -261,7 +261,7 @@ jobs:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
envVars: |
RESTATE_experimental_enable_protocol_v7=true
testArtifactOutput: sdk-typescript-protocol-v7
testArtifactOutput: sdk-typescript-protocol-v7-test-report

sdk-rust:
name: Run SDK-Rust integration tests
Expand Down
187 changes: 173 additions & 14 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use restate_memory::LocalMemoryPool;
use restate_types::identifiers::EntryIndex;
use restate_types::retries;
use restate_types::schema::invocation_target::OnMaxAttempts;
use restate_types::service_protocol::ServiceProtocolVersion;
use restate_types::vqueues::VQueueId;
use restate_worker_api::resources::ReservedResources;

Expand Down Expand Up @@ -147,6 +148,17 @@ enum AttemptState<K: TimerKey> {
// Acks that should be propagated back to the SDK
command_acks_to_propagate: HashSet<CommandIndex>,

// Run completions the SDK proposed during this attempt. When the partition
// processor echoes the stored notification back via [`Self::notify_entry`],
// we swap [`Notification::Entry`] for [`Notification::ProposeRunCompletionAck`]
// so the SDK gets the ack message on the wire (only protocol >= v7).
//
// On receiving the ack, the SDK substitutes it with the full notification it kept around locally.
//
// This mechanism is used only in PROCESSING, for run completions proposed during the current attempt,
// and not when the invocation is REPLAYING.
run_completion_proposals_to_ack: HashSet<CompletionId>,
Comment thread
slinkydeveloper marked this conversation as resolved.

// Deployment being used during this attempt
using_deployment: Option<PinnedDeployment>,
// If true, we need to notify the deployment id to the partition processor
Expand Down Expand Up @@ -253,6 +265,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
journal_tracker: Default::default(),
abort_handle,
command_acks_to_propagate: Default::default(),
run_completion_proposals_to_ack: Default::default(),
using_deployment: None,
should_notify_pinned_deployment: false,
};
Expand Down Expand Up @@ -349,7 +362,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
}
}

pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requires_ack: bool) {
pub(super) fn notify_new_command(&mut self, command_index: CommandIndex, requested_ack: bool) {
debug_assert!(matches!(
&self.invocation_state,
AttemptState::InFlight { .. }
Expand All @@ -365,23 +378,55 @@ impl<K: TimerKey> InvocationStateMachine<K> {
..
} = &mut self.invocation_state
{
if requires_ack {
if requested_ack {
entries_to_ack.insert(command_index);
}
journal_tracker.notify_command_sent_to_partition_processor(command_index);
}
}

pub(super) fn notify_new_notification_proposal(&mut self, notification_id: NotificationId) {
pub(super) fn notify_new_notification_proposal(
&mut self,
notification_type: NotificationType,
notification_id: NotificationId,
requested_ack: bool,
) {
debug_assert!(matches!(
&self.invocation_state,
AttemptState::InFlight { .. }
));

// The only notification proposal currently defined in the protocol is
// ProposeRunCompletionMessage. We assert the invariant explicitly so that
// if/when new proposal-like messages are added, this code is forced to be
// revisited rather than silently mistreating them as run completions.
assert_eq!(
notification_type,
NotificationType::Completion(CompletionType::Run),
"the only notification proposal currently defined in the protocol is ProposeRunCompletionMessage",
);
let completion_id = *notification_id
.try_as_completion_id_ref()
.expect("RunCompletion notification id must be a CompletionId");

if let AttemptState::InFlight {
journal_tracker, ..
journal_tracker,
run_completion_proposals_to_ack,
using_deployment,
..
} = &mut self.invocation_state
{
// We track the run completion proposal to ack only if the SDK asked for
// an ack (header flag) AND the negotiated protocol supports it (>= v7).
// If either condition is missing, the proposal flows through the normal
// `notify_entry` → `Notification::Entry` path and the SDK receives the
// full `RunCompletionNotificationMessage` like on older protocols.
if requested_ack
&& let Some(pinned_deployment) = using_deployment
&& pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V7
{
run_completion_proposals_to_ack.insert(completion_id);
}
journal_tracker.notify_notification_proposed_to_partition_processor(notification_id);
}
}
Expand All @@ -395,7 +440,10 @@ impl<K: TimerKey> InvocationStateMachine<K> {
..
} => {
if command_acks_to_propagate.remove(&command_index) {
Self::try_send_notification(notifications_tx, Notification::Ack(command_index));
Self::try_send_notification(
notifications_tx,
Notification::CommandAck(command_index),
);
}
journal_tracker.notify_acked_command_from_partition_processor(command_index);
}
Expand Down Expand Up @@ -426,11 +474,20 @@ impl<K: TimerKey> InvocationStateMachine<K> {
AttemptState::InFlight {
journal_tracker,
notifications_tx,
run_completion_proposals_to_ack,
..
} => {
let to_send = match &notification_id {
// We send RunCompletionAck only if we're tracking this specific run completion.
NotificationId::CompletionId(c)
if run_completion_proposals_to_ack.remove(c) =>
{
Notification::ProposeRunCompletionAck(*c)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the SDK needed to drop the run completion value in the meantime (e.g. it was evicted from a size-bounded cache)? Would it fail with a transient error so that a replay fixes the problem?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the SDK needed to drop the run completion value in the meantime (e.g. it was evicted from a size-bounded cache)? Would it fail with a transient error so that a replay fixes the problem?

The SDK never drops it. I guess in the worst case the SDK OOMs and a retry happens.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow in the protocol for this to happen? Otherwise, I could see an endpoint OOMing quite easily compared to before when we have a lot of concurrent ctx.run steps.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow in the protocol for this to happen?

Obviously, the SDK can decide to drop it only when proposing the completion, not later (otherwise we'd need another message, more synchronization between runtime and SDK, and so on).

I guess this for you boils down to having a field requesting the ack or the whole completion in the proposal message?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be as easy as adding this option to the example SDK implementation or in some form of description.

With the behavior you've added right now, we can fill up the cache and stop caching entries once the cache is full. Instead of evicting entries, we don't store new ones. Entries are only evicted once we see their ack message. I think this can have better runtime properties than evicting the oldest entries and having to fail an invocation whose completion was dropped.

}
_ => Notification::Entry(entry_index),
};
journal_tracker.notify_acked_notification_from_partition_processor(notification_id);

Self::try_send_notification(notifications_tx, Notification::Entry(entry_index));
Self::try_send_notification(notifications_tx, to_send);
}
AttemptState::WaitingRetry {
journal_tracker, ..
Expand Down Expand Up @@ -709,7 +766,7 @@ mod tests {
}

#[test(tokio::test)]
async fn handle_requires_ack() {
async fn handle_requested_ack() {
let mut invocation_state_machine = create_test_invocation_state_machine();

let abort_handle = tokio::spawn(async {}).abort_handle();
Expand All @@ -726,15 +783,106 @@ mod tests {

// Check notification was sent for ack 1 and 3
let notification = rx.recv().await;
assert_that!(notification, some(pat!(Notification::Ack(eq(1)))));
assert_that!(notification, some(pat!(Notification::CommandAck(eq(1)))));
let notification = rx.recv().await;
assert_that!(notification, some(pat!(Notification::Ack(eq(3)))));
assert_that!(notification, some(pat!(Notification::CommandAck(eq(3)))));

// Channel should be empty
let try_recv = rx.try_recv();
assert_that!(try_recv, err(eq(TryRecvError::Empty)));
}

/// Test helper: starts `ism` and pins a deployment using the given protocol `version`.
///
/// Returns the receiving half of the notification channel whose sender is handed to
/// `ism.start`, so tests can observe the `Notification`s emitted by the state machine.
///
/// Must be called from within a Tokio runtime, because it spawns a task.
fn start_with_protocol(
ism: &mut InvocationStateMachine<u64>,
version: ServiceProtocolVersion,
) -> mpsc::UnboundedReceiver<Notification> {
// A no-op task, spawned only to obtain an `AbortHandle` to pass to `start`.
let abort_handle = tokio::spawn(async {}).abort_handle();
let (tx, rx) = mpsc::unbounded_channel();
ism.start(abort_handle, tx);
// Pin the deployment so the state machine knows the negotiated protocol version.
// NOTE(review): the meaning of the `true` flag is not visible from here — presumably it
// requests notifying the pinned deployment to the partition processor; confirm against
// `notify_pinned_deployment`.
ism.notify_pinned_deployment(
PinnedDeployment::new(DeploymentId::default(), version),
true,
);
rx
}

/// On protocol v7, a run completion proposed with `requested_ack = true` is echoed back as
/// `Notification::ProposeRunCompletionAck` exactly once; a later `notify_entry` for the same
/// completion id is delivered as a regular `Notification::Entry`.
#[test(tokio::test)]
async fn notify_entry_swaps_proposed_run_completion_to_ack_on_v7() {
let mut ism = create_test_invocation_state_machine();
let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7);

// Track a proposal for CompletionId 5 with requested_ack=true
ism.notify_new_notification_proposal(
NotificationType::Completion(CompletionType::Run),
NotificationId::CompletionId(5),
true,
);

// PP echoes back the stored notification — the ISM must swap to the ack
ism.notify_entry(7, NotificationId::CompletionId(5));
assert_that!(
rx.recv().await,
some(pat!(Notification::ProposeRunCompletionAck(eq(5))))
);

// The proposal was consumed: a second notify_entry for the same id falls
// back to the regular Entry path.
ism.notify_entry(7, NotificationId::CompletionId(5));
assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7)))));
}

/// On protocol v6, a run completion proposal is never swapped for an ack, even when the
/// proposal was made with `requested_ack = true`: the SDK receives `Notification::Entry`.
#[test(tokio::test)]
async fn notify_entry_does_not_swap_on_v6_old_path_preserved() {
let mut ism = create_test_invocation_state_machine();
let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V6);

// The proposal is still recorded in the journal tracker (for retry safety), but it is
// NOT tracked for swapping: the deployment is on protocol v6, and the ack swap is only
// enabled on protocol >= v7, even if the SDK set requested_ack.
ism.notify_new_notification_proposal(
NotificationType::Completion(CompletionType::Run),
NotificationId::CompletionId(5),
true,
);

// PP echoes back the stored notification — the ISM forwards the full Entry,
// exactly like before protocol v7 existed.
ism.notify_entry(7, NotificationId::CompletionId(5));
assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7)))));
}

/// On protocol v7, a run completion proposed with `requested_ack = false` is not swapped
/// for an ack: `notify_entry` delivers `Notification::Entry`.
#[test(tokio::test)]
async fn notify_entry_on_v7_without_requested_ack_falls_through_to_entry() {
let mut ism = create_test_invocation_state_machine();
let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7);

// V7 deployment but SDK did NOT set the requested_ack header flag — the
// proposal is tracked in the journal tracker for retry safety, but no
// swap happens, and the SDK gets the full notification back.
ism.notify_new_notification_proposal(
NotificationType::Completion(CompletionType::Run),
NotificationId::CompletionId(5),
false,
);

// The echoed notification is forwarded with the entry index passed to notify_entry.
ism.notify_entry(7, NotificationId::CompletionId(5));
assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(7)))));
}

/// On protocol v7, notifications that were never proposed during this attempt (a completion
/// without a prior proposal, and a signal) are delivered as `Notification::Entry`.
#[test(tokio::test)]
async fn notify_entry_on_v7_without_proposal_falls_through_to_entry() {
let mut ism = create_test_invocation_state_machine();
let mut rx = start_with_protocol(&mut ism, ServiceProtocolVersion::V7);

// No prior proposal: a completion notification flows through as Entry.
ism.notify_entry(3, NotificationId::CompletionId(5));
assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(3)))));

// Signals are also never swapped — only completion ids tracked at propose time qualify.
ism.notify_entry(4, NotificationId::SignalIndex(17));
assert_that!(rx.recv().await, some(pat!(Notification::Entry(eq(4)))));
}

#[test(tokio::test)]
async fn handle_error_counts_attempts_on_same_entry() {
let mut invocation_state_machine = create_test_invocation_state_machine();
Expand Down Expand Up @@ -825,8 +973,19 @@ mod tests {
let (tx, _rx) = mpsc::unbounded_channel();

invocation_state_machine.start(abort_handle, tx);
invocation_state_machine.notify_new_notification_proposal(NotificationId::SignalIndex(18));
invocation_state_machine.notify_new_notification_proposal(NotificationId::CompletionId(1));
// Only RunCompletion notifications are valid proposals today; the ISM asserts this.
// requested_ack=false because this test is about journal-tracker accounting for
// retry safety, not about the v7 ack swap.
invocation_state_machine.notify_new_notification_proposal(
NotificationType::Completion(CompletionType::Run),
NotificationId::CompletionId(18),
false,
);
invocation_state_machine.notify_new_notification_proposal(
NotificationType::Completion(CompletionType::Run),
NotificationId::CompletionId(1),
false,
);
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
Expand All @@ -836,8 +995,8 @@ mod tests {
assert!(!invocation_state_machine.is_ready_to_retry());
assert!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state);

// Got signal 18
invocation_state_machine.notify_entry(0, NotificationId::SignalIndex(18));
// Got completion 18
invocation_state_machine.notify_entry(0, NotificationId::CompletionId(18));

// Retry timer fired
invocation_state_machine.notify_retry_timer_fired(0);
Expand Down
8 changes: 7 additions & 1 deletion crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,16 @@ pub(super) enum InvocationTaskOutputInner {
/// When reading the entry from the storage this flag will always be false, as we never need to send acks for entries sent during a journal replay.
///
/// See https://github.com/restatedev/service-protocol/blob/main/service-invocation-protocol.md#acknowledgment-of-stored-entries
requires_ack: bool,
requested_ack: bool,
},
NewNotificationProposal {
notification: RawNotification,
/// If true, the SDK requested to be notified when the proposed notification
/// is durably stored.
///
/// The runtime will send back `ProposeRunCompletionAckMessage` instead of `RunCompletionMessage`.
/// Only protocol >= v7.
requested_ack: bool,
},
AwaitingOn {
unresolved_future: UnresolvedFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,11 @@ where
trace!("Sending the completion to the wire");
crate::shortcircuit!(self.write_with_lease(&mut http_stream_tx, completion.into(), Some(lease)));
},
Some(Notification::Ack(entry_index)) => {
Some(Notification::CommandAck(entry_index)) => {
trace!("Sending the ack to the wire");
crate::shortcircuit!(self.write(&mut http_stream_tx, ProtocolMessage::new_entry_ack(entry_index)));
},
Some(Notification::Entry { .. }) => {
Some(Notification::Entry { .. }) | Some(Notification::ProposeRunCompletionAck(_)) => {
panic!("We don't expect to receive journal_v2 entries, this is an invoker bug.")
},
None => {
Expand Down
Loading
Loading