Skip to content

Commit

Permalink
Fix unbounded MPMC queue cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
KyleMayes committed Mar 25, 2017
1 parent 9a7739c commit f3650be
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
## [0.1.1] - UNRELEASED

### Fixed
- Fixed unbounded MPMC queue cloning

## [0.1.0] - 2017-03-24
- Initial release
18 changes: 10 additions & 8 deletions src/unbounded/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl<T> Consumer<T> {
/// Attempts to clone this consumer.
pub fn try_clone(&self) -> Option<Self> {
if let Some(thread) = self.1.threads.lock().unwrap().pop() {
self.1.consumers.fetch_add(1, Release);
Some(Consumer(thread, self.1.clone()))
} else {
None
Expand All @@ -61,8 +62,8 @@ impl<T> Clone for Consumer<T> {

impl<T> Drop for Consumer<T> {
fn drop(&mut self) {
self.1.consumer.store(0, Release);
self.1.threads.lock().unwrap().push(self.0);
self.1.consumers.fetch_sub(1, Release);
}
}

Expand All @@ -87,6 +88,7 @@ impl<T> Producer<T> {
/// Attempts to clone this producer.
pub fn try_clone(&self) -> Option<Self> {
if let Some(thread) = self.1.threads.lock().unwrap().pop() {
self.1.producers.fetch_add(1, Release);
Some(Producer(thread, self.1.clone()))
} else {
None
Expand All @@ -102,8 +104,8 @@ impl<T> Clone for Producer<T> {

impl<T> Drop for Producer<T> {
fn drop(&mut self) {
self.1.producer.store(0, Release);
self.1.threads.lock().unwrap().push(self.0);
self.1.producers.fetch_sub(1, Release);
}
}

Expand Down Expand Up @@ -135,10 +137,10 @@ const NEXT: usize = 2;
#[repr(C)]
struct Queue<T> {
write: AtomicPtr<Node<T>>,
consumer: AtomicUsize,
consumers: AtomicUsize,
_wpadding: [usize; POINTERS - 2],
read: AtomicPtr<Node<T>>,
producer: AtomicUsize,
producers: AtomicUsize,
_rpadding: [usize; POINTERS - 2],
hazard: Hazard<Node<T>, VecMemory>,
threads: Mutex<Vec<usize>>,
Expand All @@ -151,10 +153,10 @@ impl<T> Queue<T> {
let sentinel = unsafe { VecMemory.allocate(Node::new(None)) };
Arc::new(Queue {
write: AtomicPtr::new(sentinel),
consumer: AtomicUsize::new(1),
consumers: AtomicUsize::new(1),
_wpadding: [0; POINTERS - 2],
read: AtomicPtr::new(sentinel),
producer: AtomicUsize::new(1),
producers: AtomicUsize::new(1),
_rpadding: [0; POINTERS - 2],
hazard: Hazard::new(VecMemory, threads, 3, 512),
threads: Mutex::new((2..threads).collect()),
Expand All @@ -165,7 +167,7 @@ impl<T> Queue<T> {

fn produce(&self, thread: usize, item: T) -> Result<(), ProduceError<T>> {
// Return an error if all of the consumers have been disconnected.
if self.consumer.load(Acquire) == 0 {
if self.consumers.load(Acquire) == 0 {
return Err(ProduceError::Disconnected(item));
}

Expand Down Expand Up @@ -194,7 +196,7 @@ impl<T> Queue<T> {
// Return an error if the queue is empty.
let read = self.hazard.mark(thread, READ, self.read.load(Acquire));
if read == self.write.load(Acquire) {
if self.producer.load(Acquire) == 0 {
if self.producers.load(Acquire) == 0 {
return Err(ConsumeError::Disconnected);
} else {
return Err(ConsumeError::Empty);
Expand Down

0 comments on commit f3650be

Please sign in to comment.