Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/core/src/worker_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
// by the Apache License, Version 2.0.

mod partition_processor_manager;
mod partition_processor_rpc_client;

pub use partition_processor_manager::*;
pub use partition_processor_rpc_client::*;
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,37 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::ShutdownError;
use crate::network::ConnectError;
use crate::network::{NetworkSender, RpcReplyError, Swimlane};
use crate::network::{Networking, TransportConnect};
use crate::partitions::PartitionRouting;
use assert2::let_assert;
use tracing::trace;

use restate_core::ShutdownError;
use restate_core::network::{NetworkSender, RpcReplyError, Swimlane};
use restate_core::network::{Networking, TransportConnect};
use restate_core::partitions::PartitionRouting;
use restate_types::identifiers::{
InvocationId, PartitionId, PartitionProcessorRpcRequestId, WithPartitionKey,
};
use restate_types::invocation::client::{
AttachInvocationResponse, GetInvocationOutputResponse, InvocationClient, InvocationClientError,
InvocationOutput, SubmittedInvocationNotification,
};
use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse};
use restate_types::journal_v2::Signal;
use restate_types::live::Live;
use restate_types::net::partition_processor::{
AppendInvocationReplyOn, GetInvocationOutputResponseMode, InvocationOutput,
PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner,
PartitionProcessorRpcResponse, SubmittedInvocationNotification,
AppendInvocationReplyOn, GetInvocationOutputResponseMode, PartitionProcessorRpcError,
PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse,
};
use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTableError};
use tracing::trace;

