From 16c28d5edb35c41e99c19a2f4a92ea3d82340135 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 7 May 2026 22:58:27 +0200 Subject: [PATCH 1/4] Fetch docs in batches --- .../protos/quickwit/search.proto | 5 + .../src/codegen/quickwit/quickwit.search.rs | 92 +++++++++++++++++++ quickwit/quickwit-search/src/client.rs | 25 ++++- .../src/search_api/grpc_adapter.rs | 49 +++++++++- 4 files changed, 166 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 9c468507e22..fc5870f57ca 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -37,6 +37,11 @@ service SearchService { /// This methods takes `PartialHit`s and returns `Hit`s. rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse); + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. + rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); + // Root list terms API. // This RPC identifies the set of splits on which the query should run on, // and dispatches the several calls to `LeafListTerms`. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 991ca8d988a..5a8b04835b2 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -893,6 +893,35 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "FetchDocs")); self.inner.unary(req, path, codec).await } + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + pub async fn stream_fetch_docs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/StreamFetchDocs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.search.SearchService", "StreamFetchDocs"), + ); + self.inner.server_streaming(req, path, codec).await + } /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1174,6 +1203,22 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the StreamFetchDocs method. + type StreamFetchDocsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1458,6 +1503,53 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/StreamFetchDocs" => { + #[allow(non_camel_case_types)] + struct StreamFetchDocsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::ServerStreamingService + for StreamFetchDocsSvc { + type Response = super::FetchDocsResponse; + type ResponseStream = T::StreamFetchDocsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::stream_fetch_docs(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = StreamFetchDocsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.search.SearchService/RootListTerms" => { #[allow(non_camel_case_types)] struct RootListTermsSvc(pub Arc); diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 194bf0b2bd0..745a4620912 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use bytesize::ByteSize; +use futures::TryStreamExt; use http::Uri; use quickwit_proto::search::{GetKvRequest, PutKvRequest, ReportSplitsRequest}; use quickwit_proto::tonic::Request; @@ -152,11 +153,27 @@ impl SearchServiceClient { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { let tonic_request = Request::new(request); - let tonic_response = grpc_client - .fetch_docs(tonic_request) + + // // get all in one shot + // let tonic_response = grpc_client + // .fetch_docs(tonic_request) + // .await + // .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + // Ok(tonic_response.into_inner()) + + // stream in batches + let all_hits = grpc_client + .stream_fetch_docs(tonic_request) .await - .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; - Ok(tonic_response.into_inner()) + .map_err(|tonic_error| parse_grpc_error(&tonic_error))? + .into_inner() + .map_err(|tonic_error| parse_grpc_error(&tonic_error)) + .try_fold(Vec::new(), |mut acc, response| async move { + acc.extend(response.hits); + Ok(acc) + }) + .await?; + Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) } SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await, } diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index c5250ee2465..7943affddc6 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -15,15 +15,18 @@ use std::sync::Arc; use async_trait::async_trait; +use futures::stream; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ GetKvRequest, GetKvResponse, LeafListFieldsRequest, ListFieldsRequest, ListFieldsResponse, ReportSplitsRequest, ReportSplitsResponse, search_service_server as grpc, }; -use quickwit_proto::{set_parent_span_from_request_metadata, tonic}; +use quickwit_proto::{GrpcServiceError, set_parent_span_from_request_metadata, tonic}; use quickwit_search::SearchService; use tracing::instrument; +const FETCH_DOCS_BATCH_SIZE: usize = 100; + #[derive(Clone)] pub struct GrpcSearchAdapter(Arc); @@ -68,6 +71,50 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(fetch_docs_result) } + type StreamFetchDocsStream = + quickwit_proto::tonic::codegen::BoxStream; + + #[instrument(skip(self, request))] + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let fetch_docs_request = request.into_inner(); + + // Call the regular fetch_docs method + let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await; + + if let Err(err) = fetch_docs_result { + return Err(err.into_grpc_status()); + } + + let fetch_docs_response = match fetch_docs_result { + Ok(response) => response, + Err(err) => return Err(err.into_grpc_status()), + }; + + let batches = if fetch_docs_response.hits.is_empty() { + vec![Ok(quickwit_proto::search::FetchDocsResponse { + hits: vec![], + })] + } else { + fetch_docs_response + .hits + .chunks(FETCH_DOCS_BATCH_SIZE) + .map(|chunk| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: chunk.to_vec(), + }) + }) + .collect() + }; + + let grpc_stream = stream::iter(batches); + + Ok(tonic::Response::new(Box::pin(grpc_stream))) + } + #[instrument(skip(self, request))] async fn root_list_terms( &self, From d6b18680c796e057f4b86d515e6e202f2167e6df Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 8 May 2026 09:46:20 +0200 Subject: [PATCH 2/4] Less bloated GRPC implem --- .../protos/quickwit/search.proto | 6 ++-- .../src/search_api/grpc_adapter.rs | 29 +++++++------------ 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index fc5870f57ca..f6acff2877b 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -37,9 +37,9 @@ service SearchService { /// This methods takes `PartialHit`s and returns `Hit`s. rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse); - /// Streams document contents from the document store. - /// This method takes `PartialHit`s and streams back `LeafHit`s in batches - /// to avoid hitting gRPC message size limits. + // Streams document contents from the document store. + // This method takes `PartialHit`s and streams back `LeafHit`s in batches + // to avoid hitting gRPC message size limits. rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); // Root list terms API. diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 7943affddc6..b180bc480a1 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream; +use itertools::Itertools; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ GetKvRequest, GetKvResponse, LeafListFieldsRequest, ListFieldsRequest, ListFieldsResponse, @@ -85,30 +86,22 @@ impl grpc::SearchService for GrpcSearchAdapter { // Call the regular fetch_docs method let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await; - if let Err(err) = fetch_docs_result { - return Err(err.into_grpc_status()); - } - let fetch_docs_response = match fetch_docs_result { Ok(response) => response, Err(err) => return Err(err.into_grpc_status()), }; - let batches = if fetch_docs_response.hits.is_empty() { - vec![Ok(quickwit_proto::search::FetchDocsResponse { - hits: vec![], - })] - } else { - fetch_docs_response - .hits - .chunks(FETCH_DOCS_BATCH_SIZE) - .map(|chunk| { - Ok(quickwit_proto::search::FetchDocsResponse { - hits: chunk.to_vec(), - }) + let batches: Vec<_> = fetch_docs_response + .hits + .into_iter() + .chunks(FETCH_DOCS_BATCH_SIZE) + .into_iter() + .map(|chunk| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: chunk.collect(), }) - .collect() - }; + }) + .collect(); let grpc_stream = stream::iter(batches); From edf5265a29862491660d7e60100793238112bcdb Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 8 May 2026 13:54:00 +0200 Subject: [PATCH 3/4] Disable call to streaming fetch docs --- .../src/codegen/quickwit/quickwit.search.rs | 12 +++--- quickwit/quickwit-search/src/client.rs | 39 +++++++++---------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 5a8b04835b2..b2dfd344806 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -893,9 +893,9 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "FetchDocs")); self.inner.unary(req, path, codec).await } - /// / Streams document contents from the document store. - /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches - /// / to avoid hitting gRPC message size limits. + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. pub async fn stream_fetch_docs( &mut self, request: impl tonic::IntoRequest, @@ -1209,9 +1209,9 @@ pub mod search_service_server { > + std::marker::Send + 'static; - /// / Streams document contents from the document store. - /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches - /// / to avoid hitting gRPC message size limits. + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. async fn stream_fetch_docs( &self, request: tonic::Request, diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 745a4620912..614b265d51b 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::Duration; use bytesize::ByteSize; -use futures::TryStreamExt; use http::Uri; use quickwit_proto::search::{GetKvRequest, PutKvRequest, ReportSplitsRequest}; use quickwit_proto::tonic::Request; @@ -154,26 +153,26 @@ impl SearchServiceClient { SearchServiceClientImpl::Grpc(grpc_client) => { let tonic_request = Request::new(request); - // // get all in one shot - // let tonic_response = grpc_client - // .fetch_docs(tonic_request) - // .await - // .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; - // Ok(tonic_response.into_inner()) - - // stream in batches - let all_hits = grpc_client - .stream_fetch_docs(tonic_request) + // get all in one shot + let tonic_response = grpc_client + .fetch_docs(tonic_request) .await - .map_err(|tonic_error| parse_grpc_error(&tonic_error))? - .into_inner() - .map_err(|tonic_error| parse_grpc_error(&tonic_error)) - .try_fold(Vec::new(), |mut acc, response| async move { - acc.extend(response.hits); - Ok(acc) - }) - .await?; - Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) + .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + Ok(tonic_response.into_inner()) + + // stream in batches (activate once this was deployed once) + // let all_hits = grpc_client + // .stream_fetch_docs(tonic_request) + // .await + // .map_err(|tonic_error| parse_grpc_error(&tonic_error))? + // .into_inner() + // .map_err(|tonic_error| parse_grpc_error(&tonic_error)) + // .try_fold(Vec::new(), |mut acc, response| async move { + // acc.extend(response.hits); + // Ok(acc) + // }) + // .await?; + // Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits }) } SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await, } From 81496a2ee5a2d5a20c3c923e0ffea7d74d6e8030 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Sun, 10 May 2026 16:28:24 +0200 Subject: [PATCH 4/4] Avoid yet another allocation --- quickwit/quickwit-search/src/client.rs | 5 ++-- .../src/search_api/grpc_adapter.rs | 30 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 614b265d51b..f5026020739 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -152,6 +152,7 @@ impl SearchServiceClient { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { let tonic_request = Request::new(request); + // let nb_docs_fetched = request.partial_hits.len(); // get all in one shot let tonic_response = grpc_client @@ -167,8 +168,8 @@ impl SearchServiceClient { // .map_err(|tonic_error| parse_grpc_error(&tonic_error))? // .into_inner() // .map_err(|tonic_error| parse_grpc_error(&tonic_error)) - // .try_fold(Vec::new(), |mut acc, response| async move { - // acc.extend(response.hits); + // .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move + // { acc.extend(response.hits); // Ok(acc) // }) // .await?; diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index b180bc480a1..463c5ca33b5 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -15,8 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; -use futures::stream; -use itertools::Itertools; +use futures::stream::{self, StreamExt}; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ GetKvRequest, GetKvResponse, LeafListFieldsRequest, ListFieldsRequest, ListFieldsResponse, @@ -26,7 +25,7 @@ use quickwit_proto::{GrpcServiceError, set_parent_span_from_request_metadata, to use quickwit_search::SearchService; use tracing::instrument; -const FETCH_DOCS_BATCH_SIZE: usize = 100; +const FETCH_DOCS_BATCH_SIZE: usize = 500; #[derive(Clone)] pub struct GrpcSearchAdapter(Arc); @@ -91,21 +90,20 @@ impl grpc::SearchService for GrpcSearchAdapter { Err(err) => return Err(err.into_grpc_status()), }; - let batches: Vec<_> = fetch_docs_response - .hits - .into_iter() - .chunks(FETCH_DOCS_BATCH_SIZE) - .into_iter() - .map(|chunk| { - Ok(quickwit_proto::search::FetchDocsResponse { - hits: chunk.collect(), - }) - }) - .collect(); + // If there is only one batch, return it directly to avoid copying to a new vec. + if fetch_docs_response.hits.len() <= FETCH_DOCS_BATCH_SIZE { + let batch = quickwit_proto::search::FetchDocsResponse { + hits: fetch_docs_response.hits, + }; + let batch_stream = stream::iter([Ok(batch)]); + return Ok(tonic::Response::new(Box::pin(batch_stream))); + } - let grpc_stream = stream::iter(batches); + let batch_stream = stream::iter(fetch_docs_response.hits) + .chunks(FETCH_DOCS_BATCH_SIZE) + .map(|batch| Ok(quickwit_proto::search::FetchDocsResponse { hits: batch })); - Ok(tonic::Response::new(Box::pin(grpc_stream))) + Ok(tonic::Response::new(Box::pin(batch_stream))) } #[instrument(skip(self, request))]