From f3650beceba1a1f175ad34d24392047cb31e4365 Mon Sep 17 00:00:00 2001 From: Kyle Mayes Date: Fri, 24 Mar 2017 23:07:40 -0400 Subject: [PATCH] Fix unbounded MPMC queue cloning --- CHANGELOG.md | 5 +++++ src/unbounded/mpmc.rs | 18 ++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e6f9bf..410b7cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,7 @@ +## [0.1.1] - UNRELEASED + +### Fixed +- Fixed unbounded MPMC queue cloning + ## [0.1.0] - 2017-03-24 - Initial release diff --git a/src/unbounded/mpmc.rs b/src/unbounded/mpmc.rs index dd5b56a..f7e4c5d 100644 --- a/src/unbounded/mpmc.rs +++ b/src/unbounded/mpmc.rs @@ -46,6 +46,7 @@ impl Consumer { /// Attempts to clone this consumer. pub fn try_clone(&self) -> Option { if let Some(thread) = self.1.threads.lock().unwrap().pop() { + self.1.consumers.fetch_add(1, Release); Some(Consumer(thread, self.1.clone())) } else { None @@ -61,8 +62,8 @@ impl Clone for Consumer { impl Drop for Consumer { fn drop(&mut self) { - self.1.consumer.store(0, Release); self.1.threads.lock().unwrap().push(self.0); + self.1.consumers.fetch_sub(1, Release); } } @@ -87,6 +88,7 @@ impl Producer { /// Attempts to clone this producer. pub fn try_clone(&self) -> Option { if let Some(thread) = self.1.threads.lock().unwrap().pop() { + self.1.producers.fetch_add(1, Release); Some(Producer(thread, self.1.clone())) } else { None @@ -102,8 +104,8 @@ impl Clone for Producer { impl Drop for Producer { fn drop(&mut self) { - self.1.producer.store(0, Release); self.1.threads.lock().unwrap().push(self.0); + self.1.producers.fetch_sub(1, Release); } } @@ -135,10 +137,10 @@ const NEXT: usize = 2; #[repr(C)] struct Queue { write: AtomicPtr>, - consumer: AtomicUsize, + consumers: AtomicUsize, _wpadding: [usize; POINTERS - 2], read: AtomicPtr>, - producer: AtomicUsize, + producers: AtomicUsize, _rpadding: [usize; POINTERS - 2], hazard: Hazard, VecMemory>, threads: Mutex>, @@ -151,10 +153,10 @@ impl Queue { let sentinel = unsafe { VecMemory.allocate(Node::new(None)) }; Arc::new(Queue { write: AtomicPtr::new(sentinel), - consumer: AtomicUsize::new(1), + consumers: AtomicUsize::new(1), _wpadding: [0; POINTERS - 2], read: AtomicPtr::new(sentinel), - producer: AtomicUsize::new(1), + producers: AtomicUsize::new(1), _rpadding: [0; POINTERS - 2], hazard: Hazard::new(VecMemory, threads, 3, 512), threads: Mutex::new((2..threads).collect()), @@ -165,7 +167,7 @@ impl Queue { fn produce(&self, thread: usize, item: T) -> Result<(), ProduceError> { // Return an error if all of the consumers have been disconnected. - if self.consumer.load(Acquire) == 0 { + if self.consumers.load(Acquire) == 0 { return Err(ProduceError::Disconnected(item)); } @@ -194,7 +196,7 @@ impl Queue { // Return an error if the queue is empty. let read = self.hazard.mark(thread, READ, self.read.load(Acquire)); if read == self.write.load(Acquire) { - if self.producer.load(Acquire) == 0 { + if self.producers.load(Acquire) == 0 { return Err(ConsumeError::Disconnected); } else { return Err(ConsumeError::Empty);