Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct ServiceError {
inner: Arc<BoxError>,
}

/// An error produced when the a buffer's worker closes unexpectedly.
/// An error produced when the batch worker closes unexpectedly.
pub struct Closed {
_p: (),
}
Expand Down
2 changes: 1 addition & 1 deletion src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use pin_project_lite::pin_project;
use super::{error::Closed, message};

pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
/// Future that resolves once the batch worker has processed the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
Expand Down
18 changes: 8 additions & 10 deletions src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use tower::{layer::Layer, Service};

use super::{service::Batch, BatchControl};

/// Adds a layer performing batch processing of requests.
/// A [`Layer`] that wraps an inner service with [`Batch`].
///
/// The default Tokio executor is used to run the given service,
/// which means that this layer can only be used on the Tokio runtime.
/// The background worker is spawned on the default Tokio executor, so
/// this layer can only be used on the Tokio runtime.
///
/// See the module documentation for more details.
/// See the [module documentation](crate) for the full lifecycle and error
/// semantics.
pub struct BatchLayer<Request> {
size: usize,
time: Duration,
Expand All @@ -19,11 +20,8 @@ pub struct BatchLayer<Request> {
impl<Request> BatchLayer<Request> {
/// Creates a new [`BatchLayer`].
///
/// The wrapper is responsible for telling the inner service when to flush a batch of requests.
/// Two parameters control this policy:
///
/// * `size` gives the maximum number of items per batch.
/// * `time` gives the maximum duration before a batch is flushed.
/// * `size` – the maximum number of items per batch.
/// * `time` – the maximum duration before a batch is flushed.
pub fn new(size: usize, time: Duration) -> Self {
Self {
size,
Expand All @@ -49,7 +47,7 @@ where

impl<Request> fmt::Debug for BatchLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
f.debug_struct("BatchLayer")
.field("size", &self.size)
.field("time", &self.time)
.finish()
Expand Down
66 changes: 54 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,59 @@
#![allow(clippy::type_complexity)]

//! A Tower middleware that provides a buffered mpsc for processing requests in batches.
//!
//! Writing data in bulk is a common technique for improving the efficiency of certain tasks.
//! `tower-batch` is a middleware that allows you to buffer requests for batch processing until
//! the buffer reaches a maximum size OR a maximum duration elapses.
//!
//! Clients enqueue requests by sending on the channel from any of the handles ([`Batch`]), and the
//! single service running elsewhere (usually spawned) receives and collects the requests wrapped
//! in [`Item(R)`](BatchControl::Item). Once the [`Batch`]) buffer is full or the maximum duration
//! elapses, the service is instructed to write the data with a [`Flush`](BatchControl::Flush)
//! request. Upon completion of the flush operation, the client's will receive a response with the
//! outcome.
//! A Tower middleware that buffers requests and flushes them in batches.
//!
//! Writing data in bulk is a common technique for improving the efficiency of
//! certain tasks – databases, message brokers, object stores, etc. `tower-batch`
//! collects individual requests and flushes them as a group when the buffer
//! reaches a maximum size **or** a maximum duration elapses.
//!
//! # Inner service contract
//!
//! Your inner service must implement `Service<BatchControl<R>>` where `R` is
//! your request type. The middleware sends two kinds of calls:
//!
//! - [`BatchControl::Item(request)`](BatchControl::Item) – buffer this request.
//! - [`BatchControl::Flush`] – process the buffered items and return the result.
//!
//! # How the worker operates
//!
//! [`Batch::new`] (or [`BatchLayer`]) spawns a background worker that owns the
//! inner service. The worker cycles through three states:
//!
//! 1. **Collecting** – the worker pulls requests from the channel and forwards
//! each one to the inner service as a [`BatchControl::Item`]. A timer starts
//! when the first item of a new batch arrives.
//! 2. **Flushing** – triggered when the batch reaches `max_size` items or the
//! `max_time` duration elapses. The worker calls the inner service with
//! [`BatchControl::Flush`]. Once the flush completes, all callers in the
//! batch receive the outcome and the worker returns to collecting.
//! 3. **Finished** – the worker shuts down, either because all [`Batch`] handles
//! were dropped (no more requests possible) or because the inner service
//! returned an error.
//!
//! # Backpressure
//!
//! [`Batch`] handles are cheap to clone – each clone shares the same worker.
//! Backpressure is enforced via a semaphore with `max_size` permits: once
//! `max_size` callers have received `Ready` from [`poll_ready`](tower::Service::poll_ready)
//! without yet calling [`call`](tower::Service::call), subsequent `poll_ready`
//! calls will return `Pending` until capacity is freed.
//!
//! # Errors
//!
//! Callers receive one of two error types through the [`BoxError`] returned by
//! [`ResponseFuture`](future::ResponseFuture):
//!
//! - [`error::ServiceError`] – the inner service returned an error, either
//! during an item call or during a flush. The worker terminates and all
//! pending callers in the current batch receive this error. The original
//! error is accessible via [`source()`](std::error::Error::source).
//!
//! - [`error::Closed`] – the worker shut down before the caller's request
//! could be completed. This happens when all [`Batch`] handles are dropped
//! while items are still collecting (the batch was never flushed), or when
//! the worker is dropped for any other reason. Callers should treat this as
//! meaning their request was **NOT** processed.

/// Export tower's alias for a type-erased error type.
pub use tower::BoxError;
Expand Down
18 changes: 12 additions & 6 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ use super::{
BatchControl,
};

/// Allows batch processing of requests.
/// Handle for submitting requests to a batch worker.
///
/// See the module documentation for more details.
/// Each `Batch` handle communicates with a single background worker over a
/// shared channel. Handles are cheap to [`Clone`] – every clone sends to the
/// same worker, so you can hand them to multiple tasks.
///
/// See the [module documentation](crate) for the full lifecycle and error
/// semantics.
#[derive(Debug)]
pub struct Batch<T, Request>
where
Expand Down Expand Up @@ -53,11 +58,12 @@ where
{
/// Creates a new `Batch` wrapping `service`.
///
/// The wrapper is responsible for telling the inner service when to flush a
/// batch of requests.
/// `size` is the maximum number of items per batch and `time` is the
/// maximum duration before a batch is flushed. The worker flushes
/// whichever limit is hit first.
///
/// The default Tokio executor is used to run the given service, which means
/// that this method must be called while on the Tokio runtime.
/// The background worker is spawned on the default Tokio executor, so
/// this method must be called while on the Tokio runtime.
pub fn new(service: T, size: usize, time: std::time::Duration) -> Self
where
T: Send + 'static,
Expand Down
6 changes: 3 additions & 3 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,11 @@ async fn batch_layer_wraps_service() {
let aggregator: Aggregator<u32> = Aggregator::new();
let layer = BatchLayer::<u32>::new(10, Duration::from_secs(1));

// Cover Debug impl (prints "BufferLayer")
// Cover Debug impl
let debug_str = format!("{:?}", layer);
assert!(
debug_str.contains("BufferLayer"),
"Debug should contain 'BufferLayer', got: {}",
debug_str.contains("BatchLayer"),
"Debug should contain 'BatchLayer', got: {}",
debug_str
);

Expand Down
Loading