diff --git a/src/task/mod.rs b/src/task/mod.rs index 9dd7ba1857a..663958c7467 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -32,7 +32,7 @@ use std::prelude::v1::*; use std::cell::Cell; use std::fmt; use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT}; +use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT}; use std::thread; use {BoxFuture, Poll, Future, Async}; @@ -224,10 +224,10 @@ impl Spawn { /// to complete. When a future cannot make progress it will use /// `thread::park` to block the current thread. pub fn wait_future(&mut self) -> Result { - let unpark = Arc::new(ThreadUnpark(thread::current())); + let unpark = Arc::new(ThreadUnpark::new(thread::current())); loop { match try!(self.poll_future(unpark.clone())) { - Async::NotReady => thread::park(), + Async::NotReady => unpark.park(), Async::Ready(e) => return Ok(e), } } @@ -279,10 +279,10 @@ impl Spawn { /// Like `wait_future`, except only waits for the next element to arrive on /// the underlying stream. pub fn wait_stream(&mut self) -> Option> { - let unpark = Arc::new(ThreadUnpark(thread::current())); + let unpark = Arc::new(ThreadUnpark::new(thread::current())); loop { match self.poll_stream(unpark.clone()) { - Ok(Async::NotReady) => thread::park(), + Ok(Async::NotReady) => unpark.park(), Ok(Async::Ready(Some(e))) => return Some(Ok(e)), Ok(Async::Ready(None)) => return None, Err(e) => return Some(Err(e)), @@ -331,11 +331,30 @@ pub trait Executor: Send + Sync + 'static { fn execute(&self, r: Run); } -struct ThreadUnpark(thread::Thread); +struct ThreadUnpark { + thread: thread::Thread, + ready: AtomicBool, +} + +impl ThreadUnpark { + fn new(thread: thread::Thread) -> ThreadUnpark { + ThreadUnpark { + thread: thread, + ready: AtomicBool::new(false), + } + } + + fn park(&self) { + if !self.ready.swap(false, Ordering::SeqCst) { + thread::park(); + } + } +} impl Unpark for ThreadUnpark { fn unpark(&self) { - self.0.unpark() + self.ready.store(true, Ordering::SeqCst); + self.thread.unpark() } }