diff --git a/src/error.rs b/src/error.rs index b6dd87e..20d5007 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,7 +12,7 @@ pub struct ServiceError { inner: Arc, } -/// An error produced when the a buffer's worker closes unexpectedly. +/// An error produced when the batch worker closes unexpectedly. pub struct Closed { _p: (), } diff --git a/src/future.rs b/src/future.rs index ece3b58..7e84d81 100644 --- a/src/future.rs +++ b/src/future.rs @@ -12,7 +12,7 @@ use pin_project_lite::pin_project; use super::{error::Closed, message}; pin_project! { - /// Future that completes when the buffered service eventually services the submitted request. + /// Future that resolves once the batch worker has processed the submitted request. #[derive(Debug)] pub struct ResponseFuture { #[pin] diff --git a/src/layer.rs b/src/layer.rs index 5ae6a2c..5a0feb2 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -4,12 +4,13 @@ use tower::{layer::Layer, Service}; use super::{service::Batch, BatchControl}; -/// Adds a layer performing batch processing of requests. +/// A [`Layer`] that wraps an inner service with [`Batch`]. /// -/// The default Tokio executor is used to run the given service, -/// which means that this layer can only be used on the Tokio runtime. +/// The background worker is spawned on the default Tokio executor, so +/// this layer can only be used on the Tokio runtime. /// -/// See the module documentation for more details. +/// See the [module documentation](crate) for the full lifecycle and error +/// semantics. pub struct BatchLayer { size: usize, time: Duration, @@ -19,11 +20,8 @@ pub struct BatchLayer { impl BatchLayer { /// Creates a new [`BatchLayer`]. /// - /// The wrapper is responsible for telling the inner service when to flush a batch of requests. - /// Two parameters control this policy: - /// - /// * `size` gives the maximum number of items per batch. - /// * `time` gives the maximum duration before a batch is flushed. 
+ /// * `size` – the maximum number of items per batch. + /// * `time` – the maximum duration before a batch is flushed. pub fn new(size: usize, time: Duration) -> Self { Self { size, @@ -49,7 +47,7 @@ where impl fmt::Debug for BatchLayer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BufferLayer") + f.debug_struct("BatchLayer") .field("size", &self.size) .field("time", &self.time) .finish() diff --git a/src/lib.rs b/src/lib.rs index 91410aa..5d0e240 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,59 @@ #![allow(clippy::type_complexity)] -//! A Tower middleware that provides a buffered mpsc for processing requests in batches. -//! -//! Writing data in bulk is a common technique for improving the efficiency of certain tasks. -//! `tower-batch` is a middleware that allows you to buffer requests for batch processing until -//! the buffer reaches a maximum size OR a maximum duration elapses. -//! -//! Clients enqueue requests by sending on the channel from any of the handles ([`Batch`]), and the -//! single service running elsewhere (usually spawned) receives and collects the requests wrapped -//! in [`Item(R)`](BatchControl::Item). Once the [`Batch`]) buffer is full or the maximum duration -//! elapses, the service is instructed to write the data with a [`Flush`](BatchControl::Flush) -//! request. Upon completion of the flush operation, the client's will receive a response with the -//! outcome. +//! A Tower middleware that buffers requests and flushes them in batches. +//! +//! Writing data in bulk is a common technique for improving the efficiency of +//! certain tasks – databases, message brokers, object stores, etc. `tower-batch` +//! collects individual requests and flushes them as a group when the buffer +//! reaches a maximum size **or** a maximum duration elapses. +//! +//! # Inner service contract +//! +//! Your inner service must implement `Service<BatchControl<R>>` where `R` is +//! your request type.
The middleware sends two kinds of calls: +//! +//! - [`BatchControl::Item(request)`](BatchControl::Item) – buffer this request. +//! - [`BatchControl::Flush`] – process the buffered items and return the result. +//! +//! # How the worker operates +//! +//! [`Batch::new`] (or [`BatchLayer`]) spawns a background worker that owns the +//! inner service. The worker cycles through three states: +//! +//! 1. **Collecting** – the worker pulls requests from the channel and forwards +//! each one to the inner service as a [`BatchControl::Item`]. A timer starts +//! when the first item of a new batch arrives. +//! 2. **Flushing** – triggered when the batch reaches `max_size` items or the +//! `max_time` duration elapses. The worker calls the inner service with +//! [`BatchControl::Flush`]. Once the flush completes, all callers in the +//! batch receive the outcome and the worker returns to collecting. +//! 3. **Finished** – the worker shuts down, either because all [`Batch`] handles +//! were dropped (no more requests possible) or because the inner service +//! returned an error. +//! +//! # Backpressure +//! +//! [`Batch`] handles are cheap to clone – each clone shares the same worker. +//! Backpressure is enforced via a semaphore with `max_size` permits: once +//! `max_size` callers have received `Ready` from [`poll_ready`](tower::Service::poll_ready) +//! without yet calling [`call`](tower::Service::call), subsequent `poll_ready` +//! calls will return `Pending` until capacity is freed. +//! +//! # Errors +//! +//! Callers receive one of two error types through the [`BoxError`] returned by +//! [`ResponseFuture`](future::ResponseFuture): +//! +//! - [`error::ServiceError`] – the inner service returned an error, either +//! during an item call or during a flush. The worker terminates and all +//! pending callers in the current batch receive this error. The original +//! error is accessible via [`source()`](std::error::Error::source). +//! +//! 
- [`error::Closed`] – the worker shut down before the caller's request +//! could be completed. This happens when all [`Batch`] handles are dropped +//! while items are still collecting (the batch was never flushed), or when +//! the worker is dropped for any other reason. Callers should treat this as +//! meaning their request was **NOT** processed. /// Export tower's alias for a type-erased error type. pub use tower::BoxError; diff --git a/src/service.rs b/src/service.rs index 890398d..1f8e02d 100644 --- a/src/service.rs +++ b/src/service.rs @@ -16,9 +16,14 @@ use super::{ BatchControl, }; -/// Allows batch processing of requests. +/// Handle for submitting requests to a batch worker. /// -/// See the module documentation for more details. +/// Each `Batch` handle communicates with a single background worker over a +/// shared channel. Handles are cheap to [`Clone`] – every clone sends to the +/// same worker, so you can hand them to multiple tasks. +/// +/// See the [module documentation](crate) for the full lifecycle and error +/// semantics. #[derive(Debug)] pub struct Batch where @@ -53,11 +58,12 @@ where { /// Creates a new `Batch` wrapping `service`. /// - /// The wrapper is responsible for telling the inner service when to flush a - /// batch of requests. + /// `size` is the maximum number of items per batch and `time` is the + /// maximum duration before a batch is flushed. The worker flushes + /// whichever limit is hit first. /// - /// The default Tokio executor is used to run the given service, which means - /// that this method must be called while on the Tokio runtime. + /// The background worker is spawned on the default Tokio executor, so + /// this method must be called while on the Tokio runtime. 
pub fn new(service: T, size: usize, time: std::time::Duration) -> Self where T: Send + 'static, diff --git a/tests/main.rs b/tests/main.rs index ba09027..c7e4639 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -831,11 +831,11 @@ async fn batch_layer_wraps_service() { let aggregator: Aggregator = Aggregator::new(); let layer = BatchLayer::::new(10, Duration::from_secs(1)); - // Cover Debug impl (prints "BufferLayer") + // Cover Debug impl let debug_str = format!("{:?}", layer); assert!( - debug_str.contains("BufferLayer"), - "Debug should contain 'BufferLayer', got: {}", + debug_str.contains("BatchLayer"), + "Debug should contain 'BatchLayer', got: {}", debug_str );