Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ service SearchService {
/// This methods takes `PartialHit`s and returns `Hit`s.
rpc FetchDocs(FetchDocsRequest) returns (FetchDocsResponse);

// Streams document contents from the document store.
// This method takes `PartialHit`s and streams back `LeafHit`s in batches
// to avoid hitting gRPC message size limits.
rpc StreamFetchDocs(FetchDocsRequest) returns (stream FetchDocsResponse);

// Root list terms API.
// This RPC identifies the set of splits on which the query should run on,
// and dispatches the several calls to `LeafListTerms`.
Expand Down
92 changes: 92 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions quickwit/quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,28 @@ impl SearchServiceClient {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let tonic_request = Request::new(request);
// let nb_docs_fetched = request.partial_hits.len();

// get all in one shot
let tonic_response = grpc_client
.fetch_docs(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
Ok(tonic_response.into_inner())

// stream in batches (activate once this was deployed once)
// let all_hits = grpc_client
// .stream_fetch_docs(tonic_request)
// .await
// .map_err(|tonic_error| parse_grpc_error(&tonic_error))?
// .into_inner()
// .map_err(|tonic_error| parse_grpc_error(&tonic_error))
// .try_fold(Vec::with_capacity(nb_docs_fetched), |mut acc, response| async move
// { acc.extend(response.hits);
// Ok(acc)
// })
// .await?;
// Ok(quickwit_proto::search::FetchDocsResponse { hits: all_hits })
}
SearchServiceClientImpl::Local(service) => service.fetch_docs(request).await,
}
Expand Down
40 changes: 39 additions & 1 deletion quickwit/quickwit-serve/src/search_api/grpc_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::{self, StreamExt};
use quickwit_proto::error::convert_to_grpc_result;
use quickwit_proto::search::{
GetKvRequest, GetKvResponse, LeafListFieldsRequest, ListFieldsRequest, ListFieldsResponse,
ReportSplitsRequest, ReportSplitsResponse, search_service_server as grpc,
};
use quickwit_proto::{set_parent_span_from_request_metadata, tonic};
use quickwit_proto::{GrpcServiceError, set_parent_span_from_request_metadata, tonic};
use quickwit_search::SearchService;
use tracing::instrument;

const FETCH_DOCS_BATCH_SIZE: usize = 500;

#[derive(Clone)]
pub struct GrpcSearchAdapter(Arc<dyn SearchService>);

Expand Down Expand Up @@ -68,6 +71,41 @@ impl grpc::SearchService for GrpcSearchAdapter {
convert_to_grpc_result(fetch_docs_result)
}

type StreamFetchDocsStream =
quickwit_proto::tonic::codegen::BoxStream<quickwit_proto::search::FetchDocsResponse>;

#[instrument(skip(self, request))]
async fn stream_fetch_docs(
&self,
request: tonic::Request<quickwit_proto::search::FetchDocsRequest>,
) -> Result<tonic::Response<Self::StreamFetchDocsStream>, tonic::Status> {
set_parent_span_from_request_metadata(request.metadata());
let fetch_docs_request = request.into_inner();

// Call the regular fetch_docs method
let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await;

let fetch_docs_response = match fetch_docs_result {
Ok(response) => response,
Err(err) => return Err(err.into_grpc_status()),
};

Comment thread
rdettai-sk marked this conversation as resolved.
// If there is only one batch, return it directly to avoid copying to a new vec.
if fetch_docs_response.hits.len() <= FETCH_DOCS_BATCH_SIZE {
let batch = quickwit_proto::search::FetchDocsResponse {
hits: fetch_docs_response.hits,
};
let batch_stream = stream::iter([Ok(batch)]);
return Ok(tonic::Response::new(Box::pin(batch_stream)));
}

let batch_stream = stream::iter(fetch_docs_response.hits)
.chunks(FETCH_DOCS_BATCH_SIZE)
.map(|batch| Ok(quickwit_proto::search::FetchDocsResponse { hits: batch }));

Ok(tonic::Response::new(Box::pin(batch_stream)))
}

#[instrument(skip(self, request))]
async fn root_list_terms(
&self,
Expand Down
Loading