From 0c27542beaf3326401e25c0d97d87c7fc2325449 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 30 Apr 2025 16:22:46 +0200 Subject: [PATCH 1/2] Move PartitionProcessorRpcClient in restate_types, and rename it InvocationClient. The InvocationClient should be completely orthogonal from the transport, and should be easy to mock. We're gonna use this same client in the Admin API soon. --- Cargo.lock | 2 - crates/core/src/worker_api/mod.rs | 2 + .../partition_processor_rpc_client.rs | 226 +++++++++--------- crates/ingress-http/Cargo.toml | 2 - crates/ingress-http/src/handler/invocation.rs | 14 +- crates/ingress-http/src/handler/responses.rs | 13 +- crates/ingress-http/src/handler/tests.rs | 32 ++- crates/ingress-http/src/handler/workflow.rs | 17 +- crates/ingress-http/src/lib.rs | 11 +- .../src/rpc_request_dispatcher.rs | 58 ++--- crates/ingress-http/src/server.rs | 4 +- crates/node/src/roles/ingress.rs | 18 +- crates/types/src/invocation/client.rs | 151 ++++++++++++ .../src/{invocation.rs => invocation/mod.rs} | 2 + crates/types/src/net/partition_processor.rs | 29 +-- .../src/partition/leadership/leader_state.rs | 4 +- crates/worker/src/partition/mod.rs | 11 +- .../src/partition/state_machine/actions.rs | 4 +- .../state_machine/lifecycle/cancel.rs | 4 +- .../worker/src/partition/state_machine/mod.rs | 12 +- .../state_machine/tests/idempotency.rs | 18 +- .../state_machine/tests/kill_cancel.rs | 10 +- .../src/partition/state_machine/tests/mod.rs | 7 +- .../partition/state_machine/tests/workflow.rs | 14 +- 24 files changed, 391 insertions(+), 274 deletions(-) rename crates/{ingress-http/src => core/src/worker_api}/partition_processor_rpc_client.rs (75%) create mode 100644 crates/types/src/invocation/client.rs rename crates/types/src/{invocation.rs => invocation/mod.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index 54e98de422..2352f2c30d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6998,12 +6998,10 @@ name = "restate-ingress-http" version = "1.3.3-dev" dependencies = [ "anyhow", - "assert2", "bytes", "bytestring", "chrono", "codederror", - "derive_builder", "futures", "googletest", "http 1.2.0", diff --git a/crates/core/src/worker_api/mod.rs b/crates/core/src/worker_api/mod.rs index 419b37b298..796e814062 100644 --- a/crates/core/src/worker_api/mod.rs +++ b/crates/core/src/worker_api/mod.rs @@ -9,5 +9,7 @@ // by the Apache License, Version 2.0. mod partition_processor_manager; +mod partition_processor_rpc_client; pub use partition_processor_manager::*; +pub use partition_processor_rpc_client::*; diff --git a/crates/ingress-http/src/partition_processor_rpc_client.rs b/crates/core/src/worker_api/partition_processor_rpc_client.rs similarity index 75% rename from crates/ingress-http/src/partition_processor_rpc_client.rs rename to crates/core/src/worker_api/partition_processor_rpc_client.rs index 08fe2f3cdf..0d1d09cc8f 100644 --- a/crates/ingress-http/src/partition_processor_rpc_client.rs +++ b/crates/core/src/worker_api/partition_processor_rpc_client.rs @@ -8,34 +8,37 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::ShutdownError; +use crate::network::ConnectError; +use crate::network::{NetworkSender, RpcReplyError, Swimlane}; +use crate::network::{Networking, TransportConnect}; +use crate::partitions::PartitionRouting; use assert2::let_assert; -use tracing::trace; - -use restate_core::ShutdownError; -use restate_core::network::{NetworkSender, RpcReplyError, Swimlane}; -use restate_core::network::{Networking, TransportConnect}; -use restate_core::partitions::PartitionRouting; use restate_types::identifiers::{ InvocationId, PartitionId, PartitionProcessorRpcRequestId, WithPartitionKey, }; +use restate_types::invocation::client::{ + AttachInvocationResponse, GetInvocationOutputResponse, InvocationClient, InvocationClientError, + InvocationOutput, SubmittedInvocationNotification, +}; use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse}; use restate_types::journal_v2::Signal; use restate_types::live::Live; use restate_types::net::partition_processor::{ - AppendInvocationReplyOn, GetInvocationOutputResponseMode, InvocationOutput, - PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, - PartitionProcessorRpcResponse, SubmittedInvocationNotification, + AppendInvocationReplyOn, GetInvocationOutputResponseMode, PartitionProcessorRpcError, + PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTableError}; +use tracing::trace; #[derive(Debug, thiserror::Error)] -pub enum PartitionProcessorRpcClientError { +pub enum PartitionProcessorInvocationClientError { #[error(transparent)] UnknownPartition(#[from] PartitionTableError), #[error("cannot find node for partition {0}")] UnknownNode(PartitionId), #[error(transparent)] - Connect(#[from] restate_core::network::ConnectError), + Connect(#[from] ConnectError), #[error("failed sending request")] SendFailed, #[error(transparent)] @@ -54,18 +57,18 @@ pub enum PartitionProcessorRpcClientError { Stopping, } -impl PartitionProcessorRpcClientError { +impl PartitionProcessorInvocationClientError { /// Returns true when the operation can be retried assuming no state mutation could have occurred in the PartitionProcessor. pub fn is_safe_to_retry(&self) -> bool { match self { - PartitionProcessorRpcClientError::UnknownPartition(_) - | PartitionProcessorRpcClientError::Connect(_) - | PartitionProcessorRpcClientError::UnknownNode(_) - | PartitionProcessorRpcClientError::NotLeader - | PartitionProcessorRpcClientError::Starting - | PartitionProcessorRpcClientError::Busy - | PartitionProcessorRpcClientError::SendFailed - | PartitionProcessorRpcClientError::Stopping => { + PartitionProcessorInvocationClientError::UnknownPartition(_) + | PartitionProcessorInvocationClientError::Connect(_) + | PartitionProcessorInvocationClientError::UnknownNode(_) + | PartitionProcessorInvocationClientError::NotLeader + | PartitionProcessorInvocationClientError::Starting + | PartitionProcessorInvocationClientError::Busy + | PartitionProcessorInvocationClientError::SendFailed + | PartitionProcessorInvocationClientError::Stopping => { // These are pre-flight error that we can distinguish, // and for which we know for certain that no message was proposed yet to the log. true @@ -75,7 +78,14 @@ impl PartitionProcessorRpcClientError { } } -impl From for PartitionProcessorRpcClientError { +impl From for InvocationClientError { + fn from(value: PartitionProcessorInvocationClientError) -> Self { + let is_safe_to_retry = value.is_safe_to_retry(); + Self::new(value, is_safe_to_retry) + } +} + +impl From for PartitionProcessorInvocationClientError { fn from(value: RpcReplyError) -> Self { match value { e @ RpcReplyError::Unknown(_) => Self::Internal(e.to_string()), @@ -91,47 +101,35 @@ impl From for PartitionProcessorRpcClientError { } } -impl From for PartitionProcessorRpcClientError { +impl From for PartitionProcessorInvocationClientError { fn from(value: PartitionProcessorRpcError) -> Self { match value { - PartitionProcessorRpcError::NotLeader(_) => PartitionProcessorRpcClientError::NotLeader, + PartitionProcessorRpcError::NotLeader(_) => { + PartitionProcessorInvocationClientError::NotLeader + } PartitionProcessorRpcError::LostLeadership(partition_id) => { - PartitionProcessorRpcClientError::LostLeadership(partition_id) + PartitionProcessorInvocationClientError::LostLeadership(partition_id) } PartitionProcessorRpcError::Internal(msg) => { - PartitionProcessorRpcClientError::Internal(msg) + PartitionProcessorInvocationClientError::Internal(msg) + } + PartitionProcessorRpcError::Starting => { + PartitionProcessorInvocationClientError::Starting + } + PartitionProcessorRpcError::Stopping => { + PartitionProcessorInvocationClientError::Stopping } - PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting, - PartitionProcessorRpcError::Stopping => PartitionProcessorRpcClientError::Stopping, } } } -#[derive(Debug, Clone)] -pub enum AttachInvocationResponse { - NotFound, - /// Returned when the invocation hasn't an idempotency key, nor it's a workflow run. - NotSupported, - Ready(InvocationOutput), -} - -#[derive(Debug, Clone)] -pub enum GetInvocationOutputResponse { - NotFound, - /// The invocation was found, but it's still processing and a result is not ready yet. - NotReady, - /// Returned when the invocation hasn't an idempotency key, nor it's a workflow run. - NotSupported, - Ready(InvocationOutput), -} - -pub struct PartitionProcessorRpcClient { +pub struct PartitionProcessorInvocationClient { networking: Networking, partition_table: Live, partition_routing: PartitionRouting, } -impl Clone for PartitionProcessorRpcClient { +impl Clone for PartitionProcessorInvocationClient { fn clone(&self) -> Self { Self { networking: self.networking.clone(), @@ -141,7 +139,7 @@ impl Clone for PartitionProcessorRpcClient { } } -impl PartitionProcessorRpcClient { +impl PartitionProcessorInvocationClient { pub fn new( networking: Networking, partition_table: Live, @@ -155,16 +153,70 @@ impl PartitionProcessorRpcClient { } } -impl PartitionProcessorRpcClient +impl PartitionProcessorInvocationClient +where + C: TransportConnect, +{ + async fn resolve_partition_id_and_send( + &self, + request_id: PartitionProcessorRpcRequestId, + inner_request: PartitionProcessorRpcRequestInner, + ) -> Result { + let partition_id = self + .partition_table + .pinned() + .find_partition_id(inner_request.partition_key())?; + + let node_id = self + .partition_routing + .get_node_by_partition(partition_id) + .ok_or(PartitionProcessorInvocationClientError::UnknownNode( + partition_id, + ))?; + + // find connection for this node + let connection = self + .networking + .get_connection(node_id, Swimlane::IngressData) + .await?; + let permit = connection + .reserve() + .await + .ok_or(PartitionProcessorInvocationClientError::SendFailed)?; + let rpc_result = permit + .send_rpc( + PartitionProcessorRpcRequest { + request_id, + partition_id, + inner: inner_request, + }, + Some(*partition_id as u64), + ) + .await?; + + if rpc_result.is_err() && rpc_result.as_ref().unwrap_err().likely_stale_route() { + trace!( + %partition_id, + %node_id, + %request_id, + "Received Partition Processor error indicating possible stale route" + ); + } + + Ok(rpc_result?) + } +} + +impl InvocationClient for PartitionProcessorInvocationClient where C: TransportConnect, { /// Append the invocation to the log, waiting for the submit notification emitted by the PartitionProcessor. - pub async fn append_invocation_and_wait_submit_notification( + async fn append_invocation_and_wait_submit_notification( &self, request_id: PartitionProcessorRpcRequestId, invocation_request: InvocationRequest, - ) -> Result { + ) -> Result { let response = self .resolve_partition_id_and_send( request_id, @@ -186,13 +238,12 @@ where Ok(submit_notification) } - /// Append the invocation and wait for its output. - pub async fn append_invocation_and_wait_output( + async fn append_invocation_and_wait_output( &self, request_id: PartitionProcessorRpcRequestId, invocation_request: InvocationRequest, - ) -> Result { + ) -> Result { let response = self .resolve_partition_id_and_send( request_id, @@ -214,12 +265,11 @@ where Ok(invocation_output) } - - pub async fn attach_invocation( + async fn attach_invocation( &self, request_id: PartitionProcessorRpcRequestId, invocation_query: InvocationQuery, - ) -> Result { + ) -> Result { let response = self .resolve_partition_id_and_send( request_id, @@ -243,12 +293,11 @@ where } }) } - - pub async fn get_invocation_output( + async fn get_invocation_output( &self, request_id: PartitionProcessorRpcRequestId, invocation_query: InvocationQuery, - ) -> Result { + ) -> Result { let response = self .resolve_partition_id_and_send( request_id, @@ -275,12 +324,11 @@ where } }) } - - pub async fn append_invocation_response( + async fn append_invocation_response( &self, request_id: PartitionProcessorRpcRequestId, invocation_response: InvocationResponse, - ) -> Result<(), PartitionProcessorRpcClientError> { + ) -> Result<(), InvocationClientError> { let response = self .resolve_partition_id_and_send( request_id, @@ -295,13 +343,12 @@ where Ok(()) } - - pub async fn append_signal( + async fn append_signal( &self, request_id: PartitionProcessorRpcRequestId, invocation_id: InvocationId, signal: Signal, - ) -> Result<(), PartitionProcessorRpcClientError> { + ) -> Result<(), InvocationClientError> { let response = self .resolve_partition_id_and_send( request_id, @@ -316,51 +363,4 @@ where Ok(()) } - - async fn resolve_partition_id_and_send( - &self, - request_id: PartitionProcessorRpcRequestId, - inner_request: PartitionProcessorRpcRequestInner, - ) -> Result { - let partition_id = self - .partition_table - .pinned() - .find_partition_id(inner_request.partition_key())?; - - let node_id = self - .partition_routing - .get_node_by_partition(partition_id) - .ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?; - - // find connection for this node - let connection = self - .networking - .get_connection(node_id, Swimlane::IngressData) - .await?; - let permit = connection - .reserve() - .await - .ok_or(PartitionProcessorRpcClientError::SendFailed)?; - let rpc_result = permit - .send_rpc( - PartitionProcessorRpcRequest { - request_id, - partition_id, - inner: inner_request, - }, - Some(*partition_id as u64), - ) - .await?; - - if rpc_result.is_err() && rpc_result.as_ref().unwrap_err().likely_stale_route() { - trace!( - %partition_id, - %node_id, - %request_id, - "Received Partition Processor error indicating possible stale route" - ); - } - - Ok(rpc_result?) - } } diff --git a/crates/ingress-http/Cargo.toml b/crates/ingress-http/Cargo.toml index 771a083b29..24a14c42e9 100644 --- a/crates/ingress-http/Cargo.toml +++ b/crates/ingress-http/Cargo.toml @@ -21,12 +21,10 @@ restate-tracing-instrumentation = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } -assert2 = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } chrono = { workspace = true } codederror = { workspace = true } -derive_builder = { workspace = true } futures = { workspace = true } http = { workspace = true } http-body = { workspace = true } diff --git a/crates/ingress-http/src/handler/invocation.rs b/crates/ingress-http/src/handler/invocation.rs index 1aeb100836..a15fc74406 100644 --- a/crates/ingress-http/src/handler/invocation.rs +++ b/crates/ingress-http/src/handler/invocation.rs @@ -8,21 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use bytes::Bytes; -use http::{Method, Request, Response}; -use http_body_util::Full; -use tracing::warn; - use super::Handler; use super::HandlerError; use super::path_parsing::{InvocationRequestType, InvocationTargetType, TargetType}; + use crate::RequestDispatcher; -use crate::partition_processor_rpc_client::{ - AttachInvocationResponse, GetInvocationOutputResponse, -}; +use bytes::Bytes; +use http::{Method, Request, Response}; +use http_body_util::Full; use restate_types::identifiers::IdempotencyId; use restate_types::invocation::InvocationQuery; +use restate_types::invocation::client::{AttachInvocationResponse, GetInvocationOutputResponse}; use restate_types::schema::invocation_target::InvocationTargetResolver; +use tracing::warn; impl Handler where diff --git a/crates/ingress-http/src/handler/responses.rs b/crates/ingress-http/src/handler/responses.rs index d37edbe6c7..5b6f95aedf 100644 --- a/crates/ingress-http/src/handler/responses.rs +++ b/crates/ingress-http/src/handler/responses.rs @@ -8,17 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::handler::Handler; +use crate::handler::error::HandlerError; use bytes::Bytes; use chrono::DateTime; use http::{HeaderName, Response, header}; use http_body_util::Full; -use tracing::{info, trace}; - -use crate::handler::Handler; -use crate::handler::error::HandlerError; use restate_types::invocation::InvocationTarget; -use restate_types::net::partition_processor::{IngressResponseResult, InvocationOutput}; +use restate_types::invocation::client::{InvocationOutput, InvocationOutputResponse}; use restate_types::schema::invocation_target::InvocationTargetMetadata; +use tracing::{info, trace}; pub(crate) const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); /// Contains the string representation of the invocation id @@ -58,7 +57,7 @@ impl Handler { } match response { - IngressResponseResult::Success(invocation_target, response_payload) => { + InvocationOutputResponse::Success(invocation_target, response_payload) => { trace!(rpc.response = ?response_payload, "Complete external HTTP request successfully"); // Resolve invocation target metadata. @@ -77,7 +76,7 @@ impl Handler { Ok(response_builder.body(Full::new(response_payload)).unwrap()) } - IngressResponseResult::Failure(error) => { + InvocationOutputResponse::Failure(error) => { info!(rpc.response = ?error, "Complete external HTTP request with a failure"); Ok(HandlerError::Invocation(error).fill_builder(response_builder)) } diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index 24dbef5d69..cdbd7cac1b 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -24,14 +24,15 @@ use tracing_test::traced_test; use restate_core::TestCoreEnv; use restate_test_util::{assert, assert_eq}; use restate_types::identifiers::{IdempotencyId, InvocationId, ServiceId, WithInvocationId}; +use restate_types::invocation::client::{ + AttachInvocationResponse, GetInvocationOutputResponse, InvocationOutput, + InvocationOutputResponse, SubmittedInvocationNotification, +}; use restate_types::invocation::{ InvocationQuery, InvocationTarget, InvocationTargetType, VirtualObjectHandlerType, WorkflowHandlerType, }; use restate_types::live::Live; -use restate_types::net::partition_processor::{ - IngressResponseResult, InvocationOutput, SubmittedInvocationNotification, -}; use restate_types::schema::invocation_target::{ InputContentType, InputRules, InputValidationRule, InvocationTargetMetadata, OutputContentTypeRule, OutputRules, @@ -44,9 +45,6 @@ use super::mocks::*; use super::service_handler::*; use crate::MockRequestDispatcher; use crate::handler::responses::X_RESTATE_ID; -use crate::partition_processor_rpc_client::{ - AttachInvocationResponse, GetInvocationOutputResponse, -}; #[restate_core::test] #[traced_test] @@ -84,7 +82,7 @@ async fn call_service() { request_id: Default::default(), invocation_id: Some(invocation_request.invocation_id()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -130,7 +128,7 @@ async fn call_service_with_get() { request_id: Default::default(), invocation_id: Some(InvocationId::mock_random()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( invocation_request.header.target, serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -200,7 +198,7 @@ async fn call_virtual_object() { request_id: Default::default(), invocation_id: Some(InvocationId::mock_random()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( invocation_request.header.target, serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -405,7 +403,7 @@ async fn idempotency_key_parsing() { request_id: Default::default(), invocation_id: Some(InvocationId::mock_random()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( invocation_request.header.target, serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -577,7 +575,7 @@ async fn attach_with_invocation_id() { request_id: Default::default(), invocation_id: Some(invocation_id), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -633,7 +631,7 @@ async fn attach_with_idempotency_id_to_unkeyed_service() { request_id: Default::default(), invocation_id: Some(invocation_id), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -691,7 +689,7 @@ async fn attach_with_idempotency_id_to_keyed_service() { request_id: Default::default(), invocation_id: Some(invocation_id), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::virtual_object( "greeter.Greeter", "mygreet", @@ -750,7 +748,7 @@ async fn get_output_with_invocation_id() { request_id: Default::default(), invocation_id: Some(invocation_id), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), @@ -807,7 +805,7 @@ async fn get_output_with_workflow_key() { request_id: Default::default(), invocation_id: None, completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::workflow( service_id.service_name, service_id.key, @@ -1098,7 +1096,7 @@ fn expect_invocation_and_reply_with_empty() -> MockRequestDispatcher { request_id: Default::default(), completion_expiry_time: None, invocation_id: Some(invocation_request.invocation_id()), - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( invocation_request.header.target, Bytes::new(), ), @@ -1118,7 +1116,7 @@ fn expect_invocation_and_reply_with_non_empty() -> MockRequestDispatcher { request_id: Default::default(), invocation_id: Some(invocation_request.invocation_id()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( invocation_request.header.target, Bytes::from_static(b"123"), ), diff --git a/crates/ingress-http/src/handler/workflow.rs b/crates/ingress-http/src/handler/workflow.rs index 922deef83e..05af1bccba 100644 --- a/crates/ingress-http/src/handler/workflow.rs +++ b/crates/ingress-http/src/handler/workflow.rs @@ -8,22 +8,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use super::Handler; +use super::HandlerError; +use super::path_parsing::WorkflowRequestType; + +use crate::RequestDispatcher; use bytes::Bytes; use http::{Method, Request, Response}; use http_body_util::Full; -use tracing::{info, warn}; - use restate_types::identifiers::ServiceId; use restate_types::invocation::InvocationQuery; +use restate_types::invocation::client::{AttachInvocationResponse, GetInvocationOutputResponse}; use restate_types::schema::invocation_target::InvocationTargetResolver; - -use super::Handler; -use super::HandlerError; -use super::path_parsing::WorkflowRequestType; -use crate::RequestDispatcher; -use crate::partition_processor_rpc_client::{ - AttachInvocationResponse, GetInvocationOutputResponse, -}; +use tracing::{info, warn}; impl Handler where diff --git a/crates/ingress-http/src/lib.rs b/crates/ingress-http/src/lib.rs index f92cf98228..9cf517173b 100644 --- a/crates/ingress-http/src/lib.rs +++ b/crates/ingress-http/src/lib.rs @@ -11,21 +11,23 @@ mod handler; mod layers; mod metric_definitions; -pub mod partition_processor_rpc_client; -pub mod rpc_request_dispatcher; +mod rpc_request_dispatcher; mod server; +pub use rpc_request_dispatcher::InvocationClientRequestDispatcher; pub use server::{HyperServerIngress, IngressServerError, StartSignal}; use bytes::Bytes; use std::future::Future; use std::net::{IpAddr, SocketAddr}; -use partition_processor_rpc_client::{AttachInvocationResponse, GetInvocationOutputResponse}; use restate_types::identifiers::InvocationId; +use restate_types::invocation::client::{ + AttachInvocationResponse, GetInvocationOutputResponse, InvocationOutput, + SubmittedInvocationNotification, +}; use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse}; use restate_types::journal_v2::Signal; -use restate_types::net::partition_processor::{InvocationOutput, SubmittedInvocationNotification}; /// Client connection information for a given RPC request #[derive(Clone, Copy, Debug)] @@ -101,7 +103,6 @@ mod mocks { use restate_types::invocation::{ InvocationQuery, InvocationTargetType, ServiceType, VirtualObjectHandlerType, }; - use restate_types::net::partition_processor::InvocationOutput; use restate_types::schema::invocation_target::test_util::MockInvocationTargetResolver; use restate_types::schema::invocation_target::{ DEFAULT_IDEMPOTENCY_RETENTION, InvocationTargetMetadata, InvocationTargetResolver, diff --git a/crates/ingress-http/src/rpc_request_dispatcher.rs b/crates/ingress-http/src/rpc_request_dispatcher.rs index 4127006122..3596d68d1a 100644 --- a/crates/ingress-http/src/rpc_request_dispatcher.rs +++ b/crates/ingress-http/src/rpc_request_dispatcher.rs @@ -8,44 +8,38 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use anyhow::anyhow; -use std::future::Future; -use std::time::Duration; -use tracing::{Instrument, debug_span, trace}; +use super::{RequestDispatcher, RequestDispatcherError}; -use restate_core::network::TransportConnect; use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithInvocationId}; +use restate_types::invocation::client::{ + AttachInvocationResponse, GetInvocationOutputResponse, InvocationClient, InvocationClientError, + InvocationOutput, SubmittedInvocationNotification, +}; use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse}; use restate_types::journal_v2::Signal; -use restate_types::net::partition_processor::{InvocationOutput, SubmittedInvocationNotification}; use restate_types::retries::RetryPolicy; +use std::future::Future; +use std::time::Duration; +use tracing::{Instrument, debug_span, trace}; -use crate::partition_processor_rpc_client::{ - AttachInvocationResponse, GetInvocationOutputResponse, -}; -use crate::partition_processor_rpc_client::{ - PartitionProcessorRpcClient, PartitionProcessorRpcClientError, -}; -use crate::{RequestDispatcher, RequestDispatcherError}; - -pub struct RpcRequestDispatcher { - partition_processor_rpc_client: PartitionProcessorRpcClient, +pub struct InvocationClientRequestDispatcher { + invocation_client: IC, retry_policy: RetryPolicy, } -impl Clone for RpcRequestDispatcher { +impl Clone for InvocationClientRequestDispatcher { fn clone(&self) -> Self { - RpcRequestDispatcher { - partition_processor_rpc_client: self.partition_processor_rpc_client.clone(), + InvocationClientRequestDispatcher { + invocation_client: self.invocation_client.clone(), retry_policy: self.retry_policy.clone(), } } } -impl RpcRequestDispatcher { - pub fn new(partition_processor_rpc_client: PartitionProcessorRpcClient) -> Self { +impl InvocationClientRequestDispatcher { + pub fn new(invocation_client: IC) -> Self { Self { - partition_processor_rpc_client, + invocation_client, // TODO figure out how to tune this? retry_policy: RetryPolicy::fixed_delay(Duration::from_millis(50), None), } @@ -58,7 +52,7 @@ impl RpcRequestDispatcher { ) -> Result where Fn: FnMut() -> Fut, - Fut: Future>, + Fut: Future>, { Ok(self .retry_policy @@ -75,13 +69,13 @@ impl RpcRequestDispatcher { retry }) .await - .map_err(|e| anyhow!(e))?) + .map_err(|e| e.into_inner())?) } } -impl RequestDispatcher for RpcRequestDispatcher +impl RequestDispatcher for InvocationClientRequestDispatcher where - C: TransportConnect, + IC: InvocationClient + Clone + Send + Sync + 'static, { async fn send( &self, @@ -90,7 +84,7 @@ where let request_id = PartitionProcessorRpcRequestId::default(); let is_idempotent = invocation_request.is_idempotent(); self.execute_rpc(is_idempotent, || { - self.partition_processor_rpc_client + self.invocation_client .append_invocation_and_wait_submit_notification( request_id, invocation_request.clone(), @@ -107,7 +101,7 @@ where let request_id = PartitionProcessorRpcRequestId::default(); let is_idempotent = invocation_request.is_idempotent(); self.execute_rpc(is_idempotent, || { - self.partition_processor_rpc_client + self.invocation_client .append_invocation_and_wait_output(request_id, invocation_request.clone()) }) .instrument(debug_span!("call invocation", %request_id, invocation_id = %invocation_request.invocation_id())) @@ -120,7 +114,7 @@ where ) -> Result { let request_id = PartitionProcessorRpcRequestId::default(); self.execute_rpc(true, || { - self.partition_processor_rpc_client + self.invocation_client .attach_invocation(request_id, invocation_query.clone()) }) .instrument(debug_span!("attach to invocation", %request_id, invocation_id = %invocation_query.to_invocation_id())) @@ -133,7 +127,7 @@ where ) -> Result { let request_id = PartitionProcessorRpcRequestId::default(); self.execute_rpc(true, || { - self.partition_processor_rpc_client + self.invocation_client .get_invocation_output(request_id, invocation_query.clone()) }) .instrument(debug_span!("get invocation output", %request_id, invocation_id = %invocation_query.to_invocation_id())) @@ -146,7 +140,7 @@ where ) -> Result<(), RequestDispatcherError> { let request_id = PartitionProcessorRpcRequestId::default(); self.execute_rpc(true, || { - self.partition_processor_rpc_client + self.invocation_client .append_invocation_response(request_id, invocation_response.clone()) }) .instrument(debug_span!("send invocation response", %request_id, invocation_id = %invocation_response.target.caller_id)) @@ -160,7 +154,7 @@ where ) -> Result<(), RequestDispatcherError> { let request_id = PartitionProcessorRpcRequestId::default(); self.execute_rpc(true, || { - self.partition_processor_rpc_client + self.invocation_client .append_signal(request_id, target_invocation, signal.clone()) }) .instrument(debug_span!("send invocation response", %request_id, invocation_id = %target_invocation)) diff --git a/crates/ingress-http/src/server.rs b/crates/ingress-http/src/server.rs index 6495fdd60f..e1e38526dd 100644 --- a/crates/ingress-http/src/server.rs +++ b/crates/ingress-http/src/server.rs @@ -253,7 +253,7 @@ mod tests { use restate_types::health::Health; use restate_types::identifiers::WithInvocationId; use restate_types::invocation::InvocationTarget; - use restate_types::net::partition_processor::IngressResponseResult; + use restate_types::invocation::client::InvocationOutputResponse; use serde::{Deserialize, Serialize}; use std::future::ready; use std::net::SocketAddr; @@ -293,7 +293,7 @@ mod tests { request_id: Default::default(), invocation_id: Some(invocation_request.invocation_id()), completion_expiry_time: None, - response: IngressResponseResult::Success( + response: InvocationOutputResponse::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { greeting: "Igal".to_string(), diff --git a/crates/node/src/roles/ingress.rs b/crates/node/src/roles/ingress.rs index f8eb52ecf9..b581fd08fb 100644 --- a/crates/node/src/roles/ingress.rs +++ b/crates/node/src/roles/ingress.rs @@ -10,9 +10,8 @@ use restate_core::network::{Networking, TransportConnect}; use restate_core::partitions::PartitionRouting; -use restate_ingress_http::HyperServerIngress; -use restate_ingress_http::partition_processor_rpc_client::PartitionProcessorRpcClient; -use restate_ingress_http::rpc_request_dispatcher::RpcRequestDispatcher; +use restate_core::worker_api::PartitionProcessorInvocationClient; +use restate_ingress_http::{HyperServerIngress, InvocationClientRequestDispatcher}; use restate_types::config::IngressOptions; use restate_types::health::HealthStatus; use restate_types::live::{BoxLiveLoad, Live}; @@ -20,7 +19,10 @@ use restate_types::partition_table::PartitionTable; use restate_types::protobuf::common::IngressStatus; use restate_types::schema::Schema; -type IngressHttp = HyperServerIngress>; +type IngressHttp = HyperServerIngress< + Schema, + InvocationClientRequestDispatcher>, +>; pub struct IngressRole { ingress_http: IngressHttp, @@ -35,11 +37,9 @@ impl IngressRole { partition_table: Live, partition_routing: PartitionRouting, ) -> Self { - let dispatcher = RpcRequestDispatcher::new(PartitionProcessorRpcClient::new( - networking, - partition_table, - partition_routing, - )); + let dispatcher = InvocationClientRequestDispatcher::new( + PartitionProcessorInvocationClient::new(networking, partition_table, partition_routing), + ); let ingress_http = HyperServerIngress::from_options( ingress_options.live_load(), dispatcher, diff --git a/crates/types/src/invocation/client.rs b/crates/types/src/invocation/client.rs new file mode 100644 index 0000000000..5fdebcd313 --- /dev/null +++ b/crates/types/src/invocation/client.rs @@ -0,0 +1,151 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::errors::InvocationError; +use crate::identifiers::{InvocationId, PartitionProcessorRpcRequestId}; +use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse, InvocationTarget}; +use crate::journal_v2::Signal; +use crate::time::MillisSinceEpoch; +use bytes::Bytes; +use std::error::Error; +use std::fmt; + +pub struct InvocationClientError { + is_safe_to_retry: bool, + inner: anyhow::Error, +} + +impl InvocationClientError { + pub fn new(inner: impl Into, is_safe_to_retry: bool) -> Self { + Self { + is_safe_to_retry, + inner: inner.into(), + } + } + + pub fn is_safe_to_retry(&self) -> bool { + self.is_safe_to_retry + } + + pub fn into_inner(self) -> anyhow::Error { + self.inner + } +} + +impl fmt::Debug for InvocationClientError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +} + +impl fmt::Display for InvocationClientError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.inner, f) + } +} + +impl Error for InvocationClientError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + self.inner.source() + } + + #[allow(deprecated)] + fn description(&self) -> &str { + self.inner.description() + } + + #[allow(deprecated)] + fn cause(&self) -> Option<&dyn Error> { + self.inner.cause() + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct SubmittedInvocationNotification { + pub request_id: PartitionProcessorRpcRequestId, + pub execution_time: Option, + /// If true, this request_id created a "fresh invocation", + /// otherwise the invocation was previously submitted. + pub is_new_invocation: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct InvocationOutput { + pub request_id: PartitionProcessorRpcRequestId, + pub invocation_id: Option, + pub completion_expiry_time: Option, + pub response: InvocationOutputResponse, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum InvocationOutputResponse { + Success(InvocationTarget, Bytes), + Failure(InvocationError), +} + +#[derive(Debug, Clone)] +pub enum AttachInvocationResponse { + NotFound, + /// Returned when the invocation hasn't an idempotency key, nor it's a workflow run. + NotSupported, + Ready(InvocationOutput), +} + +#[derive(Debug, Clone)] +pub enum GetInvocationOutputResponse { + NotFound, + /// The invocation was found, but it's still processing and a result is not ready yet. + NotReady, + /// Returned when the invocation hasn't an idempotency key, nor it's a workflow run. + NotSupported, + Ready(InvocationOutput), +} + +/// This trait provides the functionalities to interact with Restate invocations. +pub trait InvocationClient { + /// Append the invocation to the log, waiting for the submit notification emitted by the PartitionProcessor. + fn append_invocation_and_wait_submit_notification( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_request: InvocationRequest, + ) -> impl Future> + Send; + + /// Append the invocation and wait for its output. + fn append_invocation_and_wait_output( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_request: InvocationRequest, + ) -> impl Future> + Send; + + fn attach_invocation( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_query: InvocationQuery, + ) -> impl Future> + Send; + + fn get_invocation_output( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_query: InvocationQuery, + ) -> impl Future> + Send; + + fn append_invocation_response( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_response: InvocationResponse, + ) -> impl Future> + Send; + + fn append_signal( + &self, + request_id: PartitionProcessorRpcRequestId, + invocation_id: InvocationId, + signal: Signal, + ) -> impl Future> + Send; +} diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation/mod.rs similarity index 99% rename from crates/types/src/invocation.rs rename to crates/types/src/invocation/mod.rs index fa836352a9..c4f5492903 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation/mod.rs @@ -10,6 +10,8 @@ //! This module contains all the core types representing a service invocation. +pub mod client; + use crate::GenerationalNodeId; use crate::errors::InvocationError; use crate::identifiers::{ diff --git a/crates/types/src/net/partition_processor.rs b/crates/types/src/net/partition_processor.rs index 489cfdf0a4..ea0f3b7f63 100644 --- a/crates/types/src/net/partition_processor.rs +++ b/crates/types/src/net/partition_processor.rs @@ -8,16 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::errors::InvocationError; use crate::identifiers::{ InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; -use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse, InvocationTarget}; +use crate::invocation::client::{InvocationOutput, SubmittedInvocationNotification}; +use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse}; use crate::journal_v2::Signal; use crate::net::ServiceTag; use crate::net::{default_wire_codec, define_rpc, define_service}; -use crate::time::MillisSinceEpoch; -use bytes::Bytes; use serde::{Deserialize, Serialize}; pub struct PartitionLeaderService; @@ -115,26 +113,3 @@ pub enum PartitionProcessorRpcResponse { Submitted(SubmittedInvocationNotification), Output(InvocationOutput), } - -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct SubmittedInvocationNotification { - pub request_id: PartitionProcessorRpcRequestId, - pub execution_time: Option, - /// If true, this request_id created a "fresh invocation", - /// otherwise the invocation was previously submitted. - pub is_new_invocation: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct InvocationOutput { - pub request_id: PartitionProcessorRpcRequestId, - pub invocation_id: Option, - pub completion_expiry_time: Option, - pub response: IngressResponseResult, -} - -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub enum IngressResponseResult { - Success(InvocationTarget, Bytes), - Failure(InvocationError), -} diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 833474feab..e99cddc467 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -27,9 +27,9 @@ use restate_types::identifiers::{ InvocationId, LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; +use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification}; use restate_types::net::partition_processor::{ - InvocationOutput, PartitionProcessorRpcError, PartitionProcessorRpcResponse, - SubmittedInvocationNotification, + PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::Command; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 81b1124138..919e8f2af7 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -47,6 +47,7 @@ use restate_types::identifiers::{ LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; use restate_types::invocation; +use restate_types::invocation::client::{InvocationOutput, InvocationOutputResponse}; use restate_types::invocation::{ AttachInvocationRequest, InvocationQuery, InvocationTarget, InvocationTargetType, NotifySignalRequest, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, @@ -56,9 +57,9 @@ use restate_types::logs::MatchKeyQuery; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber}; use restate_types::net::RpcRequest; use restate_types::net::partition_processor::{ - AppendInvocationReplyOn, GetInvocationOutputResponseMode, IngressResponseResult, - InvocationOutput, PartitionLeaderService, PartitionProcessorRpcError, - PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, + AppendInvocationReplyOn, GetInvocationOutputResponseMode, PartitionLeaderService, + PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, + PartitionProcessorRpcResponse, }; use restate_types::storage::StorageDecodeError; use restate_types::time::MillisSinceEpoch; @@ -749,9 +750,9 @@ where request_id, response: match completed.response_result.clone() { ResponseResult::Success(res) => { - IngressResponseResult::Success(completed.invocation_target, res) + InvocationOutputResponse::Success(completed.invocation_target, res) } - ResponseResult::Failure(err) => IngressResponseResult::Failure(err), + ResponseResult::Failure(err) => InvocationOutputResponse::Failure(err), }, invocation_id: Some(invocation_id), completion_expiry_time, diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index bd2280591c..454f6b52b3 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -12,12 +12,12 @@ use restate_invoker_api::InvokeInputJournal; use restate_storage_api::outbox_table::OutboxMessage; use restate_storage_api::timer_table::TimerKey; use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId}; +use restate_types::invocation::client::InvocationOutputResponse; use restate_types::invocation::{InvocationEpoch, InvocationTarget}; use restate_types::journal::Completion; use restate_types::journal_v2::CommandIndex; use restate_types::journal_v2::raw::RawNotification; use restate_types::message::MessageIndex; -use restate_types::net::partition_processor::IngressResponseResult; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::timer::TimerKeyValue; use std::time::Duration; @@ -64,7 +64,7 @@ pub enum Action { request_id: PartitionProcessorRpcRequestId, invocation_id: Option, completion_expiry_time: Option, - response: IngressResponseResult, + response: InvocationOutputResponse, }, IngressSubmitNotification { request_id: PartitionProcessorRpcRequestId, diff --git a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs index f013127eba..cf8b4263fc 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/cancel.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/cancel.rs @@ -107,12 +107,12 @@ mod tests { use restate_types::deployment::PinnedDeployment; use restate_types::errors::CANCELED_INVOCATION_ERROR; use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId}; + use restate_types::invocation::client::InvocationOutputResponse; use restate_types::invocation::{ InvocationTarget, InvocationTermination, JournalCompletionTarget, NotifySignalRequest, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, }; use restate_types::journal_v2::CANCEL_SIGNAL; - use restate_types::net::partition_processor::IngressResponseResult; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::Command; @@ -240,7 +240,7 @@ mod tests { contains(pat!(Action::IngressResponse { request_id: eq(rpc_id), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Failure(CANCELED_INVOCATION_ERROR)) + response: eq(InvocationOutputResponse::Failure(CANCELED_INVOCATION_ERROR)) })) ); diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 624cbc4215..4ed3940b68 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -59,6 +59,7 @@ use restate_types::identifiers::{ PartitionProcessorRpcRequestId, ServiceId, }; use restate_types::identifiers::{IdempotencyId, WithPartitionKey}; +use restate_types::invocation::client::InvocationOutputResponse; use restate_types::invocation::{ AttachInvocationRequest, InvocationEpoch, InvocationQuery, InvocationResponse, InvocationTarget, InvocationTargetType, InvocationTermination, JournalCompletionTarget, @@ -83,7 +84,6 @@ use restate_types::journal_v2::{ CommandType, CompletionId, EntryMetadata, NotificationId, Signal, SignalResult, }; use restate_types::message::MessageIndex; -use restate_types::net::partition_processor::IngressResponseResult; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::state_mut::ExternalStateMutation; use restate_types::state_mut::StateMutationVersion; @@ -2040,7 +2040,7 @@ impl StateMachineApplyContext<'_, S> { invocation_id, completion_expiry_time, match result.clone() { - ResponseResult::Success(res) => IngressResponseResult::Success( + ResponseResult::Success(res) => InvocationOutputResponse::Success( invocation_target .expect( "For success responses, there must be an invocation target!", @@ -2048,7 +2048,7 @@ impl StateMachineApplyContext<'_, S> { .clone(), res, ), - ResponseResult::Failure(err) => IngressResponseResult::Failure(err), + ResponseResult::Failure(err) => InvocationOutputResponse::Failure(err), }, ), } @@ -3297,17 +3297,17 @@ impl StateMachineApplyContext<'_, S> { request_id: PartitionProcessorRpcRequestId, invocation_id: Option, completion_expiry_time: Option, - response: IngressResponseResult, + response: InvocationOutputResponse, ) { match &response { - IngressResponseResult::Success(_, _) => { + InvocationOutputResponse::Success(_, _) => { debug_if_leader!( self.is_leader, "Send response to ingress with request id '{:?}': Success", request_id ) } - IngressResponseResult::Failure(e) => { + InvocationOutputResponse::Failure(e) => { debug_if_leader!( self.is_leader, "Send response to ingress with request id '{:?}': Failure({})", diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index 9bd39b5159..3cbcd74d82 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -93,7 +93,7 @@ async fn start_and_complete_idempotent_invocation() { contains(pat!(Action::IngressResponse { request_id: eq(request_id), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -184,7 +184,7 @@ async fn start_and_complete_idempotent_invocation_neo_table() { contains(pat!(Action::IngressResponse { request_id: eq(request_id), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -259,7 +259,7 @@ async fn complete_already_completed_invocation() { contains(pat!(Action::IngressResponse { request_id: eq(request_id), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -344,7 +344,7 @@ async fn attach_with_service_invocation_command_while_executing() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -352,7 +352,7 @@ async fn attach_with_service_invocation_command_while_executing() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -458,7 +458,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -471,7 +471,7 @@ async fn attach_with_send_service_invocation(#[case] use_same_request_id: bool) contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -658,7 +658,7 @@ async fn attach_command() { contains(pat!(Action::IngressResponse { invocation_id: some(eq(invocation_id)), request_id: eq(request_id_1), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -666,7 +666,7 @@ async fn attach_command() { contains(pat!(Action::IngressResponse { invocation_id: some(eq(invocation_id)), request_id: eq(request_id_2), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index d43f041068..1c44f9d25b 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -146,10 +146,12 @@ async fn terminate_scheduled_invocation( contains(pat!(Action::IngressResponse { request_id: eq(rpc_id), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Failure(match termination_flavor { - TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, - TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, - })) + response: eq(InvocationOutputResponse::Failure( + match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + } + )) })) ); diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index fcdd48b189..8e015b9dff 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -53,6 +53,7 @@ use restate_types::identifiers::{ AwakeableIdentifier, InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, ServiceId, }; +use restate_types::invocation::client::InvocationOutputResponse; use restate_types::invocation::{ Header, InvocationResponse, InvocationTarget, InvocationTermination, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, Source, VirtualObjectHandlerType, @@ -950,21 +951,21 @@ async fn send_ingress_response_to_multiple_targets() -> TestResult { all!( contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) })), contains(pat!(Action::IngressResponse { request_id: eq(request_id_2), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) })), contains(pat!(Action::IngressResponse { request_id: eq(request_id_3), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 0d807db7b2..6939b8acd8 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -79,7 +79,7 @@ async fn start_workflow_method() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_2), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Failure( + response: eq(InvocationOutputResponse::Failure( WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR )) })) @@ -115,7 +115,7 @@ async fn start_workflow_method() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -124,7 +124,7 @@ async fn start_workflow_method() { not(contains(pat!(Action::IngressResponse { request_id: eq(request_id_2), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -162,7 +162,7 @@ async fn start_workflow_method() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_3), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Failure( + response: eq(InvocationOutputResponse::Failure( WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR )) })) @@ -252,7 +252,7 @@ async fn attach_by_workflow_key() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_1), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -260,7 +260,7 @@ async fn attach_by_workflow_key() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_2), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) @@ -298,7 +298,7 @@ async fn attach_by_workflow_key() { contains(pat!(Action::IngressResponse { request_id: eq(request_id_3), invocation_id: some(eq(invocation_id)), - response: eq(IngressResponseResult::Success( + response: eq(InvocationOutputResponse::Success( invocation_target.clone(), response_bytes.clone() )) From 3e2cf2c0ec7038e9f3f0f3ddf0366be460d21eb8 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 5 May 2025 09:26:26 +0200 Subject: [PATCH 2/2] Feedback --- crates/types/src/invocation/client.rs | 39 ++++++--------------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/crates/types/src/invocation/client.rs b/crates/types/src/invocation/client.rs index 5fdebcd313..b8e47df953 100644 --- a/crates/types/src/invocation/client.rs +++ b/crates/types/src/invocation/client.rs @@ -14,11 +14,12 @@ use crate::invocation::{InvocationQuery, InvocationRequest, InvocationResponse, use crate::journal_v2::Signal; use crate::time::MillisSinceEpoch; use bytes::Bytes; -use std::error::Error; -use std::fmt; +#[derive(Debug, thiserror::Error)] +#[error("{inner}")] pub struct InvocationClientError { is_safe_to_retry: bool, + #[source] inner: anyhow::Error, } @@ -39,34 +40,6 @@ impl InvocationClientError { } } -impl fmt::Debug for InvocationClientError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&self.inner, f) - } -} - -impl fmt::Display for InvocationClientError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.inner, f) - } -} - -impl Error for InvocationClientError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - self.inner.source() - } - - #[allow(deprecated)] - fn description(&self) -> &str { - self.inner.description() - } - - #[allow(deprecated)] - fn cause(&self) -> Option<&dyn Error> { - self.inner.cause() - } -} - #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct SubmittedInvocationNotification { pub request_id: PartitionProcessorRpcRequestId, @@ -110,7 +83,7 @@ pub enum GetInvocationOutputResponse { /// This trait provides the functionalities to interact with Restate invocations. pub trait InvocationClient { - /// Append the invocation to the log, waiting for the submit notification emitted by the PartitionProcessor. + /// Append the invocation to the log, waiting for the PP to emit [`SubmittedInvocationNotification`] when the command is processed. fn append_invocation_and_wait_submit_notification( &self, request_id: PartitionProcessorRpcRequestId, @@ -124,24 +97,28 @@ pub trait InvocationClient { invocation_request: InvocationRequest, ) -> impl Future> + Send; + /// Attach to an existing invocation and wait for its output. fn attach_invocation( &self, request_id: PartitionProcessorRpcRequestId, invocation_query: InvocationQuery, ) -> impl Future> + Send; + /// Get an invocation output, when present. fn get_invocation_output( &self, request_id: PartitionProcessorRpcRequestId, invocation_query: InvocationQuery, ) -> impl Future> + Send; + /// **DEPRECATED** Append [`InvocationResponse`] to an existing invocation journal. Only ServiceProtocol <= 3 fn append_invocation_response( &self, request_id: PartitionProcessorRpcRequestId, invocation_response: InvocationResponse, ) -> impl Future> + Send; + /// Append a signal to an existing invocation journal. fn append_signal( &self, request_id: PartitionProcessorRpcRequestId,