diff --git a/src/chunks.rs b/src/chunks.rs
index c546622..ef8bce9 100644
--- a/src/chunks.rs
+++ b/src/chunks.rs
@@ -234,8 +234,7 @@ impl<T> Producer<T> {
     /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
     /// see [`Producer::write_chunk()`].
     pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
-        // "tail" is only ever written by the producer thread, "Relaxed" is enough
-        let tail = self.buffer.tail.load(Ordering::Relaxed);
+        let tail = self.cached_tail.get();
 
         // Check if the queue has *possibly* not enough slots.
         if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
@@ -285,8 +284,7 @@ impl<T> Consumer<T> {
     ///
     /// See the documentation of the [`chunks`](crate::chunks#examples) module.
     pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
-        // "head" is only ever written by the consumer thread, "Relaxed" is enough
-        let head = self.buffer.head.load(Ordering::Relaxed);
+        let head = self.cached_head.get();
 
         // Check if the queue has *possibly* not enough slots.
         if self.buffer.distance(head, self.cached_tail.get()) < n {
@@ -497,10 +495,9 @@ impl<T> WriteChunkUninit<'_, T> {
 
     unsafe fn commit_unchecked(self, n: usize) -> usize {
         let p = self.producer;
-        // "tail" is only ever written by the producer thread, "Relaxed" is enough
-        let tail = p.buffer.tail.load(Ordering::Relaxed);
-        let tail = p.buffer.increment(tail, n);
+        let tail = p.buffer.increment(p.cached_tail.get(), n);
         p.buffer.tail.store(tail, Ordering::Release);
+        p.cached_tail.set(tail);
         n
     }
 
@@ -744,10 +741,9 @@ impl<T> ReadChunk<'_, T> {
             unsafe { self.second_ptr.add(i).drop_in_place() };
         }
         let c = self.consumer;
-        // "head" is only ever written by the consumer thread, "Relaxed" is enough
-        let head = c.buffer.head.load(Ordering::Relaxed);
-        let head = c.buffer.increment(head, n);
+        let head = c.buffer.increment(c.cached_head.get(), n);
        c.buffer.head.store(head, Ordering::Release);
+        c.cached_head.set(head);
         n
     }
 
@@ -799,10 +795,9 @@ impl<T> Drop for ReadChunkIntoIter<'_, T> {
     /// Non-iterated items remain in the ring buffer and are *not* dropped.
     fn drop(&mut self) {
         let c = &self.chunk.consumer;
-        // "head" is only ever written by the consumer thread, "Relaxed" is enough
-        let head = c.buffer.head.load(Ordering::Relaxed);
-        let head = c.buffer.increment(head, self.iterated);
+        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
         c.buffer.head.store(head, Ordering::Release);
+        c.cached_head.set(head);
     }
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index c81dc21..cd6f216 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -131,9 +131,11 @@ impl<T> RingBuffer<T> {
         let p = Producer {
             buffer: buffer.clone(),
             cached_head: Cell::new(0),
+            cached_tail: Cell::new(0),
         };
         let c = Consumer {
             buffer,
+            cached_head: Cell::new(0),
             cached_tail: Cell::new(0),
         };
         (p, c)
@@ -283,6 +285,13 @@ pub struct Producer<T> {
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
     cached_head: Cell<usize>,
+
+    /// A copy of `buffer.tail` for quick access.
+    ///
+    /// This value is always in sync with `buffer.tail`.
+    // NB: Caching the tail seems to have little effect on Intel CPUs, but it seems to
+    // improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
+    cached_tail: Cell<usize>,
 }
 
 // SAFETY: After moving a Producer to another thread, there is still only a single thread
@@ -315,6 +324,7 @@ impl<T> Producer<T> {
             unsafe { self.buffer.slot_ptr(tail).write(value) };
             let tail = self.buffer.increment1(tail);
             self.buffer.tail.store(tail, Ordering::Release);
+            self.cached_tail.set(tail);
             Ok(())
         } else {
             Err(PushError::Full(value))
@@ -342,9 +352,7 @@
     pub fn slots(&self) -> usize {
         let head = self.buffer.head.load(Ordering::Acquire);
         self.cached_head.set(head);
-        // "tail" is only ever written by the producer thread, "Relaxed" is enough
-        let tail = self.buffer.tail.load(Ordering::Relaxed);
-        self.buffer.capacity - self.buffer.distance(head, tail)
+        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
     }
 
     /// Returns `true` if there are currently no slots available for writing.
@@ -445,8 +453,7 @@
     /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
     /// For performance, this special case is immplemented separately.
     fn next_tail(&self) -> Option<usize> {
-        // "tail" is only ever written by the producer thread, "Relaxed" is enough
-        let tail = self.buffer.tail.load(Ordering::Relaxed);
+        let tail = self.cached_tail.get();
 
         // Check if the queue is *possibly* full.
         if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
@@ -488,6 +495,13 @@ pub struct Consumer<T> {
     /// A reference to the ring buffer.
     buffer: Arc<RingBuffer<T>>,
 
+    /// A copy of `buffer.head` for quick access.
+    ///
+    /// This value is always in sync with `buffer.head`.
+    // NB: Caching the head seems to have little effect on Intel CPUs, but it seems to
+    // improve performance on AMD CPUs, see https://github.com/mgeier/rtrb/pull/132
+    cached_head: Cell<usize>,
+
     /// A copy of `buffer.tail` for quick access.
     ///
     /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
@@ -534,6 +548,7 @@ impl<T> Consumer<T> {
             let value = unsafe { self.buffer.slot_ptr(head).read() };
             let head = self.buffer.increment1(head);
             self.buffer.head.store(head, Ordering::Release);
+            self.cached_head.set(head);
             Ok(value)
         } else {
             Err(PopError::Empty)
@@ -588,9 +603,7 @@
     pub fn slots(&self) -> usize {
         let tail = self.buffer.tail.load(Ordering::Acquire);
         self.cached_tail.set(tail);
-        // "head" is only ever written by the consumer thread, "Relaxed" is enough
-        let head = self.buffer.head.load(Ordering::Relaxed);
-        self.buffer.distance(head, tail)
+        self.buffer.distance(self.cached_head.get(), tail)
     }
 
     /// Returns `true` if there are currently no slots available for reading.
@@ -690,8 +703,7 @@
     /// This is a strict subset of the functionality implemented in `read_chunk()`.
     /// For performance, this special case is immplemented separately.
     fn next_head(&self) -> Option<usize> {
-        // "head" is only ever written by the consumer thread, "Relaxed" is enough
-        let head = self.buffer.head.load(Ordering::Relaxed);
+        let head = self.cached_head.get();
 
         // Check if the queue is *possibly* empty.
         if head == self.cached_tail.get() {