A lock-free, bounded, multi-producer multi-consumer (MPMC) queue in Rust using only std. Based on Dmitry Vyukov's classic bounded MPMC queue algorithm.
The queue is a fixed-size circular buffer of slots, each containing an atomic sequence counter and the stored value:
```
┌──────────┬──────────┬──────────┬──────────┐
│ Slot 0   │ Slot 1   │ Slot 2   │ Slot 3   │  ← Box<[Slot<T>]>
│ seq: 4   │ seq: 5   │ seq: 2   │ seq: 3   │
│ val: ... │ val: ... │ val: ... │ val: ... │
└──────────┴──────────┴──────────┴──────────┘
             ↑ tail=5 (wraps to slot 1)
                        ↑ head=2
```
Two cache-padded atomic counters track the next positions for consumers (head) and producers (tail). Cache-line padding (128 bytes) prevents false sharing between cores contending on head vs tail.
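The padding idea can be sketched with `#[repr(align(128))]`, a simplified stand-in for the crate's explicit zero-initialised padding (the name `CachePadded` follows the text; the wrapper shown here is illustrative, not the crate's implementation):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Force each wrapped value onto its own 128-byte-aligned region so that
// `head` and `tail` never share a cache line (illustrative sketch).
#[repr(align(128))]
struct CachePadded<T>(T);

fn main() {
    let head = CachePadded(AtomicUsize::new(0));
    let tail = CachePadded(AtomicUsize::new(0));

    // The aligned size is rounded up to 128 bytes, so two adjacent
    // counters can never land on the same cache line.
    assert_eq!(std::mem::align_of::<CachePadded<AtomicUsize>>(), 128);
    assert_eq!(std::mem::size_of::<CachePadded<AtomicUsize>>(), 128);

    head.0.store(1, Ordering::Relaxed);
    assert_eq!(tail.0.load(Ordering::Relaxed), 0);
}
```

128 bytes (two 64-byte lines) covers adjacent-line prefetching on common x86 and Apple ARM cores, which is why the padding is larger than a single cache line.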
Each slot has an atomic sequence number that acts as a state machine:
Producing (`try_push`):
- Read `tail` and compute the target slot index (`tail & mask`).
- Read the slot's `sequence`. If `sequence == tail`, the slot is writable.
- CAS `tail` from its current value to `tail + 1`. If the CAS fails, another producer won; retry.
- Write the value into the slot.
- Store `sequence = tail + 1` (Release), signalling consumers.
Consuming (`try_pop`):
- Read `head` and compute the target slot index (`head & mask`).
- Read the slot's `sequence`. If `sequence == head + 1`, the slot is readable.
- CAS `head` from its current value to `head + 1`. If the CAS fails, another consumer won; retry.
- Read the value out of the slot.
- Store `sequence = head + capacity` (Release), freeing the slot for the next lap of producers.
If the sequence check shows the slot isn't ready (queue full for producers, empty for consumers), `try_push`/`try_pop` return immediately. The blocking `push`/`pop` variants retry with exponential backoff (spin-loop hint → thread yield).
The CAS on `tail`/`head` ensures exactly one thread wins each slot. The sequence counter acts as a handoff: a producer sets it to `tail + 1` only after writing, and a consumer sets it to `head + capacity` only after reading. No two threads ever access a slot's value concurrently.
Capacity is rounded up to the next power of two (minimum 2) so that `index & mask` replaces the modulo operation. A minimum of 2 is enforced because with `capacity = 1` the "readable" and "next-writable" sequence numbers for the same slot collide.
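A quick check of the rounding rule and the mask trick (the helper name `effective_capacity` is assumed for illustration):

```rust
// Round a requested capacity up to the next power of two, minimum 2.
fn effective_capacity(requested: usize) -> usize {
    requested.next_power_of_two().max(2)
}

fn main() {
    assert_eq!(effective_capacity(1), 2); // minimum of 2 enforced
    assert_eq!(effective_capacity(5), 8); // rounded up to a power of two

    let cap = effective_capacity(5);
    let mask = cap - 1;
    // For power-of-two capacities, `pos & mask` equals `pos % cap`
    // for every position, without a division.
    for pos in 0..100usize {
        assert_eq!(pos & mask, pos % cap);
    }
}
```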
There are 4 uses of `unsafe`, each justified:

| Location | Reason |
|---|---|
| `CachePadded::new` | Initialises padding bytes to zero via raw pointer writes. Padding is never read as meaningful data. |
| `try_push` (slot value write) | Writes to `UnsafeCell<MaybeUninit<T>>` after the producer won the CAS on `tail`, guaranteeing exclusive access. |
| `try_pop` (slot value read) | Reads from `UnsafeCell<MaybeUninit<T>>` after the consumer won the CAS on `head`, guaranteeing exclusive access and that the value was previously written. |
| `Drop` (draining remaining items) | Drops unconsumed items between `head` and `tail`. Called with `&mut self`, so no concurrent access is possible. |
```rust
pub trait BoundedQueue<T: Send>: Send + Sync {
    fn new(capacity: usize) -> Self;
    fn push(&self, item: T);                       // blocks if full
    fn pop(&self) -> T;                            // blocks if empty
    fn try_push(&self, item: T) -> Result<(), T>;  // non-blocking
    fn try_pop(&self) -> Option<T>;                // non-blocking
}
```

```sh
# Tests (12 tests: single-threaded, concurrent, stress, blocking, drop safety)
cargo test

# Benchmarks (symmetric + asymmetric throughput across thread counts and capacities)
cargo bench --bench throughput
```

```
src/lib.rs             Queue trait, implementation, and tests
benches/throughput.rs  Throughput benchmarks
BENCHMARK_RESULTS.md   Recorded benchmark numbers and analysis
```