diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 9c468507e22..f6acff2877b 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -37,6 +37,11 @@ service SearchService { /// This methods takes `PartialHit`s and returns `Hit`s. rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse); + // Streams document contents from the document store. + // This method takes `PartialHit`s and streams back `LeafHit`s in batches + // to avoid hitting gRPC message size limits. + rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); + // Root list terms API. // This RPC identifies the set of splits on which the query should run on, // and dispatches the several calls to `LeafListTerms`. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 991ca8d988a..b2dfd344806 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -893,6 +893,35 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "FetchDocs")); self.inner.unary(req, path, codec).await } + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. + pub async fn stream_fetch_docs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/StreamFetchDocs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.search.SearchService", "StreamFetchDocs"), + ); + self.inner.server_streaming(req, path, codec).await + } /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1174,6 +1203,22 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the StreamFetchDocs method. + type StreamFetchDocsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1458,6 +1503,53 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/StreamFetchDocs" => { + #[allow(non_camel_case_types)] + struct StreamFetchDocsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::ServerStreamingService + for StreamFetchDocsSvc { + type Response = super::FetchDocsResponse; + type ResponseStream = T::StreamFetchDocsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::stream_fetch_docs(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = StreamFetchDocsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.search.SearchService/RootListTerms" => { #[allow(non_camel_case_types)] struct RootListTermsSvc(pub Arc); diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 194bf0b2bd0..f5026020739 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -152,11 +152,28 @@ impl SearchServiceClient { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { let tonic_request = Request::new(request); + // let nb_docs_fetched = request.partial_hits.len(); + + // get all in one shot let tonic_response = grpc_client .fetch_docs(tonic_request) .await .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; Ok(tonic_response.into_inner()) + + // stream in batches (activate once this was deployed once) + // let all_hits = grpc_client + // .stream_fetch_docs(tonic_request) + // .await + // .map_err(|tonic_error| parse_grpc_error(&tonic_error))? + // .into_inner() + // .map_err(|tonic_error| parse_grpc_error(&tonic_error)) + // .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move + // { acc.extend(response.hits); + // Ok(acc) + // }) + // .await?; + // Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) } SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await, } diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index c5250ee2465..463c5ca33b5 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -15,15 +15,18 @@ use std::sync::Arc; use async_trait::async_trait; +use futures::stream::{self, StreamExt}; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ GetKvRequest, GetKvResponse, LeafListFieldsRequest, ListFieldsRequest, ListFieldsResponse, ReportSplitsRequest, ReportSplitsResponse, search_service_server as grpc, }; -use quickwit_proto::{set_parent_span_from_request_metadata, tonic}; +use quickwit_proto::{GrpcServiceError, set_parent_span_from_request_metadata, tonic}; use quickwit_search::SearchService; use tracing::instrument; +const FETCH_DOCS_BATCH_SIZE: usize = 500; + #[derive(Clone)] pub struct GrpcSearchAdapter(Arc); @@ -68,6 +71,41 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(fetch_docs_result) } + type StreamFetchDocsStream = + quickwit_proto::tonic::codegen::BoxStream; + + #[instrument(skip(self, request))] + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let fetch_docs_request = request.into_inner(); + + // Call the regular fetch_docs method + let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await; + + let fetch_docs_response = match fetch_docs_result { + Ok(response) => response, + Err(err) => return Err(err.into_grpc_status()), + }; + + // If there is only one batch, return it directly to avoid copying to a new vec. + if fetch_docs_response.hits.len() <= FETCH_DOCS_BATCH_SIZE { + let batch = quickwit_proto::search::FetchDocsResponse { + hits: fetch_docs_response.hits, + }; + let batch_stream = stream::iter([Ok(batch)]); + return Ok(tonic::Response::new(Box::pin(batch_stream))); + } + + let batch_stream = stream::iter(fetch_docs_response.hits) + .chunks(FETCH_DOCS_BATCH_SIZE) + .map(|batch| Ok(quickwit_proto::search::FetchDocsResponse { hits: batch })); + + Ok(tonic::Response::new(Box::pin(batch_stream))) + } + #[instrument(skip(self, request))] async fn root_list_terms( &self,