From 30116f1fbf39ad42ddd5ba53ea63828e73586f65 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Mon, 6 Jan 2025 20:18:05 -0300 Subject: [PATCH 1/9] Added optional response to send --- ipa-core/src/helpers/gateway/transport.rs | 7 ++-- .../helpers/transport/in_memory/transport.rs | 7 ++-- ipa-core/src/helpers/transport/mod.rs | 34 ++++++++++++++++--- ipa-core/src/net/transport.rs | 29 +++++++++------- 4 files changed, 54 insertions(+), 23 deletions(-) diff --git a/ipa-core/src/helpers/gateway/transport.rs b/ipa-core/src/helpers/gateway/transport.rs index 09a1053cb..cb98e07f9 100644 --- a/ipa-core/src/helpers/gateway/transport.rs +++ b/ipa-core/src/helpers/gateway/transport.rs @@ -34,6 +34,7 @@ pub(super) struct Transports, S: Transport::RecordsStream; + type SendResponse = ::SendResponse; type Error = SendToRoleError; fn identity(&self) -> Role { @@ -50,7 +51,7 @@ impl Transport for RoleResolvingTransport { self.inner.peer_count() } - async fn send< + async fn send_and_receive< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -60,7 +61,7 @@ impl Transport for RoleResolvingTransport { dest: Role, route: R, data: D, - ) -> Result<(), Self::Error> + ) -> Result, Self::Error> where Option: From, Option: From, @@ -72,7 +73,7 @@ impl Transport for RoleResolvingTransport { "can't send message to itself" ); self.inner - .send(dest_helper, route, data) + .send_and_receive(dest_helper, route, data) .await .map_err(|e| SendToRoleError(dest, e)) } diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index 504921eb5..3c4aab36b 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -156,6 +156,7 @@ impl InMemoryTransport { impl Transport for Weak> { type Identity = I; type RecordsStream = ReceiveRecords; + type SendResponse = InMemoryStream; type Error = Error; fn identity(&self) -> I { @@ -172,7 +173,7 @@ impl Transport for Weak> { .into_iter() } - async fn send< + async fn send_and_receive< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -182,7 +183,7 @@ impl Transport for Weak> { dest: I, route: R, data: D, - ) -> Result<(), Error> + ) -> Result, Error> where Option: From, Option: From, @@ -225,7 +226,7 @@ impl Transport for Weak> { inner: e.into(), })?; - Ok(()) + Ok(None) } fn receive>( diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 00021c62c..4d5813daa 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -326,6 +326,7 @@ impl From> for BroadcastError pub trait Transport: Clone + Send + Sync + 'static { type Identity: TransportIdentity; type RecordsStream: BytesStream; + type SendResponse: BytesStream; type Error: Debug + Send; /// Return my identity in the network (MPC or Sharded) @@ -350,6 +351,27 @@ pub trait Transport: Clone + Send + Sync + 'static { route: R, data: D, ) -> Result<(), Self::Error> + where + Option: From, + Option: From, + Q: QueryIdBinding, + S: StepBinding, + R: RouteParams, + D: Stream> + Send + 'static + { + self.send_and_receive(dest, route, data).await?; + Ok(()) // Todo, error if data found + } + + /// Sends a new request to the given destination helper party. + /// Depending on the specific request, it may or may not require acknowledgment by the remote + /// party + async fn send_and_receive( + &self, + dest: Self::Identity, + route: R, + data: D, + ) -> Result, Self::Error> where Option: From, Option: From, @@ -371,7 +393,7 @@ pub trait Transport: Clone + Send + Sync + 'static { async fn broadcast( &self, route: R, - ) -> Result<(), BroadcastError> + ) -> Result)>, BroadcastError> where Option: From, Option: From, @@ -382,20 +404,22 @@ pub trait Transport: Clone + Send + Sync + 'static { let mut futs = FuturesUnordered::new(); for peer_identity in self.peers() { futs.push( - Self::send(self, peer_identity, route.clone(), futures::stream::empty()) + Self::send_and_receive(self, peer_identity, route.clone(), futures::stream::empty()) .map(move |v| (peer_identity, v)), ); } let mut errs = Vec::new(); + let mut responses = Vec::new(); while let Some(r) = futs.next().await { - if let Err(e) = r.1 { - errs.push((r.0, e)); + match r.1 { + Err(e) => errs.push((r.0, e)), + Ok(re) => responses.push((r.0, re)), } } if errs.is_empty() { - Ok(()) + Ok(responses) } else { Err(errs.into()) } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index 599024271..444eaebfc 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -69,7 +69,7 @@ impl RouteParams for QueryConfig { } impl HttpTransport { - async fn send< + async fn send_and_receive< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -79,7 +79,7 @@ impl HttpTransport { dest: F::Identity, route: R, data: D, - ) -> Result<(), Error> + ) -> Result, Error> where Option: From, Option: From, @@ -100,20 +100,23 @@ impl HttpTransport { self.http_runtime .spawn(resp_future.map_err(Into::into).and_then(resp_ok)) .await?; - Ok(()) + Ok(None) } RouteId::PrepareQuery => { let req = serde_json::from_str(route.extra().borrow()).unwrap(); - self.clients[client_ix].prepare_query(req).await + self.clients[client_ix].prepare_query(req).await?; + Ok(None) } RouteId::CompleteQuery => { let query_id = >::from(route.query_id()) .expect("query_id is required to call complete query API"); - self.clients[client_ix].complete_query(query_id).await + self.clients[client_ix].complete_query(query_id).await?; + Ok(None) } RouteId::QueryStatus => { let req = serde_json::from_str(route.extra().borrow())?; - self.clients[client_ix].status_match(req).await + self.clients[client_ix].status_match(req).await?; + Ok(None) } evt @ (RouteId::QueryInput | RouteId::ReceiveQuery @@ -273,6 +276,7 @@ impl MpcHttpTransport { impl Transport for MpcHttpTransport { type Identity = HelperIdentity; type RecordsStream = ReceiveRecords; + type SendResponse = BodyStream; type Error = Error; fn identity(&self) -> Self::Identity { @@ -290,7 +294,7 @@ impl Transport for MpcHttpTransport { 2 } - async fn send< + async fn send_and_receive< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -300,12 +304,12 @@ impl Transport for MpcHttpTransport { dest: Self::Identity, route: R, data: D, - ) -> Result<(), Error> + ) -> Result, Error> where Option: From, Option: From, { - self.inner_transport.send(dest, route, data).await + self.inner_transport.send_and_receive(dest, route, data).await } fn receive>( @@ -353,6 +357,7 @@ impl ShardHttpTransport { impl Transport for ShardHttpTransport { type Identity = ShardIndex; type RecordsStream = ReceiveRecords; + type SendResponse = BodyStream; type Error = ShardError; fn identity(&self) -> Self::Identity { @@ -368,12 +373,12 @@ impl Transport for ShardHttpTransport { u32::from(self.shard_count).saturating_sub(1) } - async fn send( + async fn send_and_receive( &self, dest: Self::Identity, route: R, data: D, - ) -> Result<(), Self::Error> + ) -> Result, Self::Error> where Option: From, Option: From, @@ -383,7 +388,7 @@ impl Transport for ShardHttpTransport { D: Stream> + Send + 'static, { self.inner_transport - .send(dest, route, data) + .send_and_receive(dest, route, data) .map_err(|source| ShardError { shard_index: self.identity(), source, From c0623338ec482b1c41db73c9e3ca36f8a5df6aae Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Tue, 7 Jan 2025 18:39:12 -0300 Subject: [PATCH 2/9] Modified processor to query instead of using errors and comparison in shards --- ipa-core/src/app.rs | 6 +- ipa-core/src/helpers/transport/query/mod.rs | 4 +- ipa-core/src/net/client/mod.rs | 64 ++--- ipa-core/src/net/error.rs | 20 +- ipa-core/src/net/http_serde.rs | 45 ---- ipa-core/src/net/server/handlers/query/mod.rs | 2 - .../net/server/handlers/query/status_match.rs | 227 ------------------ ipa-core/src/net/transport.rs | 5 +- ipa-core/src/query/processor.rs | 126 +++------- ipa-core/src/query/state.rs | 14 +- 10 files changed, 63 insertions(+), 450 deletions(-) delete mode 100644 ipa-core/src/net/server/handlers/query/status_match.rs diff --git a/ipa-core/src/app.rs b/ipa-core/src/app.rs index a87c253a0..03d1959da 100644 --- a/ipa-core/src/app.rs +++ b/ipa-core/src/app.rs @@ -6,7 +6,7 @@ use crate::{ cli::LoggingHandle, executor::IpaRuntime, helpers::{ - query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, + query::{PrepareQuery, QueryConfig, QueryInput}, routing::{Addr, RouteId}, ApiError, BodyStream, HandlerBox, HandlerRef, HelperIdentity, HelperResponse, MpcTransportImpl, RequestHandler, ShardTransportImpl, Transport, TransportIdentity, @@ -208,8 +208,8 @@ impl RequestHandler for Inner { HelperResponse::from(qp.prepare_shard(&self.shard_transport, req)?) } RouteId::QueryStatus => { - let req = req.into::()?; - HelperResponse::from(qp.shard_status(&self.shard_transport, &req)?) + let query_id = ext_query_id(&req)?; + HelperResponse::from(qp.shard_status(&self.shard_transport, query_id)?) } RouteId::CompleteQuery => { // The processing flow for this API is exactly the same, regardless diff --git a/ipa-core/src/helpers/transport/query/mod.rs b/ipa-core/src/helpers/transport/query/mod.rs index cd3e389d1..70161c1c6 100644 --- a/ipa-core/src/helpers/transport/query/mod.rs +++ b/ipa-core/src/helpers/transport/query/mod.rs @@ -239,7 +239,7 @@ impl Debug for QueryInput { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +/*#[derive(Clone, Debug, Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq, Eq))] pub struct CompareStatusRequest { pub query_id: QueryId, @@ -264,7 +264,7 @@ impl RouteParams for CompareStatusRequest { fn extra(&self) -> Self::Params { serde_json::to_string(self).unwrap() } -} +}*/ #[derive(Copy, Clone, Debug, Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq, Eq))] diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index 42d7c1377..fc0252b45 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -33,10 +33,10 @@ use crate::{ }, executor::IpaRuntime, helpers::{ - query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, + query::{PrepareQuery, QueryConfig, QueryInput}, TransportIdentity, }, - net::{error::ShardQueryStatusMismatchError, http_serde, Error, CRYPTO_PROVIDER}, + net::{http_serde, Error, CRYPTO_PROVIDER}, protocol::{Gate, QueryId}, }; @@ -385,28 +385,25 @@ impl IpaHttpClient { resp_ok(resp).await } - /// This API is used by leader shards in MPC to request query status information on peers. - /// If a given peer has status that doesn't match the one provided by the leader, it responds - /// with 412 error and encodes its status inside the response body. Otherwise, 200 is returned. + /// Retrieve the status of a query. /// - /// # Errors - /// If the request has illegal arguments, or fails to be delivered - pub async fn status_match(&self, data: CompareStatusRequest) -> Result<(), Error> { - let req = http_serde::query::status_match::try_into_http_request( - &data, - self.scheme.clone(), - self.authority.clone(), - )?; - let resp = self.request(req).await?; + /// ## Errors + /// If the request has illegal arguments, or fails to deliver to helper + pub async fn query_status( + &self, + query_id: QueryId, + ) -> Result { + let req = http_serde::query::status::Request::new(query_id); + let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; - match resp.status() { - StatusCode::OK => Ok(()), - StatusCode::PRECONDITION_FAILED => { - let bytes = response_to_bytes(resp).await?; - let err = serde_json::from_slice::(&bytes)?; - Err(err.into()) - } - _ => Err(Error::from_failed_resp(resp).await), + let resp = self.request(req).await?; + if resp.status().is_success() { + let bytes = response_to_bytes(resp).await?; + let http_serde::query::status::ResponseBody { status } = + serde_json::from_slice(&bytes)?; + Ok(status) + } else { + Err(Error::from_failed_resp(resp).await) } } } @@ -467,29 +464,6 @@ impl IpaHttpClient { resp_ok(resp).await } - /// Retrieve the status of a query. - /// - /// ## Errors - /// If the request has illegal arguments, or fails to deliver to helper - #[cfg(any(all(test, not(feature = "shuttle")), feature = "cli"))] - pub async fn query_status( - &self, - query_id: QueryId, - ) -> Result { - let req = http_serde::query::status::Request::new(query_id); - let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; - - let resp = self.request(req).await?; - if resp.status().is_success() { - let bytes = response_to_bytes(resp).await?; - let http_serde::query::status::ResponseBody { status } = - serde_json::from_slice(&bytes)?; - Ok(status) - } else { - Err(Error::from_failed_resp(resp).await) - } - } - /// Wait for completion of the query and pull the results of this query. This is a blocking /// API so it is not supposed to be used outside of CLI context. /// diff --git a/ipa-core/src/net/error.rs b/ipa-core/src/net/error.rs index f137c3232..066d66b1f 100644 --- a/ipa-core/src/net/error.rs +++ b/ipa-core/src/net/error.rs @@ -4,7 +4,7 @@ use axum::{ }; use crate::{ - error::BoxError, net::client::ResponseFromEndpoint, protocol::QueryId, query::QueryStatus, + error::BoxError, net::client::ResponseFromEndpoint, protocol::QueryId, sharding::ShardIndex, }; @@ -62,11 +62,6 @@ pub enum Error { }, #[error("{code}: {error}")] Application { code: StatusCode, error: BoxError }, - #[error(transparent)] - ShardQueryStatusMismatch { - #[from] - error: ShardQueryStatusMismatchError, - }, } impl Error { @@ -148,12 +143,6 @@ pub struct ShardError { pub source: Error, } -#[derive(Debug, thiserror::Error, serde::Deserialize, serde::Serialize)] -#[error("Query status mismatch. Actual status: {actual}")] -pub struct ShardQueryStatusMismatchError { - pub actual: QueryStatus, -} - impl IntoResponse for Error { fn into_response(self) -> Response { let status_code = match self { @@ -177,13 +166,6 @@ impl IntoResponse for Error { | Self::MissingExtension(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::Application { code, .. } => code, - Self::ShardQueryStatusMismatch { error } => { - return ( - StatusCode::PRECONDITION_FAILED, - serde_json::to_string(&error).unwrap(), - ) - .into_response(); - } }; (status_code, self.to_string()).into_response() } diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index be2bc6d83..e8c276934 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -498,12 +498,10 @@ pub mod query { } impl Request { - #[cfg(any(all(test, not(feature = "shuttle")), feature = "cli"))] // needed because client is blocking; remove when non-blocking pub fn new(query_id: QueryId) -> Self { Self { query_id } } - #[cfg(any(all(test, not(feature = "shuttle")), feature = "cli"))] // needed because client is blocking; remove when non-blocking pub fn try_into_http_request( self, scheme: axum::http::uri::Scheme, @@ -671,47 +669,4 @@ pub mod query { pub const AXUM_PATH: &str = "/:query_id/kill"; } - pub mod status_match { - use serde::{Deserialize, Serialize}; - - use crate::{helpers::query::CompareStatusRequest, query::QueryStatus}; - - #[derive(Serialize, Deserialize)] - pub struct StatusQueryString { - pub status: QueryStatus, - } - - impl StatusQueryString { - fn url_encode(&self) -> String { - // todo: serde urlencoded - format!("status={}", self.status) - } - } - - impl From for StatusQueryString { - fn from(value: QueryStatus) -> Self { - Self { status: value } - } - } - - pub fn try_into_http_request( - req: &CompareStatusRequest, - scheme: axum::http::uri::Scheme, - authority: axum::http::uri::Authority, - ) -> crate::net::http_serde::OutgoingRequest { - let uri = axum::http::uri::Uri::builder() - .scheme(scheme) - .authority(authority) - .path_and_query(format!( - "{}/{}/status-match?{}", - crate::net::http_serde::query::BASE_AXUM_PATH, - req.query_id.as_ref(), - StatusQueryString::from(req.status).url_encode(), - )) - .build()?; - Ok(hyper::Request::get(uri).body(axum::body::Body::empty())?) - } - - pub const AXUM_PATH: &str = "/:query_id/status-match"; - } } diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 8a9881bb7..4b89d7f31 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -4,7 +4,6 @@ mod kill; mod prepare; mod results; mod status; -mod status_match; mod step; use std::marker::PhantomData; @@ -62,7 +61,6 @@ pub fn s2s_router(transport: Arc>) -> Router { .merge(step::router(Arc::clone(&transport))) .merge(prepare::router(Arc::clone(&transport))) .merge(results::router(Arc::clone(&transport))) - .merge(status_match::router(transport)) .layer(layer_fn(HelperAuthentication::<_, Shard>::new)) } diff --git a/ipa-core/src/net/server/handlers/query/status_match.rs b/ipa-core/src/net/server/handlers/query/status_match.rs deleted file mode 100644 index 5b2081c5e..000000000 --- a/ipa-core/src/net/server/handlers/query/status_match.rs +++ /dev/null @@ -1,227 +0,0 @@ -use axum::{ - extract::{Path, Query}, - routing::get, - Extension, Router, -}; -use hyper::StatusCode; - -use crate::{ - helpers::{query::CompareStatusRequest, ApiError, BodyStream}, - net::{ - http_serde::query::status_match::{ - StatusQueryString, {self}, - }, - server::Error, - HttpTransport, Shard, - }, - protocol::QueryId, - query::QueryStatusError, - sync::Arc, -}; - -async fn handler( - transport: Extension>>, - Path(query_id): Path, - Query(StatusQueryString { status }): Query, -) -> Result<(), Error> { - let req = CompareStatusRequest { query_id, status }; - match Arc::clone(&transport) - .dispatch(req, BodyStream::empty()) - .await - { - Ok(_) => Ok(()), - Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus { my_status, .. })) => { - Err(crate::net::error::ShardQueryStatusMismatchError { actual: my_status }.into()) - } - Err(e) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, e)), - } -} - -pub fn router(transport: Arc>) -> Router { - Router::new() - .route(status_match::AXUM_PATH, get(handler)) - .layer(Extension(transport)) -} - -#[cfg(all(test, unit_test))] -mod tests { - use std::{borrow::Borrow, sync::Arc}; - - use axum::{ - body::Body, - http::uri::{Authority, Scheme}, - }; - use hyper::StatusCode; - - use crate::{ - helpers::{ - make_owned_handler, - query::CompareStatusRequest, - routing::{Addr, RouteId}, - ApiError, BodyStream, HelperResponse, RequestHandler, - }, - net::{ - error::ShardQueryStatusMismatchError, - http_serde::query::status_match::try_into_http_request, - server::ClientIdentity, - test::{TestServer, TestServerBuilder}, - Error, Shard, - }, - protocol::QueryId, - query::{QueryStatus, QueryStatusError}, - sharding::ShardIndex, - }; - - fn for_status(status: QueryStatus) -> CompareStatusRequest { - CompareStatusRequest { - query_id: QueryId, - status, - } - } - - fn http_request>(req: B) -> hyper::Request { - try_into_http_request( - req.borrow(), - Scheme::HTTP, - Authority::from_static("localhost"), - ) - .unwrap() - } - - fn authenticated(mut req: hyper::Request) -> hyper::Request { - req.extensions_mut() - .insert(ClientIdentity(ShardIndex::from(2))); - req - } - - fn handler_status_match(expected_status: QueryStatus) -> Arc> { - make_owned_handler( - move |addr: Addr, _data: BodyStream| async move { - let RouteId::QueryStatus = addr.route else { - panic!("unexpected call"); - }; - let req = addr.into::().unwrap(); - assert_eq!(req.query_id, QueryId); - assert_eq!(req.status, expected_status); - Ok(HelperResponse::ok()) - }, - ) - } - - fn handler_status_mismatch( - expected_status: QueryStatus, - ) -> Arc> { - assert_ne!(expected_status, QueryStatus::Running); - - make_owned_handler( - move |addr: Addr, _data: BodyStream| async move { - let RouteId::QueryStatus = addr.route else { - panic!("unexpected call"); - }; - let req = addr.into::().unwrap(); - assert_eq!(req.query_id, QueryId); - Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus { - query_id: QueryId, - my_status: QueryStatus::Running, - other_status: expected_status, - })) - }, - ) - } - - #[tokio::test] - async fn status_success() { - let expected_status = QueryStatus::Running; - let req = authenticated(http_request(for_status(expected_status))); - - TestServer::::oneshot_success(req, handler_status_match(expected_status)).await; - } - - #[tokio::test] - async fn status_client_success() { - let expected_status = QueryStatus::Running; - let test_server = TestServerBuilder::::default() - .with_request_handler(handler_status_match(expected_status)) - .build() - .await; - - test_server - .client - .status_match(for_status(expected_status)) - .await - .unwrap(); - } - - #[tokio::test] - async fn status_client_mismatch() { - let diff_status = QueryStatus::Preparing; - let test_server = TestServerBuilder::::default() - .with_request_handler(handler_status_mismatch(diff_status)) - .build() - .await; - let e = test_server - .client - .status_match(for_status(diff_status)) - .await - .unwrap_err(); - assert!(matches!( - e, - Error::ShardQueryStatusMismatch { - error: ShardQueryStatusMismatchError { - actual: QueryStatus::Running - }, - } - )); - } - - #[tokio::test] - async fn status_mismatch() { - let req_status = QueryStatus::Completed; - let handler = handler_status_mismatch(req_status); - let req = authenticated(http_request(for_status(req_status))); - - let resp = TestServer::::oneshot(req, handler).await; - assert_eq!(StatusCode::PRECONDITION_FAILED, resp.status()); - } - - #[tokio::test] - async fn other_query_error() { - let handler = make_owned_handler( - move |_addr: Addr, _data: BodyStream| async move { - Err(ApiError::QueryStatus(QueryStatusError::NoSuchQuery( - QueryId, - ))) - }, - ); - let req = authenticated(http_request(for_status(QueryStatus::Running))); - - let resp = TestServer::::oneshot(req, handler).await; - assert_eq!(StatusCode::INTERNAL_SERVER_ERROR, resp.status()); - } - - #[tokio::test] - async fn unauthenticated() { - assert_eq!( - StatusCode::UNAUTHORIZED, - TestServer::::oneshot( - http_request(for_status(QueryStatus::Running)), - make_owned_handler(|_, _| async move { unimplemented!() }), - ) - .await - .status() - ); - } - - #[tokio::test] - async fn server_error() { - assert_eq!( - StatusCode::INTERNAL_SERVER_ERROR, - TestServer::::oneshot( - authenticated(http_request(for_status(QueryStatus::Running))), - make_owned_handler(|_, _| async move { Err(ApiError::BadRequest("".into())) }), - ) - .await - .status() - ); - } -} diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index 444eaebfc..24530d7b8 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -114,8 +114,9 @@ impl HttpTransport { Ok(None) } RouteId::QueryStatus => { - let req = serde_json::from_str(route.extra().borrow())?; - self.clients[client_ix].status_match(req).await?; + let query_id = >::from(route.query_id()) + .expect("query_id is required to call complete query API"); + self.clients[client_ix].query_status(query_id).await?; Ok(None) } evt @ (RouteId::QueryInput diff --git a/ipa-core/src/query/processor.rs b/ipa-core/src/query/processor.rs index 95cfa0f44..b245c0a3f 100644 --- a/ipa-core/src/query/processor.rs +++ b/ipa-core/src/query/processor.rs @@ -6,12 +6,12 @@ use std::{ use futures::{future::try_join, stream}; use serde::Serialize; -use super::min_status; +use super::{min_status, state::read_query_status}; use crate::{ error::Error as ProtocolError, executor::IpaRuntime, helpers::{ - query::{CompareStatusRequest, PrepareQuery, QueryConfig}, + query::{PrepareQuery, QueryConfig}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport, @@ -118,11 +118,12 @@ pub enum QueryStatusError { NotLeader(ShardIndex), #[error("This is the leader shard")] Leader, - #[error("My status {my_status:?} for query {query_id:?} differs from {other_status:?}")] - DifferentStatus { - query_id: QueryId, - my_status: QueryStatus, - other_status: QueryStatus, + #[error("No response from shard {0:?}")] + NoResponse(ShardIndex), + #[error(transparent)] + UnexpectedResponse { + #[from] + source: crate::error::BoxError, }, } @@ -354,48 +355,6 @@ impl Processor { Some(status) } - /// This helper function is used to transform a [`BoxError`] into a - /// [`QueryStatusError::DifferentStatus`] and retrieve it's internal state. Returns [`None`] - /// if not possible. - #[cfg(feature = "in-memory-infra")] - fn downcast_state_error(box_error: &crate::error::BoxError) -> Option { - use crate::helpers::ApiError; - let api_error = box_error.downcast_ref::(); - if let Some(ApiError::QueryStatus(QueryStatusError::DifferentStatus { - my_status, .. - })) = api_error - { - return Some(*my_status); - } - None - } - - /// This helper is used by the in-memory stack to obtain the state of other shards via a - /// [`QueryStatusError::DifferentStatus`] error. - /// TODO: Ideally broadcast should return a value, that we could use to parse the state instead - /// of relying on errors. - #[cfg(feature = "in-memory-infra")] - fn get_state_from_error( - error: &crate::helpers::InMemoryTransportError, - ) -> Option { - if let crate::helpers::InMemoryTransportError::Rejected { inner, .. } = error { - return Self::downcast_state_error(inner); - } - None - } - - /// This helper is used by the HTTP stack to obtain the state of other shards via a - /// [`QueryStatusError::DifferentStatus`] error. - /// TODO: Ideally broadcast should return a value, that we could use to parse the state instead - /// of relying on errors. - #[cfg(feature = "real-world-infra")] - fn get_state_from_error(shard_error: &crate::net::ShardError) -> Option { - if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = &shard_error.source { - return Some(error.actual); - } - None - } - /// Returns the query status in this helper, by querying all shards. /// /// ## Errors @@ -412,29 +371,24 @@ impl Processor { if shard_index != ShardIndex::FIRST { return Err(QueryStatusError::NotLeader(shard_index)); } - let mut status = self .get_status(query_id) .ok_or(QueryStatusError::NoSuchQuery(query_id))?; - let shard_query_status_req = CompareStatusRequest { query_id, status }; - - let shard_responses = shard_transport.broadcast(shard_query_status_req).await; - if let Err(e) = shard_responses { - for (shard, failure) in &e.failures { - if let Some(other) = Self::get_state_from_error(failure) { - status = min_status(status, other); - } else { - tracing::error!("failed to get status from shard {shard}: {failure:?}"); - return Err(e.into()); - } + let shard_responses = shard_transport.broadcast((RouteId::QueryStatus, query_id)).await?; + for (i, o) in shard_responses { + if o.is_none() { + return Err(QueryStatusError::NoResponse(i)); } + let other: QueryStatus = read_query_status(o.unwrap()) + .await.unwrap(); //TODO handle error + status = min_status(status, other); } Ok(status) } - /// Compares this shard status against the given type. Returns an error if different. + /// Returns the status of this shard for a query. /// /// ## Errors /// If query is not registered on this helper or @@ -444,22 +398,16 @@ impl Processor { pub fn shard_status( &self, shard_transport: &ShardTransportImpl, - req: &CompareStatusRequest, + query_id: QueryId, ) -> Result { let shard_index = shard_transport.identity(); if shard_index == ShardIndex::FIRST { return Err(QueryStatusError::Leader); } let status = self - .get_status(req.query_id) - .ok_or(QueryStatusError::NoSuchQuery(req.query_id))?; - if req.status != status { - return Err(QueryStatusError::DifferentStatus { - query_id: req.query_id, - my_status: status, - other_status: req.status, - }); - } + .get_status(query_id) + .ok_or(QueryStatusError::NoSuchQuery(query_id))?; + Ok(status) } @@ -1127,7 +1075,7 @@ mod tests { mod query_status { use super::*; - use crate::{helpers::query::CompareStatusRequest, protocol::QueryId}; + use crate::{protocol::QueryId}; /// * From the standpoint of leader shard in Helper 1 /// * On query_status @@ -1142,21 +1090,9 @@ mod tests { const THIRD_SHARD: ShardIndex = ShardIndex::from_u32(2); create_handler(move |_| async move { match si { - FOURTH_SHARD => { - Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus { - query_id: QueryId, - my_status: QueryStatus::Completed, - other_status: QueryStatus::Preparing, - })) - } - THIRD_SHARD => { - Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus { - query_id: QueryId, - my_status: QueryStatus::Running, - other_status: QueryStatus::Preparing, - })) - } - _ => Ok(HelperResponse::ok()), + FOURTH_SHARD => Ok(HelperResponse::from(QueryStatus::Completed)), + THIRD_SHARD => Ok(HelperResponse::from(QueryStatus::Running)), + _ =>Ok(HelperResponse::from(QueryStatus::AwaitingInputs)), } }) } @@ -1206,13 +1142,9 @@ mod tests { QueryId, ))) } else if si == ShardIndex::from(2) { - Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus { - query_id: QueryId, - my_status: QueryStatus::Running, - other_status: QueryStatus::Preparing, - })) + Ok(HelperResponse::from(QueryStatus::Running)) } else { - Ok(HelperResponse::ok()) + Ok(HelperResponse::from(QueryStatus::AwaitingInputs)) } }) } @@ -1266,17 +1198,13 @@ mod tests { /// call. Only non-leaders (1,2,3...) should handle those calls. #[tokio::test] async fn shard_not_leader() { - let req = CompareStatusRequest { - query_id: QueryId, - status: QueryStatus::Running, - }; let t = TestComponents::new(TestComponentsArgs::default()); assert!(matches!( t.processor .shard_status( &t.shard_network .transport(HelperIdentity::TWO, ShardIndex::FIRST), - &req + QueryId ) .unwrap_err(), QueryStatusError::Leader diff --git a/ipa-core/src/query/state.rs b/ipa-core/src/query/state.rs index 148a40565..33822edda 100644 --- a/ipa-core/src/query/state.rs +++ b/ipa-core/src/query/state.rs @@ -6,15 +6,11 @@ use std::{ }; use ::tokio::sync::oneshot::{error::TryRecvError, Receiver}; -use futures::{ready, FutureExt}; +use futures::{ready, FutureExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::{ - executor::IpaJoinHandle, - helpers::{query::QueryConfig, RoleAssignment}, - protocol::QueryId, - query::runner::QueryResult, - sync::Mutex, + error::BoxError, executor::IpaJoinHandle, helpers::{query::QueryConfig, BytesStream, RoleAssignment}, protocol::QueryId, query::runner::QueryResult, sync::Mutex }; /// The status of query processing @@ -54,6 +50,12 @@ impl From<&QueryState> for QueryStatus { } } +pub async fn read_query_status(value: B) -> Result { + let bytes: bytes::BytesMut = value.try_collect().await?; + let qs : QueryStatus = serde_json::from_slice(bytes.as_ref())?; + Ok(qs) +} + /// This function is used, among others, by the [`Processor`] to return a unified response when /// queried about the state of a sharded helper. In such scenarios, there will be many different /// [`QueryStatus`] and the [`Processor`] needs to return a single one that describes the entire From d13aff6155c03e444aaca35a14f4f0f0bd21478d Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Wed, 8 Jan 2025 16:08:20 -0300 Subject: [PATCH 3/9] mem tests pass --- ipa-core/src/helpers/transport/handler.rs | 27 +++++++++++++++++-- .../helpers/transport/in_memory/transport.rs | 25 ++++++++++------- ipa-core/src/query/processor.rs | 25 +++++++---------- ipa-core/src/query/state.rs | 8 +----- ipa-core/src/sharding.rs | 8 +++++- 5 files changed, 59 insertions(+), 34 deletions(-) diff --git a/ipa-core/src/helpers/transport/handler.rs b/ipa-core/src/helpers/transport/handler.rs index 9a1f1b457..b50fe3901 100644 --- a/ipa-core/src/helpers/transport/handler.rs +++ b/ipa-core/src/helpers/transport/handler.rs @@ -1,8 +1,9 @@ use std::{fmt::Debug, future::Future, marker::PhantomData}; use async_trait::async_trait; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; +use futures_util::TryStreamExt; use crate::{ error::BoxError, @@ -17,6 +18,8 @@ use crate::{ sync::{Arc, Mutex, Weak}, }; +use super::BytesStream; + /// Represents some response sent from MPC helper acting on a given request. It is rudimental now /// because we sent everything as HTTP body, but it could evolve. /// @@ -113,6 +116,11 @@ impl HelperResponse { pub fn try_into_owned(self) -> Result { serde_json::from_slice(&self.body) } + + pub async fn from_bytesstream(value: B) -> Result { + let bytes: bytes::BytesMut = value.try_collect().await?; + Ok(Self { body: bytes.to_vec() }) + } } impl From for HelperResponse { @@ -128,13 +136,28 @@ impl From<()> for HelperResponse { } } +#[derive(Deserialize, Serialize)] +struct QueryStatusResponse { + status: QueryStatus, +} + impl From for HelperResponse { fn from(value: QueryStatus) -> Self { - let v = serde_json::to_vec(&json!({"status": value})).unwrap(); + let response = QueryStatusResponse { + status: value + }; + let v = serde_json::to_vec(&response).unwrap(); Self { body: v } } } +impl From for QueryStatus { + fn from(value: HelperResponse) -> Self { + let response: QueryStatusResponse = serde_json::from_slice(value.body.as_ref()).unwrap(); + response.status + } +} + impl From for HelperResponse { fn from(value: QueryKilled) -> Self { let v = serde_json::to_vec(&json!({"query_id": value.0, "status": "killed"})).unwrap(); diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index 3c4aab36b..2fc2c7f9b 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -12,7 +12,7 @@ use ::tokio::sync::{ }; use async_trait::async_trait; use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures::{stream, Stream, StreamExt}; #[cfg(all(feature = "shuttle", test))] use shuttle::future as tokio; use tokio_stream::wrappers::ReceiverStream; @@ -100,6 +100,7 @@ impl InMemoryTransport { { let streams = self.record_streams.clone(); async move { + // A tuple with these 3 things is sent from send while let Some((addr, stream, ack)) = rx.recv().await { tracing::trace!("received new message: {addr:?}"); @@ -215,18 +216,20 @@ impl Transport for Weak> { io::Error::new::(io::ErrorKind::ConnectionAborted, "channel closed".into()) })?; - ack_rx + let res = ack_rx .await .map_err(|_recv_error| Error::Rejected { dest, inner: "channel closed".into(), - })? - .map_err(|e| Error::Rejected { - dest, - inner: e.into(), - })?; - - Ok(None) + })?.map_err(|e| Error::Rejected { + dest, + inner: e.into(), + })?; + let body_bytes = res.into_body(); + if body_bytes.is_empty() { + return Ok(None); + } + Ok(Some(InMemoryStream::wrap_bytes(body_bytes))) } fn receive>( @@ -248,6 +251,10 @@ pub struct InMemoryStream { } impl InMemoryStream { + fn wrap_bytes(bytes: Vec) -> Self { + InMemoryStream::wrap(stream::once(async { Ok(Bytes::from(bytes))})) + } + fn wrap + Send + 'static>(value: S) -> Self { Self { inner: Box::pin(value), diff --git a/ipa-core/src/query/processor.rs b/ipa-core/src/query/processor.rs index b245c0a3f..1fde5ff76 100644 --- a/ipa-core/src/query/processor.rs +++ b/ipa-core/src/query/processor.rs @@ -6,15 +6,12 @@ use std::{ use futures::{future::try_join, stream}; use serde::Serialize; -use super::{min_status, state::read_query_status}; +use super::min_status; use crate::{ error::Error as ProtocolError, executor::IpaRuntime, helpers::{ - query::{PrepareQuery, QueryConfig}, - routing::RouteId, - BodyStream, BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, - Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport, + query::{PrepareQuery, QueryConfig}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, HelperResponse, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport }, hpke::{KeyRegistry, PrivateKeyOnly}, protocol::QueryId, @@ -380,8 +377,8 @@ impl Processor { if o.is_none() { return Err(QueryStatusError::NoResponse(i)); } - let other: QueryStatus = read_query_status(o.unwrap()) - .await.unwrap(); //TODO handle error + let r = HelperResponse::from_bytesstream(o.unwrap()).await?; + let other = QueryStatus::from(r); status = min_status(status, other); } @@ -541,7 +538,7 @@ mod tests { } fn shard_respond_ok(_si: ShardIndex) -> Arc> { - create_handler(|_| async { Ok(HelperResponse::ok()) }) + create_handler(|_| async { Ok(HelperResponse::from(QueryStatus::Completed)) }) } fn test_multiply_config() -> QueryConfig { @@ -647,7 +644,7 @@ mod tests { let processor = Processor::default(); let query_config = test_multiply_config(); let [t0, t1, t2] = mpc_network.transports(); - let shard_transport = shard_network.transport(HelperIdentity::ONE, ShardIndex::FIRST); + let shard_transport = shard_network.transport(HelperIdentity::ONE, ShardIndex::LEADER); TestComponents { processor, query_config, @@ -1086,13 +1083,11 @@ mod tests { #[tokio::test] async fn combined_status_response() { fn shard_handle(si: ShardIndex) -> Arc> { - const FOURTH_SHARD: ShardIndex = ShardIndex::from_u32(3); - const THIRD_SHARD: ShardIndex = ShardIndex::from_u32(2); create_handler(move |_| async move { match si { - FOURTH_SHARD => Ok(HelperResponse::from(QueryStatus::Completed)), - THIRD_SHARD => Ok(HelperResponse::from(QueryStatus::Running)), - _ =>Ok(HelperResponse::from(QueryStatus::AwaitingInputs)), + ShardIndex::FOURTH => Ok(HelperResponse::from(QueryStatus::Completed)), + ShardIndex::THIRD => Ok(HelperResponse::from(QueryStatus::Running)), + _ => Ok(HelperResponse::from(QueryStatus::AwaitingInputs)), } }) } @@ -1109,7 +1104,7 @@ mod tests { t.processor .prepare_shard( &t.shard_network - .transport(HelperIdentity::ONE, ShardIndex::from(1)), + .transport(HelperIdentity::ONE, ShardIndex::SECOND), req, ) .unwrap(); diff --git a/ipa-core/src/query/state.rs b/ipa-core/src/query/state.rs index 33822edda..38244e861 100644 --- a/ipa-core/src/query/state.rs +++ b/ipa-core/src/query/state.rs @@ -10,7 +10,7 @@ use futures::{ready, FutureExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::{ - error::BoxError, executor::IpaJoinHandle, helpers::{query::QueryConfig, BytesStream, RoleAssignment}, protocol::QueryId, query::runner::QueryResult, sync::Mutex + error::BoxError, executor::IpaJoinHandle, helpers::{query::QueryConfig, BytesStream, HelperResponse, RoleAssignment}, protocol::QueryId, query::runner::QueryResult, sync::Mutex }; /// The status of query processing @@ -50,12 +50,6 @@ impl From<&QueryState> for QueryStatus { } } -pub async fn read_query_status(value: B) -> Result { - let bytes: bytes::BytesMut = value.try_collect().await?; - let qs : QueryStatus = serde_json::from_slice(bytes.as_ref())?; - Ok(qs) -} - /// This function is used, among others, by the [`Processor`] to return a unified response when /// queried about the state of a sharded helper. In such scenarios, there will be many different /// [`QueryStatus`] and the [`Processor`] needs to return a single one that describes the entire diff --git a/ipa-core/src/sharding.rs b/ipa-core/src/sharding.rs index d1dc7fc48..b5bc1b17b 100644 --- a/ipa-core/src/sharding.rs +++ b/ipa-core/src/sharding.rs @@ -46,7 +46,13 @@ impl ShardedHelperIdentity { pub struct ShardIndex(u32); impl ShardIndex { - pub const FIRST: Self = Self(0); + /// Shard with index 0, the first, is special as its the main entry point and coordinator + /// for all shards in a helper. + pub const LEADER: Self = Self(0); + pub const FIRST: Self = ShardIndex::LEADER; + pub const SECOND: Self = Self(1); + pub const THIRD: Self = Self(2); + pub const FOURTH: Self = Self(3); /// Returns an iterator over all shard indices that precede this one, excluding this one. pub fn iter(self) -> impl Iterator { From 8afb0ab3e8471fb1d57d90b2c7b639fe7cdf7801 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Wed, 8 Jan 2025 18:38:25 -0300 Subject: [PATCH 4/9] Fixed http query --- ipa-core/src/helpers/transport/mod.rs | 2 +- .../src/helpers/transport/stream/axum_body.rs | 2 +- ipa-core/src/net/client/mod.rs | 22 +++++++++++++++++-- ipa-core/src/net/server/handlers/query/mod.rs | 5 +++-- .../src/net/server/handlers/query/status.rs | 14 +++++++----- ipa-core/src/net/transport.rs | 19 ++++++++-------- 6 files changed, 42 insertions(+), 22 deletions(-) diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 4d5813daa..d45996227 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -310,7 +310,7 @@ impl RouteParams for (RouteId, QueryId) { } #[derive(thiserror::Error, Debug)] -#[error("One or more peers rejected the request: {failures:?}")] +#[error("One or more peer shards rejected the breadcast: {failures:?}")] pub struct BroadcastError { pub failures: Vec<(I, E)>, } diff --git a/ipa-core/src/helpers/transport/stream/axum_body.rs b/ipa-core/src/helpers/transport/stream/axum_body.rs index 5560f326e..fa3121d38 100644 --- a/ipa-core/src/helpers/transport/stream/axum_body.rs +++ b/ipa-core/src/helpers/transport/stream/axum_body.rs @@ -16,7 +16,7 @@ use crate::{error::BoxError, helpers::BytesStream}; pub struct WrappedAxumBodyStream(#[pin] BodyDataStream); impl WrappedAxumBodyStream { - pub(super) fn new(b: Body) -> Self { + pub fn new(b: Body) -> Self { Self(b.into_data_stream()) } diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index fc0252b45..23dcbfcf2 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -33,8 +33,7 @@ use crate::{ }, executor::IpaRuntime, helpers::{ - query::{PrepareQuery, QueryConfig, QueryInput}, - TransportIdentity, + query::{PrepareQuery, QueryConfig, QueryInput}, BodyStream, TransportIdentity, WrappedAxumBodyStream }, net::{http_serde, Error, CRYPTO_PROVIDER}, protocol::{Gate, QueryId}, @@ -385,6 +384,25 @@ impl IpaHttpClient { resp_ok(resp).await } + pub async fn query_status_bytes( + &self, + query_id: QueryId, + ) -> Result { + let req = http_serde::query::status::Request::new(query_id); + let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; + + let resp = self.request(req).await?; + if resp.status().is_success() { + //let wabs = WrappedAxumBodyStream::new(resp.inner.into_body()); + let bytes = response_to_bytes(resp).await?; + let bs = BodyStream::from(bytes.to_vec()); + Ok(bs) + //Ok(response_to_bytes(resp).await?); + } else { + Err(Error::from_failed_resp(resp).await) + } + } + /// Retrieve the status of a query. /// /// ## Errors diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 4b89d7f31..1ee2c729b 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -36,9 +36,9 @@ pub fn query_router(transport: MpcHttpTransport) -> Router { Router::new() .merge(create::router(transport.clone())) .merge(input::router(transport.clone())) - .merge(status::router(transport.clone())) .merge(kill::router(transport.clone())) - .merge(results::router(transport.inner_transport)) + .merge(results::router(Arc::clone(&transport.inner_transport))) + .merge(status::router(transport.inner_transport)) } /// Construct router for helper-to-helper communications @@ -60,6 +60,7 @@ pub fn s2s_router(transport: Arc>) -> Router { Router::new() .merge(step::router(Arc::clone(&transport))) .merge(prepare::router(Arc::clone(&transport))) + .merge(status::router(Arc::clone(&transport))) .merge(results::router(Arc::clone(&transport))) .layer(layer_fn(HelperAuthentication::<_, Shard>::new)) } diff --git a/ipa-core/src/net/server/handlers/query/status.rs b/ipa-core/src/net/server/handlers/query/status.rs index 0056b76d0..3cd89254f 100644 --- a/ipa-core/src/net/server/handlers/query/status.rs +++ b/ipa-core/src/net/server/handlers/query/status.rs @@ -6,25 +6,27 @@ use crate::{ net::{ http_serde::query::status::{self, Request}, server::Error, - transport::MpcHttpTransport, + ConnectionFlavor, HttpTransport, + server::ClientIdentity, }, protocol::QueryId, + sync::Arc, }; -async fn handler( - transport: Extension, +async fn handler( + transport: Extension>>, Path(query_id): Path, ) -> Result, Error> { let req = Request { query_id }; - match transport.dispatch(req, BodyStream::empty()).await { + match Arc::clone(&transport).dispatch(req, BodyStream::empty()).await { Ok(state) => Ok(Json(status::ResponseBody::from(state))), Err(e) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, e)), } } -pub fn router(transport: MpcHttpTransport) -> Router { +pub fn router(transport: Arc>) -> Router { Router::new() - .route(status::AXUM_PATH, get(handler)) + .route(status::AXUM_PATH, get(handler::)) .layer(Extension(transport)) } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index 24530d7b8..4e04630b9 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -116,8 +116,8 @@ impl HttpTransport { RouteId::QueryStatus => { let query_id = >::from(route.query_id()) .expect("query_id is required to call complete query API"); - self.clients[client_ix].query_status(query_id).await?; - Ok(None) + let response = self.clients[client_ix].query_status_bytes(query_id).await?; + Ok(Some(response)) } evt @ (RouteId::QueryInput | RouteId::ReceiveQuery @@ -421,21 +421,16 @@ mod tests { use super::*; use crate::{ - ff::{boolean_array::BA64, FieldType, Fp31, Serializable}, - helpers::{ + ff::{boolean_array::BA64, FieldType, Fp31, Serializable}, helpers::{ make_owned_handler, query::{ QueryInput, QueryType::{TestMultiply, TestShardedShuffle}, }, - }, - net::{ + }, net::{ client::ClientIdentity, test::{TestConfig, TestConfigBuilder, TestServer}, - }, - secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, - test_fixture::Reconstruct, - HelperApp, + }, query::QueryStatus, secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, test_fixture::Reconstruct, HelperApp }; static STEP: Lazy = Lazy::new(|| Gate::from("http-transport")); @@ -642,6 +637,8 @@ mod tests { .collect::>() }); + assert_eq!(leader_client.query_status(QueryId).await.unwrap(), QueryStatus::AwaitingInputs); + let _ = try_join_all(helper_shares.into_iter().enumerate().map( |(helper, shard_streams)| async move { @@ -659,6 +656,8 @@ mod tests { .await .unwrap(); + assert_eq!(leader_client.query_status(QueryId).await.unwrap(), QueryStatus::Running); + let result: [_; 3] = join_all(leader_ring_clients.each_ref().map(|client| async move { let r = client.query_results(query_id).await.unwrap(); AdditiveShare::::from_byte_slice_unchecked(&r).collect::>() From fe5064b6f4e54368afb0006e24c683b023b6ed20 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Thu, 9 Jan 2025 12:02:05 -0300 Subject: [PATCH 5/9] Just send --- ipa-core/src/helpers/gateway/transport.rs | 4 ++-- .../helpers/transport/in_memory/transport.rs | 2 +- ipa-core/src/helpers/transport/mod.rs | 23 +------------------ ipa-core/src/net/transport.rs | 4 ++-- 4 files changed, 6 insertions(+), 27 deletions(-) diff --git a/ipa-core/src/helpers/gateway/transport.rs b/ipa-core/src/helpers/gateway/transport.rs index cb98e07f9..7a870bdca 100644 --- a/ipa-core/src/helpers/gateway/transport.rs +++ b/ipa-core/src/helpers/gateway/transport.rs @@ -51,7 +51,7 @@ impl Transport for RoleResolvingTransport { self.inner.peer_count() } - async fn send_and_receive< + async fn send< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -73,7 +73,7 @@ impl Transport for RoleResolvingTransport { "can't send message to itself" ); self.inner - .send_and_receive(dest_helper, route, data) + .send(dest_helper, route, data) .await .map_err(|e| SendToRoleError(dest, e)) } diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index 2fc2c7f9b..34b48ede7 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -174,7 +174,7 @@ impl Transport for Weak> { .into_iter() } - async fn send_and_receive< + async fn send< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index d45996227..2395951b8 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -350,27 +350,6 @@ pub trait Transport: Clone + Send + Sync + 'static { dest: Self::Identity, route: R, data: D, - ) -> Result<(), Self::Error> - where - Option: From, - Option: From, - Q: QueryIdBinding, - S: StepBinding, - R: RouteParams, - D: Stream> + Send + 'static - { - self.send_and_receive(dest, route, data).await?; - Ok(()) // Todo, error if data found - } - - /// Sends a new request to the given destination helper party. - /// Depending on the specific request, it may or may not require acknowledgment by the remote - /// party - async fn send_and_receive( - &self, - dest: Self::Identity, - route: R, - data: D, ) -> Result, Self::Error> where Option: From, @@ -404,7 +383,7 @@ pub trait Transport: Clone + Send + Sync + 'static { let mut futs = FuturesUnordered::new(); for peer_identity in self.peers() { futs.push( - Self::send_and_receive(self, peer_identity, route.clone(), futures::stream::empty()) + Self::send(self, peer_identity, route.clone(), futures::stream::empty()) .map(move |v| (peer_identity, v)), ); } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index 4e04630b9..db4d46fd5 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -295,7 +295,7 @@ impl Transport for MpcHttpTransport { 2 } - async fn send_and_receive< + async fn send< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -374,7 +374,7 @@ impl Transport for ShardHttpTransport { u32::from(self.shard_count).saturating_sub(1) } - async fn send_and_receive( + async fn send( &self, dest: Self::Identity, route: R, From ee7760425da3533b2724f73bce39d7ae6b9911b7 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Thu, 9 Jan 2025 20:54:27 -0300 Subject: [PATCH 6/9] Minor changes --- .../helpers/transport/in_memory/transport.rs | 1 - ipa-core/src/helpers/transport/query/mod.rs | 27 ------------------- ipa-core/src/net/client/mod.rs | 1 - ipa-core/src/net/transport.rs | 6 ++--- ipa-core/src/query/processor.rs | 8 +++--- ipa-core/src/sharding.rs | 8 +----- 6 files changed, 9 insertions(+), 42 deletions(-) diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index 34b48ede7..e3021fb00 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -100,7 +100,6 @@ impl InMemoryTransport { { let streams = self.record_streams.clone(); async move { - // A tuple with these 3 things is sent from send while let Some((addr, stream, ack)) = rx.recv().await { tracing::trace!("received new message: {addr:?}"); diff --git a/ipa-core/src/helpers/transport/query/mod.rs b/ipa-core/src/helpers/transport/query/mod.rs index 70161c1c6..9da198625 100644 --- a/ipa-core/src/helpers/transport/query/mod.rs +++ b/ipa-core/src/helpers/transport/query/mod.rs @@ -239,33 +239,6 @@ impl Debug for QueryInput { } } -/*#[derive(Clone, Debug, Serialize, Deserialize)] -#[cfg_attr(test, derive(PartialEq, Eq))] -pub struct CompareStatusRequest { - pub query_id: QueryId, - pub status: QueryStatus, -} - -impl RouteParams for CompareStatusRequest { - type Params = String; - - fn resource_identifier(&self) -> RouteId { - RouteId::QueryStatus - } - - fn query_id(&self) -> QueryId { - self.query_id - } - - fn gate(&self) -> NoStep { - NoStep - } - - fn extra(&self) -> Self::Params { - serde_json::to_string(self).unwrap() - } -}*/ - #[derive(Copy, Clone, Debug, Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq, Eq))] pub enum QueryType { diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index 23dcbfcf2..05d641903 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -397,7 +397,6 @@ impl IpaHttpClient { let bytes = response_to_bytes(resp).await?; let bs = BodyStream::from(bytes.to_vec()); Ok(bs) - //Ok(response_to_bytes(resp).await?); } else { Err(Error::from_failed_resp(resp).await) } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index db4d46fd5..fc1d155b9 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -69,7 +69,7 @@ impl RouteParams for QueryConfig { } impl HttpTransport { - async fn send_and_receive< + async fn send< D: Stream> + Send + 'static, Q: QueryIdBinding, S: StepBinding, @@ -310,7 +310,7 @@ impl Transport for MpcHttpTransport { Option: From, Option: From, { - self.inner_transport.send_and_receive(dest, route, data).await + self.inner_transport.send(dest, route, data).await } fn receive>( @@ -389,7 +389,7 @@ impl Transport for ShardHttpTransport { D: Stream> + Send + 'static, { self.inner_transport - .send_and_receive(dest, route, data) + .send(dest, route, data) .map_err(|source| ShardError { shard_index: self.identity(), source, diff --git a/ipa-core/src/query/processor.rs b/ipa-core/src/query/processor.rs index 1fde5ff76..25ca277e8 100644 --- a/ipa-core/src/query/processor.rs +++ b/ipa-core/src/query/processor.rs @@ -644,7 +644,7 @@ mod tests { let processor = Processor::default(); let query_config = test_multiply_config(); let [t0, t1, t2] = mpc_network.transports(); - let shard_transport = shard_network.transport(HelperIdentity::ONE, ShardIndex::LEADER); + let shard_transport = shard_network.transport(HelperIdentity::ONE, ShardIndex::FIRST); TestComponents { processor, query_config, @@ -1084,9 +1084,11 @@ mod tests { async fn combined_status_response() { fn shard_handle(si: ShardIndex) -> Arc> { create_handler(move |_| async move { + const FOURTH_SHARD: ShardIndex = ShardIndex::from_u32(3); + const THIRD_SHARD: ShardIndex = ShardIndex::from_u32(2); match si { - ShardIndex::FOURTH => Ok(HelperResponse::from(QueryStatus::Completed)), - ShardIndex::THIRD => Ok(HelperResponse::from(QueryStatus::Running)), + FOURTH_SHARD => Ok(HelperResponse::from(QueryStatus::Completed)), + THIRD_SHARD => Ok(HelperResponse::from(QueryStatus::Running)), _ => Ok(HelperResponse::from(QueryStatus::AwaitingInputs)), } }) diff --git a/ipa-core/src/sharding.rs b/ipa-core/src/sharding.rs index b5bc1b17b..d1dc7fc48 100644 --- a/ipa-core/src/sharding.rs +++ b/ipa-core/src/sharding.rs @@ -46,13 +46,7 @@ impl ShardedHelperIdentity { pub struct ShardIndex(u32); impl ShardIndex { - /// Shard with index 0, the first, is special as its the main entry point and coordinator - /// for all shards in a helper. - pub const LEADER: Self = Self(0); - pub const FIRST: Self = ShardIndex::LEADER; - pub const SECOND: Self = Self(1); - pub const THIRD: Self = Self(2); - pub const FOURTH: Self = Self(3); + pub const FIRST: Self = Self(0); /// Returns an iterator over all shard indices that precede this one, excluding this one. pub fn iter(self) -> impl Iterator { From 2889f1c9c1978f2107fd94b1f01e793fc6bc1057 Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Fri, 10 Jan 2025 10:07:03 -0300 Subject: [PATCH 7/9] format --- ipa-core/src/helpers/transport/handler.rs | 13 +++++------ .../helpers/transport/in_memory/transport.rs | 11 +++++----- ipa-core/src/helpers/transport/mod.rs | 5 ++++- ipa-core/src/net/client/mod.rs | 8 +++---- ipa-core/src/net/error.rs | 3 +-- ipa-core/src/net/http_serde.rs | 1 - .../src/net/server/handlers/query/status.rs | 6 +++-- ipa-core/src/net/transport.rs | 22 ++++++++++++++----- ipa-core/src/query/processor.rs | 15 ++++++++----- ipa-core/src/query/state.rs | 7 +++++- 10 files changed, 57 insertions(+), 34 deletions(-) diff --git a/ipa-core/src/helpers/transport/handler.rs b/ipa-core/src/helpers/transport/handler.rs index b50fe3901..54cd87732 100644 --- a/ipa-core/src/helpers/transport/handler.rs +++ b/ipa-core/src/helpers/transport/handler.rs @@ -1,10 +1,11 @@ use std::{fmt::Debug, future::Future, marker::PhantomData}; use async_trait::async_trait; +use futures_util::TryStreamExt; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::json; -use futures_util::TryStreamExt; +use super::BytesStream; use crate::{ error::BoxError, helpers::{ @@ -18,8 +19,6 @@ use crate::{ sync::{Arc, Mutex, Weak}, }; -use super::BytesStream; - /// Represents some response sent from MPC helper acting on a given request. It is rudimental now /// because we sent everything as HTTP body, but it could evolve. /// @@ -119,7 +118,9 @@ impl HelperResponse { pub async fn from_bytesstream(value: B) -> Result { let bytes: bytes::BytesMut = value.try_collect().await?; - Ok(Self { body: bytes.to_vec() }) + Ok(Self { + body: bytes.to_vec(), + }) } } @@ -143,9 +144,7 @@ struct QueryStatusResponse { impl From for HelperResponse { fn from(value: QueryStatus) -> Self { - let response = QueryStatusResponse { - status: value - }; + let response = QueryStatusResponse { status: value }; let v = serde_json::to_vec(&response).unwrap(); Self { body: v } } diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index e3021fb00..8f26d07bb 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -220,10 +220,11 @@ impl Transport for Weak> { .map_err(|_recv_error| Error::Rejected { dest, inner: "channel closed".into(), - })?.map_err(|e| Error::Rejected { - dest, - inner: e.into(), - })?; + })? + .map_err(|e| Error::Rejected { + dest, + inner: e.into(), + })?; let body_bytes = res.into_body(); if body_bytes.is_empty() { return Ok(None); @@ -251,7 +252,7 @@ pub struct InMemoryStream { impl InMemoryStream { fn wrap_bytes(bytes: Vec) -> Self { - InMemoryStream::wrap(stream::once(async { Ok(Bytes::from(bytes))})) + InMemoryStream::wrap(stream::once(async { Ok(Bytes::from(bytes)) })) } fn wrap + Send + 'static>(value: S) -> Self { diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 2395951b8..9d587205c 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -372,7 +372,10 @@ pub trait Transport: Clone + Send + Sync + 'static { async fn broadcast( &self, route: R, - ) -> Result)>, BroadcastError> + ) -> Result< + Vec<(Self::Identity, Option)>, + BroadcastError, + > where Option: From, Option: From, diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index 05d641903..fc663814d 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -33,7 +33,8 @@ use crate::{ }, executor::IpaRuntime, helpers::{ - query::{PrepareQuery, QueryConfig, QueryInput}, BodyStream, TransportIdentity, WrappedAxumBodyStream + query::{PrepareQuery, QueryConfig, QueryInput}, + BodyStream, TransportIdentity, WrappedAxumBodyStream, }, net::{http_serde, Error, CRYPTO_PROVIDER}, protocol::{Gate, QueryId}, @@ -384,10 +385,7 @@ impl IpaHttpClient { resp_ok(resp).await } - pub async fn query_status_bytes( - &self, - query_id: QueryId, - ) -> Result { + pub async fn query_status_bytes(&self, query_id: QueryId) -> Result { let req = http_serde::query::status::Request::new(query_id); let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; diff --git a/ipa-core/src/net/error.rs b/ipa-core/src/net/error.rs index 066d66b1f..6665f9d20 100644 --- a/ipa-core/src/net/error.rs +++ b/ipa-core/src/net/error.rs @@ -4,8 +4,7 @@ use axum::{ }; use crate::{ - error::BoxError, net::client::ResponseFromEndpoint, protocol::QueryId, - sharding::ShardIndex, + error::BoxError, net::client::ResponseFromEndpoint, protocol::QueryId, sharding::ShardIndex, }; #[derive(thiserror::Error, Debug)] diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index e8c276934..f6e095745 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -668,5 +668,4 @@ pub mod query { pub const AXUM_PATH: &str = "/:query_id/kill"; } - } diff --git a/ipa-core/src/net/server/handlers/query/status.rs b/ipa-core/src/net/server/handlers/query/status.rs index 3cd89254f..7bae0e26e 100644 --- a/ipa-core/src/net/server/handlers/query/status.rs +++ b/ipa-core/src/net/server/handlers/query/status.rs @@ -7,7 +7,6 @@ use crate::{ http_serde::query::status::{self, Request}, server::Error, ConnectionFlavor, HttpTransport, - server::ClientIdentity, }, protocol::QueryId, sync::Arc, @@ -18,7 +17,10 @@ async fn handler( Path(query_id): Path, ) -> Result, Error> { let req = Request { query_id }; - match Arc::clone(&transport).dispatch(req, BodyStream::empty()).await { + match Arc::clone(&transport) + .dispatch(req, BodyStream::empty()) + .await + { Ok(state) => Ok(Json(status::ResponseBody::from(state))), Err(e) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, e)), } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index fc1d155b9..7e3108faf 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -421,16 +421,22 @@ mod tests { use super::*; use crate::{ - ff::{boolean_array::BA64, FieldType, Fp31, Serializable}, helpers::{ + ff::{boolean_array::BA64, FieldType, Fp31, Serializable}, + helpers::{ make_owned_handler, query::{ QueryInput, QueryType::{TestMultiply, TestShardedShuffle}, }, - }, net::{ + }, + net::{ client::ClientIdentity, test::{TestConfig, TestConfigBuilder, TestServer}, - }, query::QueryStatus, secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, test_fixture::Reconstruct, HelperApp + }, + query::QueryStatus, + secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, + test_fixture::Reconstruct, + HelperApp, }; static STEP: Lazy = Lazy::new(|| Gate::from("http-transport")); @@ -637,7 +643,10 @@ mod tests { .collect::>() }); - assert_eq!(leader_client.query_status(QueryId).await.unwrap(), QueryStatus::AwaitingInputs); + assert_eq!( + leader_client.query_status(QueryId).await.unwrap(), + QueryStatus::AwaitingInputs + ); let _ = try_join_all(helper_shares.into_iter().enumerate().map( @@ -656,7 +665,10 @@ mod tests { .await .unwrap(); - assert_eq!(leader_client.query_status(QueryId).await.unwrap(), QueryStatus::Running); + assert_eq!( + leader_client.query_status(QueryId).await.unwrap(), + QueryStatus::Running + ); let result: [_; 3] = join_all(leader_ring_clients.each_ref().map(|client| async move { let r = client.query_results(query_id).await.unwrap(); diff --git a/ipa-core/src/query/processor.rs b/ipa-core/src/query/processor.rs index 25ca277e8..1e0ff8ff1 100644 --- a/ipa-core/src/query/processor.rs +++ b/ipa-core/src/query/processor.rs @@ -11,7 +11,10 @@ use crate::{ error::Error as ProtocolError, executor::IpaRuntime, helpers::{ - query::{PrepareQuery, QueryConfig}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, HelperResponse, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport + query::{PrepareQuery, QueryConfig}, + routing::RouteId, + BodyStream, BroadcastError, Gateway, GatewayConfig, HelperResponse, MpcTransportError, + MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport, }, hpke::{KeyRegistry, PrivateKeyOnly}, protocol::QueryId, @@ -372,7 +375,9 @@ impl Processor { .get_status(query_id) .ok_or(QueryStatusError::NoSuchQuery(query_id))?; - let shard_responses = shard_transport.broadcast((RouteId::QueryStatus, query_id)).await?; + let shard_responses = shard_transport + .broadcast((RouteId::QueryStatus, query_id)) + .await?; for (i, o) in shard_responses { if o.is_none() { return Err(QueryStatusError::NoResponse(i)); @@ -1072,7 +1077,7 @@ mod tests { mod query_status { use super::*; - use crate::{protocol::QueryId}; + use crate::protocol::QueryId; /// * From the standpoint of leader shard in Helper 1 /// * On query_status @@ -1106,7 +1111,7 @@ mod tests { t.processor .prepare_shard( &t.shard_network - .transport(HelperIdentity::ONE, ShardIndex::SECOND), + .transport(HelperIdentity::ONE, ShardIndex::from_u32(1)), req, ) .unwrap(); @@ -1201,7 +1206,7 @@ mod tests { .shard_status( &t.shard_network .transport(HelperIdentity::TWO, ShardIndex::FIRST), - QueryId + QueryId ) .unwrap_err(), QueryStatusError::Leader diff --git a/ipa-core/src/query/state.rs b/ipa-core/src/query/state.rs index 38244e861..e47d05876 100644 --- a/ipa-core/src/query/state.rs +++ b/ipa-core/src/query/state.rs @@ -10,7 +10,12 @@ use futures::{ready, FutureExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::{ - error::BoxError, executor::IpaJoinHandle, helpers::{query::QueryConfig, BytesStream, HelperResponse, RoleAssignment}, protocol::QueryId, query::runner::QueryResult, sync::Mutex + error::BoxError, + executor::IpaJoinHandle, + helpers::{query::QueryConfig, BytesStream, HelperResponse, RoleAssignment}, + protocol::QueryId, + query::runner::QueryResult, + sync::Mutex, }; /// The status of query processing From 967fdcad7b3ae10d02fbdc24d3ff50f2b084247e Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Fri, 10 Jan 2025 12:48:59 -0300 Subject: [PATCH 8/9] clippy --- ipa-core/src/helpers/transport/handler.rs | 5 ++ ipa-core/src/helpers/transport/query/mod.rs | 1 - .../src/helpers/transport/stream/axum_body.rs | 1 + ipa-core/src/net/client/mod.rs | 46 ++++++++++--------- ipa-core/src/net/server/handlers/query/mod.rs | 2 +- ipa-core/src/query/state.rs | 5 +- 6 files changed, 33 insertions(+), 27 deletions(-) diff --git a/ipa-core/src/helpers/transport/handler.rs b/ipa-core/src/helpers/transport/handler.rs index 54cd87732..130e752e8 100644 --- a/ipa-core/src/helpers/transport/handler.rs +++ b/ipa-core/src/helpers/transport/handler.rs @@ -116,6 +116,11 @@ impl HelperResponse { serde_json::from_slice(&self.body) } + /// Asynchronously collects and returns a newly created `HelperResponse`. + /// + /// # Errors + /// + /// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned. pub async fn from_bytesstream(value: B) -> Result { let bytes: bytes::BytesMut = value.try_collect().await?; Ok(Self { diff --git a/ipa-core/src/helpers/transport/query/mod.rs b/ipa-core/src/helpers/transport/query/mod.rs index 9da198625..08cc4d4e3 100644 --- a/ipa-core/src/helpers/transport/query/mod.rs +++ b/ipa-core/src/helpers/transport/query/mod.rs @@ -15,7 +15,6 @@ use crate::{ RoleAssignment, RouteParams, }, protocol::QueryId, - query::QueryStatus, }; #[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize)] diff --git a/ipa-core/src/helpers/transport/stream/axum_body.rs b/ipa-core/src/helpers/transport/stream/axum_body.rs index fa3121d38..234eba88e 100644 --- a/ipa-core/src/helpers/transport/stream/axum_body.rs +++ b/ipa-core/src/helpers/transport/stream/axum_body.rs @@ -16,6 +16,7 @@ use crate::{error::BoxError, helpers::BytesStream}; pub struct WrappedAxumBodyStream(#[pin] BodyDataStream); impl WrappedAxumBodyStream { + #[must_use] pub fn new(b: Body) -> Self { Self(b.into_data_stream()) } diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index fc663814d..36a4c3fe6 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -34,7 +34,7 @@ use crate::{ executor::IpaRuntime, helpers::{ query::{PrepareQuery, QueryConfig, QueryInput}, - BodyStream, TransportIdentity, WrappedAxumBodyStream, + BodyStream, TransportIdentity, }, net::{http_serde, Error, CRYPTO_PROVIDER}, protocol::{Gate, QueryId}, @@ -385,41 +385,43 @@ impl IpaHttpClient { resp_ok(resp).await } - pub async fn query_status_bytes(&self, query_id: QueryId) -> Result { + /// Sends a query status request and returns the response bytes. + /// + /// # Errors + /// If the request has illegal arguments, or fails to deliver to helper + async fn query_status_impl(&self, query_id: QueryId) -> Result { let req = http_serde::query::status::Request::new(query_id); let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; - let resp = self.request(req).await?; if resp.status().is_success() { - //let wabs = WrappedAxumBodyStream::new(resp.inner.into_body()); - let bytes = response_to_bytes(resp).await?; - let bs = BodyStream::from(bytes.to_vec()); - Ok(bs) + Ok(response_to_bytes(resp).await?) } else { Err(Error::from_failed_resp(resp).await) } } - - /// Retrieve the status of a query. + /// Retrieves the status of a query as a byte stream. /// - /// ## Errors + /// This function calls `query_status_impl` and returns the response bytes as a `BodyStream`. + /// + /// # Errors + /// If the request has illegal arguments, or fails to deliver to helper + pub async fn query_status_bytes(&self, query_id: QueryId) -> Result { + let bytes = self.query_status_impl(query_id).await?; + Ok(BodyStream::from(bytes.to_vec())) + } + /// Retrieves the status of a query. + /// + /// This function calls `query_status_impl` and deserializes the response bytes into a `QueryStatus` struct. + /// + /// # Errors /// If the request has illegal arguments, or fails to deliver to helper pub async fn query_status( &self, query_id: QueryId, ) -> Result { - let req = http_serde::query::status::Request::new(query_id); - let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?; - - let resp = self.request(req).await?; - if resp.status().is_success() { - let bytes = response_to_bytes(resp).await?; - let http_serde::query::status::ResponseBody { status } = - serde_json::from_slice(&bytes)?; - Ok(status) - } else { - Err(Error::from_failed_resp(resp).await) - } + let bytes = self.query_status_impl(query_id).await?; + let http_serde::query::status::ResponseBody { status } = serde_json::from_slice(&bytes)?; + Ok(status) } } diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 1ee2c729b..70d59db76 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -61,7 +61,7 @@ pub fn s2s_router(transport: Arc>) -> Router { .merge(step::router(Arc::clone(&transport))) .merge(prepare::router(Arc::clone(&transport))) .merge(status::router(Arc::clone(&transport))) - .merge(results::router(Arc::clone(&transport))) + .merge(results::router(transport)) .layer(layer_fn(HelperAuthentication::<_, Shard>::new)) } diff --git a/ipa-core/src/query/state.rs b/ipa-core/src/query/state.rs index e47d05876..148a40565 100644 --- a/ipa-core/src/query/state.rs +++ b/ipa-core/src/query/state.rs @@ -6,13 +6,12 @@ use std::{ }; use ::tokio::sync::oneshot::{error::TryRecvError, Receiver}; -use futures::{ready, FutureExt, TryFutureExt, TryStreamExt}; +use futures::{ready, FutureExt}; use serde::{Deserialize, Serialize}; use crate::{ - error::BoxError, executor::IpaJoinHandle, - helpers::{query::QueryConfig, BytesStream, HelperResponse, RoleAssignment}, + helpers::{query::QueryConfig, RoleAssignment}, protocol::QueryId, query::runner::QueryResult, sync::Mutex, From c7927c6b589482470e474c5a3c6055ecfc2cb31e Mon Sep 17 00:00:00 2001 From: Christian Berkhoff Date: Fri, 10 Jan 2025 13:46:44 -0300 Subject: [PATCH 9/9] typo --- ipa-core/src/helpers/transport/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 9d587205c..7cfe9a39f 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -310,7 +310,7 @@ impl RouteParams for (RouteId, QueryId) { } #[derive(thiserror::Error, Debug)] -#[error("One or more peer shards rejected the breadcast: {failures:?}")] +#[error("One or more peer shards rejected the broadcast request: {failures:?}")] pub struct BroadcastError { pub failures: Vec<(I, E)>, } @@ -325,7 +325,9 @@ impl From> for BroadcastError #[async_trait] pub trait Transport: Clone + Send + Sync + 'static { type Identity: TransportIdentity; + /// They type used by [`receive`]. type RecordsStream: BytesStream; + /// The type used for responses to [`send`] and [`broadcast`]. type SendResponse: BytesStream; type Error: Debug + Send;