Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ tokio = { version = "1", features = ["macros", "sync", "test-util", "rt-multi-th
tokio-test = "0.4"
tower = { version = "0.5", features = ["full"] }
tower-test = "0.4"
proptest = "1"
rusqlite = { version = "0.39", features = ["bundled", "array"] }

6 changes: 6 additions & 0 deletions proptest-regressions/property.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
85 changes: 3 additions & 82 deletions tests/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Arc,
},
task::{Context, Poll},
time::Duration,
Expand All @@ -22,84 +21,7 @@ use tower_test::{
use tower_batch::{error, Batch, BatchControl, BatchLayer, BoxError};

mod support;

#[derive(Clone)]
struct Aggregator<T> {
items: Arc<Mutex<Vec<Vec<T>>>>,
current: Arc<AtomicUsize>,
}

impl<T> Aggregator<T> {
pub fn new() -> Self {
Self {
items: Arc::new(Mutex::new(Vec::new())),
current: Arc::new(AtomicUsize::new(0)),
}
}

fn batch_has_size(&self, index: usize, size: usize) -> bool {
if index == self.current.load(Ordering::Acquire) {
return false;
}
let items = &self.items.lock().unwrap();
items.get(index).is_some_and(|v| v.len() == size)
}

fn batch_items(&self, index: usize) -> Option<Vec<T>>
where
T: Clone,
{
if index == self.current.load(Ordering::Acquire) {
return None;
}
let items = self.items.lock().unwrap();
items.get(index).cloned()
}
}

impl<T> Service<BatchControl<T>> for Aggregator<T>
where
T: Debug,
{
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + Sync + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: BatchControl<T>) -> Self::Future {
match req {
BatchControl::Item(item) => {
let mut items = self.items.lock().unwrap();
match items.get_mut(self.current.load(Ordering::Acquire)) {
None => {
items.push(vec![item]);
}
Some(v) => {
v.push(item);
}
}
}
BatchControl::Flush => {
self.current.fetch_add(1, Ordering::SeqCst);
return Box::pin(async {
tracing::info!("sleeping ...");
async {
// Simulate some activity to catch any flushing issues
tokio::time::sleep(Duration::from_nanos(5)).await;
}
.await;
tracing::info!("awaking ...");
Ok(())
});
}
}

Box::pin(futures::future::ready(Ok(())))
}
}
use support::Aggregator;

#[tokio::test]
async fn batch_flushes_on_max_size() -> Result<(), BoxError> {
Expand Down Expand Up @@ -236,8 +158,7 @@ async fn concurrent_clones_send_requests() -> Result<(), BoxError> {
tokio::time::sleep(Duration::from_millis(200)).await;

// Verify all 9 items were actually delivered to the aggregator.
let items = aggregator.items.lock().unwrap();
let delivered: usize = items.iter().map(Vec::len).sum();
let delivered = aggregator.all_items_flat().len();
assert_eq!(delivered, 9, "all 9 items should reach the aggregator");

Ok(())
Expand Down
Loading
Loading