From 823a82ac7100af30a14fbcd22a23da317784c6f6 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 7 May 2026 17:03:57 +0200 Subject: [PATCH 1/2] feat(search): Add streaming fetch_docs RPC to avoid gRPC size limits Implements StreamFetchDocs RPC that streams document contents in batches to avoid hitting gRPC message size limits. The batching size is controlled by SCROLL_BATCH_LEN (1000 documents per batch). Changes: - Add StreamFetchDocs RPC to search.proto - Implement fetch_docs_stream() function in fetch_docs.rs - Add stream_fetch_docs() to SearchService trait - Implement stream_fetch_docs() in SearchServiceImpl - Add StreamFetchDocs handler in gRPC adapter --- .../protos/quickwit/search.proto | 5 + .../src/codegen/quickwit/quickwit.search.rs | 92 +++++++++++++++++++ quickwit/quickwit-search/src/fetch_docs.rs | 75 ++++++++++++++- quickwit/quickwit-search/src/lib.rs | 2 +- quickwit/quickwit-search/src/service.rs | 60 +++++++++++- .../src/search_api/grpc_adapter.rs | 21 +++++ 6 files changed, 252 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 9c468507e22..fc5870f57ca 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -37,6 +37,11 @@ service SearchService { /// This methods takes `PartialHit`s and returns `Hit`s. rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse); + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. + rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); + // Root list terms API. // This RPC identifies the set of splits on which the query should run on, // and dispatches the several calls to `LeafListTerms`. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 991ca8d988a..5a8b04835b2 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -893,6 +893,35 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "FetchDocs")); self.inner.unary(req, path, codec).await } + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + pub async fn stream_fetch_docs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/StreamFetchDocs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.search.SearchService", "StreamFetchDocs"), + ); + self.inner.server_streaming(req, path, codec).await + } /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1174,6 +1203,22 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the StreamFetchDocs method. + type StreamFetchDocsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Root list terms API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the several calls to `LeafListTerms`. @@ -1458,6 +1503,53 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/StreamFetchDocs" => { + #[allow(non_camel_case_types)] + struct StreamFetchDocsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::ServerStreamingService + for StreamFetchDocsSvc { + type Response = super::FetchDocsResponse; + type ResponseStream = T::StreamFetchDocsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::stream_fetch_docs(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = StreamFetchDocsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/quickwit.search.SearchService/RootListTerms" => { #[allow(non_camel_case_types)] struct RootListTermsSvc(pub Arc); diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 47552473fcc..71b0f1612c0 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -13,11 +13,13 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; +use std::pin::Pin; use std::sync::Arc; use anyhow::{Context, Ok}; -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use itertools::Itertools; +use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_doc_mapper::DocMapper; use quickwit_proto::search::{ FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets, @@ -147,6 +149,77 @@ pub async fn fetch_docs( Ok(FetchDocsResponse { hits }) } +/// Creates a stream of `FetchDocsResponse` that yields documents in batches +/// to avoid hitting gRPC message size limits. +/// +/// This is the streaming version of `fetch_docs` that processes partial hits +/// and yields them in configurable batch sizes (default: SCROLL_BATCH_LEN). +pub fn fetch_docs_stream( + searcher_context: Arc, + partial_hits: Vec, + index_storage: Arc, + splits: Vec, + doc_mapper: Arc, + snippet_request_opt: Option, +) -> Pin> + Send>> { + Box::pin( + futures::stream::once(async move { + let global_doc_addrs: Vec = partial_hits + .iter() + .map(GlobalDocAddress::from_partial_hit) + .collect(); + + let mut global_doc_addr_to_doc_json = fetch_docs_to_map( + searcher_context, + global_doc_addrs, + index_storage, + &splits, + doc_mapper, + snippet_request_opt.as_ref(), + ) + .await?; + + // Yield hits in batches to avoid gRPC size limits + let batches: Vec> = partial_hits + .chunks(SCROLL_BATCH_LEN) + .map(|partial_hits_batch| { + let hits: Vec = partial_hits_batch + .iter() + .flat_map(|partial_hit| { + let global_doc_addr = GlobalDocAddress::from_partial_hit(partial_hit); + if let Some((_, document)) = + global_doc_addr_to_doc_json.remove_entry(&global_doc_addr) + { + Some(quickwit_proto::search::LeafHit { + leaf_json: document.content_json, + partial_hit: Some(partial_hit.clone()), + leaf_snippet_json: document.snippet_json, + }) + } else { + None + } + }) + .collect(); + + if !hits.is_empty() { + Ok(FetchDocsResponse { hits }) + } else { + // Skip empty batches + Err(anyhow::anyhow!("empty batch")) + } + }) + .filter(|r| r.is_ok()) + .collect(); + + anyhow::Ok(batches) + }) + .flat_map(|result| match result { + std::result::Result::Ok(batches) => futures::stream::iter(batches), + Err(e) => futures::stream::iter(vec![Err(e)]), + }), + ) +} + // number of concurrent fetch allowed for a single split. const NUM_CONCURRENT_REQUESTS: usize = 30; diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 74266f42bf2..0514fd2b7dd 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -82,7 +82,7 @@ pub use crate::client::{ }; pub use crate::cluster_client::ClusterClient; pub use crate::error::{SearchError, parse_grpc_error}; -use crate::fetch_docs::fetch_docs; +use crate::fetch_docs::{fetch_docs, fetch_docs_stream}; pub use crate::root::{ IndexMetasForLeafSearch, SearchJob, check_all_index_metadata_found, jobs_to_leaf_request, root_search, search_plan, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 71efc16959f..18ca7ceef86 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; +use futures::future::Either; +use futures::{Stream, StreamExt}; use quickwit_common::uri::Uri; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; @@ -43,7 +46,7 @@ use crate::metrics_trackers::LeafSearchMetricsFuture; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_permit_provider::SearchPermitProvider; -use crate::{ClusterClient, SearchError, fetch_docs, root_search, search_plan}; +use crate::{ClusterClient, SearchError, fetch_docs, fetch_docs_stream, root_search, search_plan}; #[derive(Clone)] /// The search service implementation. @@ -84,6 +87,14 @@ pub trait SearchService: 'static + Send + Sync { /// This methods takes `PartialHit`s and returns `Hit`s. async fn fetch_docs(&self, request: FetchDocsRequest) -> crate::Result; + /// Streams document contents from the document store in batches. + /// This method takes `PartialHit`s and streams back `FetchDocsResponse`s + /// to avoid hitting gRPC message size limits. + fn stream_fetch_docs( + &self, + request: FetchDocsRequest, + ) -> Pin> + Send>>; + /// Root search API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the multiple calls to `LeafSearch`. @@ -240,6 +251,53 @@ impl SearchService for SearchServiceImpl { Ok(fetch_docs_response) } + fn stream_fetch_docs( + &self, + fetch_docs_request: FetchDocsRequest, + ) -> Pin> + Send>> { + let index_uri_result = Uri::from_str(&fetch_docs_request.index_uri); + let storage_resolver = self.storage_resolver.clone(); + let searcher_context = self.searcher_context.clone(); + let doc_mapper_str = fetch_docs_request.doc_mapper.clone(); + let partial_hits = fetch_docs_request.partial_hits; + let split_offsets = fetch_docs_request.split_offsets; + let snippet_request_opt = fetch_docs_request.snippet_request; + + Box::pin( + futures::stream::once(async move { + let index_uri = match index_uri_result { + Ok(uri) => uri, + Err(e) => return Err(SearchError::from(e)), + }; + + let storage = match storage_resolver.resolve(&index_uri).await { + Ok(s) => s, + Err(e) => return Err(SearchError::Internal(e.to_string())), + }; + + let doc_mapper = match deserialize_doc_mapper(&doc_mapper_str) { + Ok(dm) => dm, + Err(e) => return Err(e), + }; + + Ok(fetch_docs_stream( + searcher_context, + partial_hits, + storage, + split_offsets, + doc_mapper, + snippet_request_opt, + )) + }) + .flat_map(|result| match result { + std::result::Result::Ok(stream) => Either::Left( + stream.map(|r| r.map_err(|e| SearchError::Internal(e.to_string()))), + ), + Err(e) => Either::Right(futures::stream::once(async move { Err(e) })), + }), + ) + } + async fn root_list_terms( &self, list_terms_request: ListTermsRequest, diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index c5250ee2465..7da8069da42 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -68,6 +68,27 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(fetch_docs_result) } + type StreamFetchDocsStream = + quickwit_proto::tonic::codegen::BoxStream; + + #[instrument(skip(self, request))] + async fn stream_fetch_docs( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + use futures::TryStreamExt; + + set_parent_span_from_request_metadata(request.metadata()); + let fetch_docs_request = request.into_inner(); + let stream = self.0.stream_fetch_docs(fetch_docs_request); + + // Convert SearchError to tonic::Status + let grpc_stream = stream + .map_err(|err| tonic::Status::internal(format!("Stream fetch docs error: {}", err))); + + Ok(tonic::Response::new(Box::pin(grpc_stream))) + } + #[instrument(skip(self, request))] async fn root_list_terms( &self, From 9a154a6ee5670686fc0ce3e2a479a4fc1b08c6fa Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 7 May 2026 17:17:58 +0200 Subject: [PATCH 2/2] Remove root RPCs from GRPC --- .../protos/quickwit/search.proto | 20 -- .../src/codegen/quickwit/quickwit.search.rs | 318 ------------------ quickwit/quickwit-search/src/client.rs | 15 - .../src/search_api/grpc_adapter.rs | 43 +-- 4 files changed, 1 insertion(+), 395 deletions(-) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index fc5870f57ca..e4decea2ffa 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -17,13 +17,6 @@ syntax = "proto3"; package quickwit.search; service SearchService { - // Root search API. - // This RPC identifies the set of splits on which the query should run on, - // and dispatch the several calls to `LeafSearch`. - // - // It is also in charge of merging back the results. - rpc RootSearch(SearchRequest) returns (SearchResponse); - // Perform a leaf search on a given set of splits. // // It is like a regular search except that: @@ -42,13 +35,6 @@ service SearchService { /// to avoid hitting gRPC message size limits. rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); - // Root list terms API. - // This RPC identifies the set of splits on which the query should run on, - // and dispatches the several calls to `LeafListTerms`. - // - // It is also in charge of merging back the results. - rpc RootListTerms(ListTermsRequest) returns (ListTermsResponse); - // Performs a leaf list terms on a given set of splits. // // It is like a regular list term except that: @@ -57,9 +43,6 @@ service SearchService { // - it should be applied on the given subset of splits rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse); - // Performs a scroll request. - rpc Scroll(ScrollRequest) returns (SearchResponse); - // gRPC request used to store a key in the local storage of the targeted node. // This RPC is used in the mini distributed immutable KV store embedded in quickwit. rpc PutKV(PutKVRequest) returns (PutKVResponse); @@ -73,9 +56,6 @@ service SearchService { rpc ListFields(ListFieldsRequest) returns (ListFieldsResponse); rpc LeafListFields(LeafListFieldsRequest) returns (ListFieldsResponse); - - // Describe how a search would be processed. - rpc SearchPlan(SearchRequest) returns (SearchPlanResponse); } /// Scroll Request diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 5a8b04835b2..71724a893f5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -809,32 +809,6 @@ pub mod search_service_client { self.inner = self.inner.max_encoding_message_size(limit); self } - /// Root search API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatch the several calls to `LeafSearch`. - /// - /// It is also in charge of merging back the results. - pub async fn root_search( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/RootSearch", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "RootSearch")); - self.inner.unary(req, path, codec).await - } /// Perform a leaf search on a given set of splits. /// /// It is like a regular search except that: @@ -922,37 +896,6 @@ pub mod search_service_client { ); self.inner.server_streaming(req, path, codec).await } - /// Root list terms API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatches the several calls to `LeafListTerms`. - /// - /// It is also in charge of merging back the results. - pub async fn root_list_terms( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/RootListTerms", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("quickwit.search.SearchService", "RootListTerms"), - ); - self.inner.unary(req, path, codec).await - } /// Performs a leaf list terms on a given set of splits. /// /// It is like a regular list term except that: @@ -986,28 +929,6 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } - /// Performs a scroll request. - pub async fn scroll( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/Scroll", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "Scroll")); - self.inner.unary(req, path, codec).await - } /// gRPC request used to store a key in the local storage of the targeted node. /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. pub async fn put_kv( @@ -1130,31 +1051,6 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } - /// Describe how a search would be processed. - pub async fn search_plan( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/SearchPlan", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "SearchPlan")); - self.inner.unary(req, path, codec).await - } } } /// Generated server implementations. @@ -1170,15 +1066,6 @@ pub mod search_service_server { /// Generated trait containing gRPC methods that should be implemented for use with SearchServiceServer. #[async_trait] pub trait SearchService: std::marker::Send + std::marker::Sync + 'static { - /// Root search API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatch the several calls to `LeafSearch`. - /// - /// It is also in charge of merging back the results. - async fn root_search( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; /// Perform a leaf search on a given set of splits. /// /// It is like a regular search except that: @@ -1219,18 +1106,6 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Root list terms API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatches the several calls to `LeafListTerms`. - /// - /// It is also in charge of merging back the results. - async fn root_list_terms( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; /// Performs a leaf list terms on a given set of splits. /// /// It is like a regular list term except that: @@ -1245,11 +1120,6 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Performs a scroll request. - async fn scroll( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; /// gRPC request used to store a key in the local storage of the targeted node. /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. async fn put_kv( @@ -1283,14 +1153,6 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Describe how a search would be processed. - async fn search_plan( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1368,51 +1230,6 @@ pub mod search_service_server { } fn call(&mut self, req: http::Request) -> Self::Future { match req.uri().path() { - "/quickwit.search.SearchService/RootSearch" => { - #[allow(non_camel_case_types)] - struct RootSearchSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for RootSearchSvc { - type Response = super::SearchResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::root_search(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = RootSearchSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.search.SearchService/LeafSearch" => { #[allow(non_camel_case_types)] struct LeafSearchSvc(pub Arc); @@ -1550,51 +1367,6 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/RootListTerms" => { - #[allow(non_camel_case_types)] - struct RootListTermsSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for RootListTermsSvc { - type Response = super::ListTermsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::root_list_terms(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = RootListTermsSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.search.SearchService/LeafListTerms" => { #[allow(non_camel_case_types)] struct LeafListTermsSvc(pub Arc); @@ -1640,51 +1412,6 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/Scroll" => { - #[allow(non_camel_case_types)] - struct ScrollSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for ScrollSvc { - type Response = super::SearchResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::scroll(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = ScrollSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.search.SearchService/PutKV" => { #[allow(non_camel_case_types)] struct PutKVSvc(pub Arc); @@ -1909,51 +1636,6 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/SearchPlan" => { - #[allow(non_camel_case_types)] - struct SearchPlanSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for SearchPlanSvc { - type Response = super::SearchPlanResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::search_plan(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = SearchPlanSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 194bf0b2bd0..65b5c7d78a3 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -96,21 +96,6 @@ impl SearchServiceClient { matches!(self.client_impl, SearchServiceClientImpl::Local(_)) } - /// Perform root search. - pub async fn root_search( - &mut self, - request: quickwit_proto::search::SearchRequest, - ) -> crate::Result { - match &mut self.client_impl { - SearchServiceClientImpl::Grpc(grpc_client) => grpc_client - .root_search(request) - .await - .map(|tonic_response| tonic_response.into_inner()) - .map_err(|tonic_error| parse_grpc_error(&tonic_error)), - SearchServiceClientImpl::Local(service) => service.root_search(request).await, - } - } - /// Perform leaf search. pub async fn leaf_search( &mut self, diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 7da8069da42..4b294292469 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -35,17 +35,6 @@ impl From> for GrpcSearchAdapter { #[async_trait] impl grpc::SearchService for GrpcSearchAdapter { - #[instrument(skip(self, request))] - async fn root_search( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.root_search(search_request).await; - convert_to_grpc_result(search_result) - } - #[instrument(skip(self, request))] async fn leaf_search( &self, @@ -89,17 +78,6 @@ impl grpc::SearchService for GrpcSearchAdapter { Ok(tonic::Response::new(Box::pin(grpc_stream))) } - #[instrument(skip(self, request))] - async fn root_list_terms( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.root_list_terms(search_request).await; - convert_to_grpc_result(search_result) - } - #[instrument(skip(self, request))] async fn leaf_list_terms( &self, @@ -111,15 +89,6 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(leaf_search_result) } - async fn scroll( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let scroll_request = request.into_inner(); - let scroll_result = self.0.scroll(scroll_request).await; - convert_to_grpc_result(scroll_result) - } - #[instrument(skip(self, request))] async fn put_kv( &self, @@ -165,6 +134,7 @@ impl grpc::SearchService for GrpcSearchAdapter { let resp = self.0.root_list_fields(request.into_inner()).await; convert_to_grpc_result(resp) } + #[instrument(skip(self, request))] async fn leaf_list_fields( &self, @@ -174,15 +144,4 @@ impl grpc::SearchService for GrpcSearchAdapter { let resp = self.0.leaf_list_fields(request.into_inner()).await; convert_to_grpc_result(resp) } - - #[instrument(skip(self, request))] - async fn search_plan( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.search_plan(search_request).await; - convert_to_grpc_result(search_result) - } }