diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 9c468507e22..e4decea2ffa 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -17,13 +17,6 @@ syntax = "proto3"; package quickwit.search; service SearchService { - // Root search API. - // This RPC identifies the set of splits on which the query should run on, - // and dispatch the several calls to `LeafSearch`. - // - // It is also in charge of merging back the results. - rpc RootSearch(SearchRequest) returns (SearchResponse); - // Perform a leaf search on a given set of splits. // // It is like a regular search except that: @@ -37,12 +30,10 @@ service SearchService { /// This methods takes `PartialHit`s and returns `Hit`s. rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse); - // Root list terms API. - // This RPC identifies the set of splits on which the query should run on, - // and dispatches the several calls to `LeafListTerms`. - // - // It is also in charge of merging back the results. - rpc RootListTerms(ListTermsRequest) returns (ListTermsResponse); + /// Streams document contents from the document store. + /// This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// to avoid hitting gRPC message size limits. + rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse); // Performs a leaf list terms on a given set of splits. // @@ -52,9 +43,6 @@ service SearchService { // - it should be applied on the given subset of splits rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse); - // Performs a scroll request. - rpc Scroll(ScrollRequest) returns (SearchResponse); - // gRPC request used to store a key in the local storage of the targeted node. // This RPC is used in the mini distributed immutable KV store embedded in quickwit. rpc PutKV(PutKVRequest) returns (PutKVResponse); @@ -68,9 +56,6 @@ service SearchService { rpc ListFields(ListFieldsRequest) returns (ListFieldsResponse); rpc LeafListFields(LeafListFieldsRequest) returns (ListFieldsResponse); - - // Describe how a search would be processed. - rpc SearchPlan(SearchRequest) returns (SearchPlanResponse); } /// Scroll Request diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 991ca8d988a..71724a893f5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -809,32 +809,6 @@ pub mod search_service_client { self.inner = self.inner.max_encoding_message_size(limit); self } - /// Root search API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatch the several calls to `LeafSearch`. - /// - /// It is also in charge of merging back the results. - pub async fn root_search( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/RootSearch", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "RootSearch")); - self.inner.unary(req, path, codec).await - } /// Perform a leaf search on a given set of splits. /// /// It is like a regular search except that: @@ -893,16 +867,14 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "FetchDocs")); self.inner.unary(req, path, codec).await } - /// Root list terms API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatches the several calls to `LeafListTerms`. - /// - /// It is also in charge of merging back the results. - pub async fn root_list_terms( + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + pub async fn stream_fetch_docs( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response>, tonic::Status, > { self.inner @@ -915,14 +887,14 @@ pub mod search_service_client { })?; let codec = tonic_prost::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/RootListTerms", + "/quickwit.search.SearchService/StreamFetchDocs", ); let mut req = request.into_request(); req.extensions_mut() .insert( - GrpcMethod::new("quickwit.search.SearchService", "RootListTerms"), + GrpcMethod::new("quickwit.search.SearchService", "StreamFetchDocs"), ); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } /// Performs a leaf list terms on a given set of splits. /// @@ -957,28 +929,6 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } - /// Performs a scroll request. - pub async fn scroll( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/Scroll", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "Scroll")); - self.inner.unary(req, path, codec).await - } /// gRPC request used to store a key in the local storage of the targeted node. /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. pub async fn put_kv( @@ -1101,31 +1051,6 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } - /// Describe how a search would be processed. - pub async fn search_plan( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.search.SearchService/SearchPlan", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("quickwit.search.SearchService", "SearchPlan")); - self.inner.unary(req, path, codec).await - } } } /// Generated server implementations. @@ -1141,15 +1066,6 @@ pub mod search_service_server { /// Generated trait containing gRPC methods that should be implemented for use with SearchServiceServer. #[async_trait] pub trait SearchService: std::marker::Send + std::marker::Sync + 'static { - /// Root search API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatch the several calls to `LeafSearch`. - /// - /// It is also in charge of merging back the results. - async fn root_search( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; /// Perform a leaf search on a given set of splits. /// /// It is like a regular search except that: @@ -1174,16 +1090,20 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Root list terms API. - /// This RPC identifies the set of splits on which the query should run on, - /// and dispatches the several calls to `LeafListTerms`. - /// - /// It is also in charge of merging back the results. - async fn root_list_terms( + /// Server streaming response type for the StreamFetchDocs method. + type StreamFetchDocsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// / Streams document contents from the document store. + /// / This method takes `PartialHit`s and streams back `LeafHit`s in batches + /// / to avoid hitting gRPC message size limits. + async fn stream_fetch_docs( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// Performs a leaf list terms on a given set of splits. @@ -1200,11 +1120,6 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Performs a scroll request. - async fn scroll( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; /// gRPC request used to store a key in the local storage of the targeted node. /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. async fn put_kv( @@ -1238,14 +1153,6 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Describe how a search would be processed. - async fn search_plan( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1323,51 +1230,6 @@ pub mod search_service_server { } fn call(&mut self, req: http::Request) -> Self::Future { match req.uri().path() { - "/quickwit.search.SearchService/RootSearch" => { - #[allow(non_camel_case_types)] - struct RootSearchSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for RootSearchSvc { - type Response = super::SearchResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::root_search(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = RootSearchSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.search.SearchService/LeafSearch" => { #[allow(non_camel_case_types)] struct LeafSearchSvc(pub Arc); @@ -1458,25 +1320,27 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/RootListTerms" => { + "/quickwit.search.SearchService/StreamFetchDocs" => { #[allow(non_camel_case_types)] - struct RootListTermsSvc(pub Arc); + struct StreamFetchDocsSvc(pub Arc); impl< T: SearchService, - > tonic::server::UnaryService - for RootListTermsSvc { - type Response = super::ListTermsResponse; + > tonic::server::ServerStreamingService + for StreamFetchDocsSvc { + type Response = super::FetchDocsResponse; + type ResponseStream = T::StreamFetchDocsStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::root_list_terms(&inner, request).await + ::stream_fetch_docs(&inner, request) + .await }; Box::pin(fut) } @@ -1487,7 +1351,7 @@ pub mod search_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = RootListTermsSvc(inner); + let method = StreamFetchDocsSvc(inner); let codec = tonic_prost::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -1498,7 +1362,7 @@ pub mod search_service_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) @@ -1548,51 +1412,6 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/Scroll" => { - #[allow(non_camel_case_types)] - struct ScrollSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for ScrollSvc { - type Response = super::SearchResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::scroll(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = ScrollSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.search.SearchService/PutKV" => { #[allow(non_camel_case_types)] struct PutKVSvc(pub Arc); @@ -1817,51 +1636,6 @@ pub mod search_service_server { }; Box::pin(fut) } - "/quickwit.search.SearchService/SearchPlan" => { - #[allow(non_camel_case_types)] - struct SearchPlanSvc(pub Arc); - impl< - T: SearchService, - > tonic::server::UnaryService - for SearchPlanSvc { - type Response = super::SearchPlanResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::search_plan(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = SearchPlanSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 194bf0b2bd0..65b5c7d78a3 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -96,21 +96,6 @@ impl SearchServiceClient { matches!(self.client_impl, SearchServiceClientImpl::Local(_)) } - /// Perform root search. - pub async fn root_search( - &mut self, - request: quickwit_proto::search::SearchRequest, - ) -> crate::Result { - match &mut self.client_impl { - SearchServiceClientImpl::Grpc(grpc_client) => grpc_client - .root_search(request) - .await - .map(|tonic_response| tonic_response.into_inner()) - .map_err(|tonic_error| parse_grpc_error(&tonic_error)), - SearchServiceClientImpl::Local(service) => service.root_search(request).await, - } - } - /// Perform leaf search. pub async fn leaf_search( &mut self, diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 47552473fcc..71b0f1612c0 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -13,11 +13,13 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; +use std::pin::Pin; use std::sync::Arc; use anyhow::{Context, Ok}; -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use itertools::Itertools; +use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_doc_mapper::DocMapper; use quickwit_proto::search::{ FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets, @@ -147,6 +149,77 @@ pub async fn fetch_docs( Ok(FetchDocsResponse { hits }) } +/// Creates a stream of `FetchDocsResponse` that yields documents in batches +/// to avoid hitting gRPC message size limits. +/// +/// This is the streaming version of `fetch_docs` that processes partial hits +/// and yields them in configurable batch sizes (default: SCROLL_BATCH_LEN). +pub fn fetch_docs_stream( + searcher_context: Arc, + partial_hits: Vec, + index_storage: Arc, + splits: Vec, + doc_mapper: Arc, + snippet_request_opt: Option, +) -> Pin> + Send>> { + Box::pin( + futures::stream::once(async move { + let global_doc_addrs: Vec = partial_hits + .iter() + .map(GlobalDocAddress::from_partial_hit) + .collect(); + + let mut global_doc_addr_to_doc_json = fetch_docs_to_map( + searcher_context, + global_doc_addrs, + index_storage, + &splits, + doc_mapper, + snippet_request_opt.as_ref(), + ) + .await?; + + // Yield hits in batches to avoid gRPC size limits + let batches: Vec> = partial_hits + .chunks(SCROLL_BATCH_LEN) + .map(|partial_hits_batch| { + let hits: Vec = partial_hits_batch + .iter() + .flat_map(|partial_hit| { + let global_doc_addr = GlobalDocAddress::from_partial_hit(partial_hit); + if let Some((_, document)) = + global_doc_addr_to_doc_json.remove_entry(&global_doc_addr) + { + Some(quickwit_proto::search::LeafHit { + leaf_json: document.content_json, + partial_hit: Some(partial_hit.clone()), + leaf_snippet_json: document.snippet_json, + }) + } else { + None + } + }) + .collect(); + + if !hits.is_empty() { + Ok(FetchDocsResponse { hits }) + } else { + // Skip empty batches + Err(anyhow::anyhow!("empty batch")) + } + }) + .filter(|r| r.is_ok()) + .collect(); + + anyhow::Ok(batches) + }) + .flat_map(|result| match result { + std::result::Result::Ok(batches) => futures::stream::iter(batches), + Err(e) => futures::stream::iter(vec![Err(e)]), + }), + ) +} + // number of concurrent fetch allowed for a single split. const NUM_CONCURRENT_REQUESTS: usize = 30; diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 74266f42bf2..0514fd2b7dd 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -82,7 +82,7 @@ pub use crate::client::{ }; pub use crate::cluster_client::ClusterClient; pub use crate::error::{SearchError, parse_grpc_error}; -use crate::fetch_docs::fetch_docs; +use crate::fetch_docs::{fetch_docs, fetch_docs_stream}; pub use crate::root::{ IndexMetasForLeafSearch, SearchJob, check_all_index_metadata_found, jobs_to_leaf_request, root_search, search_plan, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 71efc16959f..18ca7ceef86 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; +use futures::future::Either; +use futures::{Stream, StreamExt}; use quickwit_common::uri::Uri; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; @@ -43,7 +46,7 @@ use crate::metrics_trackers::LeafSearchMetricsFuture; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_permit_provider::SearchPermitProvider; -use crate::{ClusterClient, SearchError, fetch_docs, root_search, search_plan}; +use crate::{ClusterClient, SearchError, fetch_docs, fetch_docs_stream, root_search, search_plan}; #[derive(Clone)] /// The search service implementation. @@ -84,6 +87,14 @@ pub trait SearchService: 'static + Send + Sync { /// This methods takes `PartialHit`s and returns `Hit`s. async fn fetch_docs(&self, request: FetchDocsRequest) -> crate::Result; + /// Streams document contents from the document store in batches. + /// This method takes `PartialHit`s and streams back `FetchDocsResponse`s + /// to avoid hitting gRPC message size limits. + fn stream_fetch_docs( + &self, + request: FetchDocsRequest, + ) -> Pin> + Send>>; + /// Root search API. /// This RPC identifies the set of splits on which the query should run on, /// and dispatches the multiple calls to `LeafSearch`. @@ -240,6 +251,53 @@ impl SearchService for SearchServiceImpl { Ok(fetch_docs_response) } + fn stream_fetch_docs( + &self, + fetch_docs_request: FetchDocsRequest, + ) -> Pin> + Send>> { + let index_uri_result = Uri::from_str(&fetch_docs_request.index_uri); + let storage_resolver = self.storage_resolver.clone(); + let searcher_context = self.searcher_context.clone(); + let doc_mapper_str = fetch_docs_request.doc_mapper.clone(); + let partial_hits = fetch_docs_request.partial_hits; + let split_offsets = fetch_docs_request.split_offsets; + let snippet_request_opt = fetch_docs_request.snippet_request; + + Box::pin( + futures::stream::once(async move { + let index_uri = match index_uri_result { + Ok(uri) => uri, + Err(e) => return Err(SearchError::from(e)), + }; + + let storage = match storage_resolver.resolve(&index_uri).await { + Ok(s) => s, + Err(e) => return Err(SearchError::Internal(e.to_string())), + }; + + let doc_mapper = match deserialize_doc_mapper(&doc_mapper_str) { + Ok(dm) => dm, + Err(e) => return Err(e), + }; + + Ok(fetch_docs_stream( + searcher_context, + partial_hits, + storage, + split_offsets, + doc_mapper, + snippet_request_opt, + )) + }) + .flat_map(|result| match result { + std::result::Result::Ok(stream) => Either::Left( + stream.map(|r| r.map_err(|e| SearchError::Internal(e.to_string()))), + ), + Err(e) => Either::Right(futures::stream::once(async move { Err(e) })), + }), + ) + } + async fn root_list_terms( &self, list_terms_request: ListTermsRequest, diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index c5250ee2465..4b294292469 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -35,17 +35,6 @@ impl From> for GrpcSearchAdapter { #[async_trait] impl grpc::SearchService for GrpcSearchAdapter { - #[instrument(skip(self, request))] - async fn root_search( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.root_search(search_request).await; - convert_to_grpc_result(search_result) - } - #[instrument(skip(self, request))] async fn leaf_search( &self, @@ -68,15 +57,25 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(fetch_docs_result) } + type StreamFetchDocsStream = + quickwit_proto::tonic::codegen::BoxStream; + #[instrument(skip(self, request))] - async fn root_list_terms( + async fn stream_fetch_docs( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { + use futures::TryStreamExt; + set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.root_list_terms(search_request).await; - convert_to_grpc_result(search_result) + let fetch_docs_request = request.into_inner(); + let stream = self.0.stream_fetch_docs(fetch_docs_request); + + // Convert SearchError to tonic::Status + let grpc_stream = stream + .map_err(|err| tonic::Status::internal(format!("Stream fetch docs error: {}", err))); + + Ok(tonic::Response::new(Box::pin(grpc_stream))) } #[instrument(skip(self, request))] @@ -90,15 +89,6 @@ impl grpc::SearchService for GrpcSearchAdapter { convert_to_grpc_result(leaf_search_result) } - async fn scroll( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let scroll_request = request.into_inner(); - let scroll_result = self.0.scroll(scroll_request).await; - convert_to_grpc_result(scroll_result) - } - #[instrument(skip(self, request))] async fn put_kv( &self, @@ -144,6 +134,7 @@ impl grpc::SearchService for GrpcSearchAdapter { let resp = self.0.root_list_fields(request.into_inner()).await; convert_to_grpc_result(resp) } + #[instrument(skip(self, request))] async fn leaf_list_fields( &self, @@ -153,15 +144,4 @@ impl grpc::SearchService for GrpcSearchAdapter { let resp = self.0.leaf_list_fields(request.into_inner()).await; convert_to_grpc_result(resp) } - - #[instrument(skip(self, request))] - async fn search_plan( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - set_parent_span_from_request_metadata(request.metadata()); - let search_request = request.into_inner(); - let search_result = self.0.search_plan(search_request).await; - convert_to_grpc_result(search_result) - } }