#[derive(Debug, thiserror::Error)]
pub enum PartitionProcessorRpcClientError {
pub enum PartitionProcessorInvocationClientError {
#[error(transparent)]
UnknownPartition(#[from] PartitionTableError),
#[error("cannot find node for partition {0}")]
UnknownNode(PartitionId),
#[error(transparent)]
Connect(#[from] restate_core::network::ConnectError),
Connect(#[from] ConnectError),
#[error("failed sending request")]
SendFailed,
#[error(transparent)]
Expand All @@ -54,18 +57,18 @@ pub enum PartitionProcessorRpcClientError {
Stopping,
}

impl PartitionProcessorRpcClientError {
impl PartitionProcessorInvocationClientError {
/// Returns true when the operation can be retried assuming no state mutation could have occurred in the PartitionProcessor.
pub fn is_safe_to_retry(&self) -> bool {
match self {
PartitionProcessorRpcClientError::UnknownPartition(_)
| PartitionProcessorRpcClientError::Connect(_)
| PartitionProcessorRpcClientError::UnknownNode(_)
| PartitionProcessorRpcClientError::NotLeader
| PartitionProcessorRpcClientError::Starting
| PartitionProcessorRpcClientError::Busy
| PartitionProcessorRpcClientError::SendFailed
| PartitionProcessorRpcClientError::Stopping => {
PartitionProcessorInvocationClientError::UnknownPartition(_)
| PartitionProcessorInvocationClientError::Connect(_)
| PartitionProcessorInvocationClientError::UnknownNode(_)
| PartitionProcessorInvocationClientError::NotLeader
| PartitionProcessorInvocationClientError::Starting
| PartitionProcessorInvocationClientError::Busy
| PartitionProcessorInvocationClientError::SendFailed
| PartitionProcessorInvocationClientError::Stopping => {
// These are pre-flight error that we can distinguish,
// and for which we know for certain that no message was proposed yet to the log.
true
Expand All @@ -75,7 +78,14 @@ impl PartitionProcessorRpcClientError {
}
}

impl From<RpcReplyError> for PartitionProcessorRpcClientError {
impl From<PartitionProcessorInvocationClientError> for InvocationClientError {
fn from(value: PartitionProcessorInvocationClientError) -> Self {
let is_safe_to_retry = value.is_safe_to_retry();
Self::new(value, is_safe_to_retry)
}
}

impl From<RpcReplyError> for PartitionProcessorInvocationClientError {
fn from(value: RpcReplyError) -> Self {
match value {
e @ RpcReplyError::Unknown(_) => Self::Internal(e.to_string()),
Expand All @@ -91,47 +101,35 @@ impl From<RpcReplyError> for PartitionProcessorRpcClientError {
}
}

impl From<PartitionProcessorRpcError> for PartitionProcessorRpcClientError {
impl From<PartitionProcessorRpcError> for PartitionProcessorInvocationClientError {
fn from(value: PartitionProcessorRpcError) -> Self {
match value {
PartitionProcessorRpcError::NotLeader(_) => PartitionProcessorRpcClientError::NotLeader,
PartitionProcessorRpcError::NotLeader(_) => {
PartitionProcessorInvocationClientError::NotLeader
}
PartitionProcessorRpcError::LostLeadership(partition_id) => {
PartitionProcessorRpcClientError::LostLeadership(partition_id)
PartitionProcessorInvocationClientError::LostLeadership(partition_id)
}
PartitionProcessorRpcError::Internal(msg) => {
PartitionProcessorRpcClientError::Internal(msg)
PartitionProcessorInvocationClientError::Internal(msg)
}
PartitionProcessorRpcError::Starting => {
PartitionProcessorInvocationClientError::Starting
}
PartitionProcessorRpcError::Stopping => {
PartitionProcessorInvocationClientError::Stopping
}
PartitionProcessorRpcError::Starting => PartitionProcessorRpcClientError::Starting,
PartitionProcessorRpcError::Stopping => PartitionProcessorRpcClientError::Stopping,
}
}
}

#[derive(Debug, Clone)]
pub enum AttachInvocationResponse {
NotFound,
/// Returned when the invocation hasn't an idempotency key, nor it's a workflow run.
NotSupported,
Ready(InvocationOutput),
}

#[derive(Debug, Clone)]
pub enum GetInvocationOutputResponse {
NotFound,
/// The invocation was found, but it's still processing and a result is not ready yet.
NotReady,
/// Returned when the invocation hasn't an idempotency key, nor it's a workflow run.
NotSupported,
Ready(InvocationOutput),
}

pub struct PartitionProcessorRpcClient<C> {
pub struct PartitionProcessorInvocationClient<C> {
networking: Networking<C>,
partition_table: Live<PartitionTable>,
partition_routing: PartitionRouting,
}

impl<C: Clone> Clone for PartitionProcessorRpcClient<C> {
impl<C: Clone> Clone for PartitionProcessorInvocationClient<C> {
fn clone(&self) -> Self {
Self {
networking: self.networking.clone(),
Expand All @@ -141,7 +139,7 @@ impl<C: Clone> Clone for PartitionProcessorRpcClient<C> {
}
}

impl<C> PartitionProcessorRpcClient<C> {
impl<C> PartitionProcessorInvocationClient<C> {
pub fn new(
networking: Networking<C>,
partition_table: Live<PartitionTable>,
Expand All @@ -155,16 +153,70 @@ impl<C> PartitionProcessorRpcClient<C> {
}
}

impl<C> PartitionProcessorRpcClient<C>
impl<C> PartitionProcessorInvocationClient<C>
where
C: TransportConnect,
{
async fn resolve_partition_id_and_send(
&self,
request_id: PartitionProcessorRpcRequestId,
inner_request: PartitionProcessorRpcRequestInner,
) -> Result<PartitionProcessorRpcResponse, PartitionProcessorInvocationClientError> {
let partition_id = self
.partition_table
.pinned()
.find_partition_id(inner_request.partition_key())?;

let node_id = self
.partition_routing
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorInvocationClientError::UnknownNode(
partition_id,
))?;

// find connection for this node
let connection = self
.networking
.get_connection(node_id, Swimlane::IngressData)
.await?;
let permit = connection
.reserve()
.await
.ok_or(PartitionProcessorInvocationClientError::SendFailed)?;
let rpc_result = permit
.send_rpc(
PartitionProcessorRpcRequest {
request_id,
partition_id,
inner: inner_request,
},
Some(*partition_id as u64),
)
.await?;

if rpc_result.is_err() && rpc_result.as_ref().unwrap_err().likely_stale_route() {
trace!(
%partition_id,
%node_id,
%request_id,
"Received Partition Processor error indicating possible stale route"
);
}

Ok(rpc_result?)
}
}

impl<C> InvocationClient for PartitionProcessorInvocationClient<C>
where
C: TransportConnect,
{
/// Append the invocation to the log, waiting for the submit notification emitted by the PartitionProcessor.
pub async fn append_invocation_and_wait_submit_notification(
async fn append_invocation_and_wait_submit_notification(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_request: InvocationRequest,
) -> Result<SubmittedInvocationNotification, PartitionProcessorRpcClientError> {
) -> Result<SubmittedInvocationNotification, InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -186,13 +238,12 @@ where

Ok(submit_notification)
}

/// Append the invocation and wait for its output.
pub async fn append_invocation_and_wait_output(
async fn append_invocation_and_wait_output(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_request: InvocationRequest,
) -> Result<InvocationOutput, PartitionProcessorRpcClientError> {
) -> Result<InvocationOutput, InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -214,12 +265,11 @@ where

Ok(invocation_output)
}

pub async fn attach_invocation(
async fn attach_invocation(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_query: InvocationQuery,
) -> Result<AttachInvocationResponse, PartitionProcessorRpcClientError> {
) -> Result<AttachInvocationResponse, InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -243,12 +293,11 @@ where
}
})
}

pub async fn get_invocation_output(
async fn get_invocation_output(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_query: InvocationQuery,
) -> Result<GetInvocationOutputResponse, PartitionProcessorRpcClientError> {
) -> Result<GetInvocationOutputResponse, InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -275,12 +324,11 @@ where
}
})
}

pub async fn append_invocation_response(
async fn append_invocation_response(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_response: InvocationResponse,
) -> Result<(), PartitionProcessorRpcClientError> {
) -> Result<(), InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -295,13 +343,12 @@ where

Ok(())
}

pub async fn append_signal(
async fn append_signal(
&self,
request_id: PartitionProcessorRpcRequestId,
invocation_id: InvocationId,
signal: Signal,
) -> Result<(), PartitionProcessorRpcClientError> {
) -> Result<(), InvocationClientError> {
let response = self
.resolve_partition_id_and_send(
request_id,
Expand All @@ -316,51 +363,4 @@ where

Ok(())
}

async fn resolve_partition_id_and_send(
&self,
request_id: PartitionProcessorRpcRequestId,
inner_request: PartitionProcessorRpcRequestInner,
) -> Result<PartitionProcessorRpcResponse, PartitionProcessorRpcClientError> {
let partition_id = self
.partition_table
.pinned()
.find_partition_id(inner_request.partition_key())?;

let node_id = self
.partition_routing
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;

// find connection for this node
let connection = self
.networking
.get_connection(node_id, Swimlane::IngressData)
.await?;
let permit = connection
.reserve()
.await
.ok_or(PartitionProcessorRpcClientError::SendFailed)?;
let rpc_result = permit
.send_rpc(
PartitionProcessorRpcRequest {
request_id,
partition_id,
inner: inner_request,
},
Some(*partition_id as u64),
)
.await?;

if rpc_result.is_err() && rpc_result.as_ref().unwrap_err().likely_stale_route() {
trace!(
%partition_id,
%node_id,
%request_id,
"Received Partition Processor error indicating possible stale route"
);
}

Ok(rpc_result?)
}
}
Loading
Loading