Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/ringbuffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@ impl<T> RingBuffer<T> {
cap.next_power_of_two()
};

unsafe {

// We really just want to use raw allocation, but unfortunately that isn't stable yet.
let mut positions = Vec::with_capacity(calculated_capacity);

// It's important that this not panic, since it would lead to uninitialized memory
// being freed.
for i in 0..calculated_capacity {
positions.push(Node::new(i));
unsafe { positions.push(Node::new(i)) };
}

RingBuffer{
Expand All @@ -183,7 +183,7 @@ impl<T> RingBuffer<T> {
mask: calculated_capacity-1,
positions: positions,
}
}

}

/// cap returns the actual capacity of the ring buffer. This
Expand Down Expand Up @@ -251,8 +251,8 @@ impl<T> RingBuffer<T> {
/// this operation blocks until it can be put. Returns an error if
/// this ring buffer is disposed.
pub fn put(&self, data: T) -> Result<(), RingBufferError> {
self.with_unique(&self.queue, |p| p, |n, p| unsafe {
ptr::write(n.item.get(), data);
self.with_unique(&self.queue, |p| p, |n, p| {
unsafe { ptr::write(n.item.get(), data) };
n.position.store(p, Ordering::Release);
})
}
Expand All @@ -261,8 +261,8 @@ impl<T> RingBuffer<T> {
/// if the ring buffer is empty until an item is placed. Returns an error
/// if the ring buffer is disposed.
pub fn get(&self) -> Result<T, RingBufferError> {
self.with_unique(&self.dequeue, |p| p.wrapping_add(1), |n, p| unsafe {
let data = ptr::read(n.item.get());
self.with_unique(&self.dequeue, |p| p.wrapping_add(1), |n, p| {
let data = unsafe { ptr::read(n.item.get()) };
n.position.store(p.wrapping_add(self.mask), Ordering::Release);
data
})
Expand Down