diff --git a/src/lib.rs b/src/lib.rs index c1abe42..57a233a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -145,14 +145,19 @@ use core::{ #[cfg(not(loom))] use core::{ cell::UnsafeCell, - sync::atomic::{AtomicU8, Ordering::SeqCst}, + sync::atomic::{fence, AtomicU8, Ordering::*}, }; #[cfg(loom)] use loom::{ cell::UnsafeCell, - sync::atomic::{AtomicU8, Ordering::SeqCst}, + sync::atomic::{fence, AtomicU8, Ordering::*}, }; +#[cfg(all(feature = "async", not(loom)))] +use core::hint; +#[cfg(all(feature = "async", loom))] +use loom::hint; + #[cfg(feature = "async")] use core::{ pin::Pin, @@ -164,10 +169,10 @@ use std::time::{Duration, Instant}; #[cfg(feature = "std")] mod thread { #[cfg(not(loom))] - pub use std::thread::{current, park, park_timeout, Thread}; + pub use std::thread::{current, park, park_timeout, yield_now, Thread}; #[cfg(loom)] - pub use loom::thread::{current, park, Thread}; + pub use loom::thread::{current, park, yield_now, Thread}; // loom does not support parking with a timeout. So we just // yield. This means that the "park" will "spuriously" wake up @@ -263,21 +268,66 @@ impl Sender { // Don't run our Drop implementation if send was called, any cleanup now happens here mem::forget(self); + // SAFETY: The channel exists on the heap for the entire duration of this method and we + // only ever acquire shared references to it. Note that if the receiver disconnects it + // does not free the channel. let channel = unsafe { channel_ptr.as_ref() }; // Write the message into the channel on the heap. + // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE + // state, and since we're responsible for setting that state, we can guarantee that we have + // exclusive access to this memory location to perform this write. unsafe { channel.write_message(message) }; // Set the state to signal there is a message on the channel. - match channel.state.swap(MESSAGE, SeqCst) { + // ORDERING: we use release ordering to ensure the write of the message is visible to the + // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state, + // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization + // independent of this operation. + // + // EMPTY + 1 = MESSAGE + // RECEIVING + 1 = UNPARKING + // DISCONNECTED + 1 = invalid, however this state is never observed + match channel.state.fetch_add(1, Release) { // The receiver is alive and has not started waiting. Send done. EMPTY => Ok(()), // The receiver is waiting. Wake it up so it can return the message. RECEIVING => { - unsafe { channel.take_waker() }.unpark(); + // ORDERING: Synchronizes with the write of the waker to memory, and prevents the + // taking of the waker from being ordered before this operation. + fence(Acquire); + + // Take the waker, but critically do not unpark it. If we unparked now, then the + // receiving thread could still observe the UNPARKING state and re-park, meaning + // that after we change to the MESSAGE state, it would remain parked indefinitely + // or until a spurious wakeup. + // SAFETY: at this point we are in the UNPARKING state, and the receiving thread + // does not access the waker while in this state, nor does it free the channel + // allocation in this state. + let waker = unsafe { channel.take_waker() }; + + // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load + // in the receiving thread, ensuring that both our read of the waker and write of + // the message happen-before the taking of the message and freeing of the channel. + // Furthermore, we need acquire ordering to ensure the unparking of the receiver + // happens after the channel state is updated. + channel.state.swap(MESSAGE, AcqRel); + + // Note: it is possible that between the store above and this statement that + // the receiving thread is spuriously unparked, takes the message, and frees + // the channel allocation. However, we took ownership of the channel out of + // that allocation, and freeing the channel does not drop the waker since the + // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of + // whether or not the receive has completed by this point. + waker.unpark(); + Ok(()) } // The receiver was already dropped. The error is responsible for freeing the channel. + // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so + // we can transfer exclusive ownership of the channel's resources to the error. + // Moreover, since we just placed the message in the channel, the channel contains a + // valid message. DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }), _ => unreachable!(), } @@ -286,17 +336,48 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - // SAFETY: The reference won't be used after the channel is freed in this method + // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or + // DISCONNECTED states. If we are in the MESSAGE state, then we called + // mem::forget(self), so we should not be in this function call. If we are in the + // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is + // unreachable, or was dropped and observed that our side was still alive, and thus didn't + // free the channel. let channel = unsafe { self.channel_ptr.as_ref() }; // Set the channel state to disconnected and read what state the receiver was in - match channel.state.swap(DISCONNECTED, SeqCst) { + // ORDERING: we don't need release ordering here since there are no modifications we + // need to make visible to other thread, and the Err(RECEIVING) branch handles + // synchronization independent of this cmpxchg + // + // EMPTY ^ 001 = DISCONNECTED + // RECEIVING ^ 001 = UNPARKING + // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed + match channel.state.fetch_xor(0b001, Relaxed) { // The receiver has not started waiting, nor is it dropped. EMPTY => (), // The receiver is waiting. Wake it up so it can detect that the channel disconnected. - RECEIVING => unsafe { channel.take_waker() }.unpark(), + RECEIVING => { + // See comments in Sender::send + + fence(Acquire); + + let waker = unsafe { channel.take_waker() }; + + // We still need release ordering here to make sure our read of the waker happens + // before this, and acquire ordering to ensure the unparking of the receiver + // happens after this. + channel.state.swap(DISCONNECTED, AcqRel); + + // The Acquire ordering above ensures that the write of the DISCONNECTED state + // happens-before unparking the receiver. + waker.unpark(); + } // The receiver was already dropped. We are responsible for freeing the channel. DISCONNECTED => { + // SAFETY: when the receiver switches the state to DISCONNECTED they have received + // the message or will no longer be trying to receive the message, and have + // observed that the sender is still alive, meaning that we're responsible for + // freeing the channel allocation. unsafe { dealloc(self.channel_ptr) }; } _ => unreachable!(), @@ -322,17 +403,20 @@ impl Receiver { // SAFETY: The channel will not be freed while this method is still running. let channel = unsafe { self.channel_ptr.as_ref() }; - match channel.state.load(SeqCst) { - // The sender is alive but has not sent anything yet. - EMPTY => Err(TryRecvError::Empty), - // The sender sent the message. We take the message and mark the channel disconnected. + // ORDERING: we use acquire ordering to synchronize with the store of the message + match channel.state.load(Acquire) { MESSAGE => { - channel.state.store(DISCONNECTED, SeqCst); + // It's okay to break up the load and store since once we're in the message state + // the sender no longer modifies the state + // ORDERING: at this point the sender has done its job and is no longer active, so + // we don't need to make any side effects visible to it + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the MESSAGE state so the message is present Ok(unsafe { channel.take_message() }) } - // The sender was dropped before sending anything, or we already received the message. + EMPTY => Err(TryRecvError::Empty), DISCONNECTED => Err(TryRecvError::Disconnected), - // The receiver must have already been `Future::poll`ed. No message available. #[cfg(feature = "async")] RECEIVING => Err(TryRecvError::Empty), _ => unreachable!(), @@ -358,14 +442,25 @@ impl Receiver { /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv(self) -> Result { + // Note that we don't need to worry about changing the state to disconnected or setting the + // state to an invalid value at any point in this function because we take ownership of + // self, and this function does not exit until the message has been received or both side + // of the channel are inactive and cleaned up. + let channel_ptr = self.channel_ptr; // Don't run our Drop implementation if we are receiving consuming ourselves. mem::forget(self); + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so channel_ptr is valid let channel = unsafe { channel_ptr.as_ref() }; - match channel.state.load(SeqCst) { + // ORDERING: we use acquire ordering to synchronize with the write of the message in the + // case that it's available + match channel.state.load(Acquire) { // The sender is alive but has not sent anything yet. We prepare to park. EMPTY => { // Conditionally add a delay here to help the tests trigger the edge cases where @@ -375,43 +470,81 @@ impl Receiver { std::thread::sleep(std::time::Duration::from_millis(10)); // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; - match channel - .state - .compare_exchange(EMPTY, RECEIVING, SeqCst, SeqCst) - { + // Switch the state to RECEIVING. We need to do this in one atomic step in case the + // sender disconnected or sent the message while we wrote the waker to memory. We + // don't need to do a compare exchange here however because if the original state + // was not EMPTY, then the sender has either finished sending the message or is + // being dropped, so the RECEIVING state will never be observed after we return. + // ORDERING: we use release ordering so the sender can synchronize with our writing + // of the waker to memory. The individual branches handle any additional + // synchronizaton + match channel.state.swap(RECEIVING, Release) { // We stored our waker, now we park until the sender has changed the state - Ok(EMPTY) => loop { + EMPTY => loop { thread::park(); - match channel.state.load(SeqCst) { + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { // The sender sent the message while we were parked. MESSAGE => { + // SAFETY: we are in the message state so the message is valid let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating + // the channel to us upon sending the message unsafe { dealloc(channel_ptr) }; + break Ok(message); } // The sender was dropped while we were parked. DISCONNECTED => { + // SAFETY: the Sender doesn't deallocate the channel allocation in + // its drop implementation if we're receiving unsafe { dealloc(channel_ptr) }; + break Err(RecvError); } // State did not change, spurious wakeup, park again. - RECEIVING => (), + RECEIVING | UNPARKING => (), _ => unreachable!(), } }, // The sender sent the message while we prepared to park. - Err(MESSAGE) => { + MESSAGE => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken, so it's likely more efficient to use a fence here + // instead of AcqRel ordering on the RMW operation + fence(Acquire); + + // SAFETY: we started in the empty state and the sender switched us to the + // message state. This means that it did not take the waker, so we're + // responsible for dropping it. unsafe { channel.drop_waker() }; + + // SAFETY: we are in the message state so the message is valid let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating the + // channel to us upon sending the message unsafe { dealloc(channel_ptr) }; + Ok(message) } // The sender was dropped before sending anything while we prepared to park. - Err(DISCONNECTED) => { + DISCONNECTED => { + // SAFETY: we started in the empty state and the sender switched us to the + // disconnected state. It does not take the waker when it does this so we + // need to drop it. unsafe { channel.drop_waker() }; + + // SAFETY: the sender does not deallocate the channel if it switches from + // empty to disconnected so we need to free the allocation unsafe { dealloc(channel_ptr) }; + Err(RecvError) } _ => unreachable!(), @@ -419,13 +552,21 @@ impl Receiver { } // The sender already sent the message. MESSAGE => { + // SAFETY: we are in the message state so the message is valid let message = unsafe { channel.take_message() }; + + // SAFETY: we are already in the message state so the sender has been forgotten + // and it's our job to clean up resources unsafe { dealloc(channel_ptr) }; + Ok(message) } // The sender was dropped before sending anything, or we already received the message. DISCONNECTED => { + // SAFETY: the sender does not deallocate the channel if it switches from empty to + // disconnected so we need to free the allocation unsafe { dealloc(channel_ptr) }; + Err(RecvError) } // The receiver must have been `Future::poll`ed prior to this call. @@ -447,68 +588,30 @@ impl Receiver { /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv_ref(&self) -> Result { - let channel_ptr = self.channel_ptr; - let channel = unsafe { channel_ptr.as_ref() }; + self.start_recv_ref(RecvError, |channel| { + loop { + thread::park(); - match channel.state.load(SeqCst) { - // The sender is alive but has not sent anything yet. We prepare to park. - EMPTY => { - // Conditionally add a delay here to help the tests trigger the edge cases where - // the sender manages to be dropped or send something before we are able to store - // our waker object in the channel. - #[cfg(oneshot_test_delay)] - std::thread::sleep(std::time::Duration::from_millis(10)); - - // Write our waker instance to the channel. - unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + // ORDERING: we use acquire ordering to synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + // We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender is inactive at this point so we don't need to make + // any reads or writes visible to the sending thread + channel.state.store(DISCONNECTED, Relaxed); - match channel - .state - .compare_exchange(EMPTY, RECEIVING, SeqCst, SeqCst) - { - // We stored our waker, now we park until the sender has changed the state - Ok(EMPTY) => loop { - thread::park(); - match channel.state.load(SeqCst) { - // The sender sent the message while we were parked. - // We take the message and mark the channel disconnected. - MESSAGE => { - channel.state.store(DISCONNECTED, SeqCst); - break Ok(unsafe { channel.take_message() }); - } - // The sender was dropped while we were parked. - DISCONNECTED => break Err(RecvError), - // State did not change, spurious wakeup, park again. - RECEIVING => (), - _ => unreachable!(), - } - }, - // The sender sent the message while we prepared to park. - Err(MESSAGE) => { - channel.state.store(DISCONNECTED, SeqCst); - unsafe { channel.drop_waker() }; - Ok(unsafe { channel.take_message() }) - } - // The sender was dropped before sending anything while we prepared to park. - Err(DISCONNECTED) => { - unsafe { channel.drop_waker() }; - Err(RecvError) + // SAFETY: we were just in the message state so the message is valid + break Ok(unsafe { channel.take_message() }); } + // The sender was dropped while we were parked. + DISCONNECTED => break Err(RecvError), + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), _ => unreachable!(), } } - // The sender sent the message. We take the message and mark the channel disconnected. - MESSAGE => { - channel.state.store(DISCONNECTED, SeqCst); - Ok(unsafe { channel.take_message() }) - } - // The sender was dropped before sending anything, or we already received the message. - DISCONNECTED => Err(RecvError), - // The receiver must have been `Future::poll`ed prior to this call. - #[cfg(feature = "async")] - RECEIVING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), - _ => unreachable!(), - } + }) } /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns: @@ -548,75 +651,178 @@ impl Receiver { /// Panics if called after this receiver has been polled asynchronously. #[cfg(feature = "std")] pub fn recv_deadline(&self, deadline: Instant) -> Result { - let channel_ptr = self.channel_ptr; - let channel = unsafe { channel_ptr.as_ref() }; - - match channel.state.load(SeqCst) { - // The sender is alive but has not sent anything yet. We prepare to park. - EMPTY => { - // Conditionally add a delay here to help the tests trigger the edge cases where - // the sender manages to be dropped or send something before we are able to store - // our waker object in the channel. - #[cfg(oneshot_test_delay)] - std::thread::sleep(std::time::Duration::from_millis(10)); + #[cold] + fn wait_for_unpark(channel: &Channel) -> Result { + loop { + thread::park(); + + // Same ordering as usual + match channel.state.load(Acquire) { + MESSAGE => { + // Same ordering and safety as usual + + channel.state.store(DISCONNECTED, Relaxed); + break Ok(unsafe { channel.take_message() }); + } + DISCONNECTED => { + break Err(RecvTimeoutError::Disconnected); + } + // We continue on the empty state here since the current implementation eagerly + // sets the state to EMPTY upon timeout. + EMPTY => (), + _ => unreachable!(), + } + } + } - // Write our thread instance to the channel. - unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| { + loop { + match deadline.checked_duration_since(Instant::now()) { + Some(timeout) => { + thread::park_timeout(timeout); - match channel - .state - .compare_exchange(EMPTY, RECEIVING, SeqCst, SeqCst) - { - // We stored our waker, now we park until the sender has changed the state - Ok(EMPTY) => loop { - let (state, timed_out) = if let Some(timeout) = - deadline.checked_duration_since(Instant::now()) - { - thread::park_timeout(timeout); - (channel.state.load(SeqCst), false) - } else { - // We reached the deadline. Stop being in the receiving state. - (channel.state.swap(EMPTY, SeqCst), true) - }; - match state { + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { // The sender sent the message while we were parked. MESSAGE => { - channel.state.store(DISCONNECTED, SeqCst); + // ORDERING: the sender has been `mem::forget`-ed so this update + // only needs to be visible to us. + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we either are in the message state or were just in the + // message state break Ok(unsafe { channel.take_message() }); } // The sender was dropped while we were parked. DISCONNECTED => break Err(RecvTimeoutError::Disconnected), // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + } + None => { + // ORDERING: synchronize with the write of the message + match channel.state.swap(EMPTY, Acquire) { + // We reached the end of the timeout without receiving a message RECEIVING => { - if timed_out { - unsafe { channel.drop_waker() }; - break Err(RecvTimeoutError::Timeout); - } + // SAFETY: we were in th receiving state and are now in the empty + // state, so the sender has not and will not try to read the waker, + // so we have exclusive access to drop it. + unsafe { channel.drop_waker() }; + + break Err(RecvTimeoutError::Timeout); + } + // The sender sent the message while we were parked. + MESSAGE => { + // Same safety and ordering as the Some branch + + channel.state.store(DISCONNECTED, Relaxed); + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => { + // ORDERING: we were originally in the disconnected state meaning + // that the sender is inactive and no longer observing the state, + // so we only need to change it back to DISCONNECTED for if the + // receiver is dropped or a recv* method is called again + channel.state.store(DISCONNECTED, Relaxed); + + break Err(RecvTimeoutError::Disconnected); + } + UNPARKING => { + // We were in the UNPARKING state and are now in the EMPTY state. + // We wait to be unparked since this is the only way to maintain + // correctness with intrusive memory. + + break wait_for_unpark(channel); } _ => unreachable!(), } - }, - // The sender sent the message while we prepared to park. + } + } + } + }) + } + + /// Begins the process of receiving on the channel by reference. If the message is already + /// ready, or the sender has disconnected, then this function will return the appropriate + /// Result immediately. Otherwise, it will write the waker to memory, check to see if the + /// sender has finished or disconnected again, and then will call `finish`. `finish` is + /// thus responsible for cleaning up the channel's resources appropriately before it returns, + /// such as destroying the waker, for instance. + #[cfg(feature = "std")] + #[inline] + fn start_recv_ref( + &self, + disconnected_error: E, + finish: impl FnOnce(&Channel) -> Result, + ) -> Result { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + unsafe { + channel.write_waker(ReceiverWaker::current_thread()); + } + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match channel + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we delegate to the callback to finish the receive + // operation + Ok(_) => finish(channel), + // The sender sent the message while we prepared to finish Err(MESSAGE) => { - channel.state.store(DISCONNECTED, SeqCst); + // See comments in `recv` for ordering and safety + + fence(Acquire); + unsafe { channel.drop_waker() }; + + // ORDERING: the sender has been `mem::forget`-ed so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + Ok(unsafe { channel.take_message() }) } // The sender was dropped before sending anything while we prepared to park. Err(DISCONNECTED) => { + // See comments in `recv` for safety unsafe { channel.drop_waker() }; - Err(RecvTimeoutError::Disconnected) + Err(disconnected_error) } _ => unreachable!(), } } - // The sender sent the message. + // The sender sent the message. We take the message and mark the channel disconnected. MESSAGE => { - channel.state.store(DISCONNECTED, SeqCst); + // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the message state so the message is valid Ok(unsafe { channel.take_message() }) } // The sender was dropped before sending anything, or we already received the message. - DISCONNECTED => Err(RecvTimeoutError::Disconnected), + DISCONNECTED => Err(disconnected_error), // The receiver must have been `Future::poll`ed prior to this call. #[cfg(feature = "async")] RECEIVING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), @@ -630,6 +836,9 @@ impl core::future::Future for Receiver { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // FIXME: relax orderings and fix racy access to the waker if necessary + + // SAFETY: The channel will not be freed while this method is still running. let channel = unsafe { self.channel_ptr.as_ref() }; match channel.state.load(SeqCst) { @@ -642,7 +851,7 @@ impl core::future::Future for Receiver { .compare_exchange(RECEIVING, EMPTY, SeqCst, SeqCst) { // We successfully changed the state back to EMPTY. Replace the waker. - Ok(RECEIVING) => { + Ok(_) => { unsafe { channel.drop_waker() }; unsafe { channel.write_async_waker(cx) } } @@ -656,6 +865,11 @@ impl core::future::Future for Receiver { // The sender was dropped before sending anything while we prepared to park. // The sender has taken the waker already. Err(DISCONNECTED) => Poll::Ready(Err(RecvError)), + Err(UNPARKING) => { + cx.waker().wake_by_ref(); + hint::spin_loop(); + Poll::Pending + } _ => unreachable!(), } } @@ -673,25 +887,31 @@ impl core::future::Future for Receiver { impl Drop for Receiver { fn drop(&mut self) { - // SAFETY: The reference won't be used after it is freed in this method + // SAFETY: since the receiving side is still alive the sender would have observed that and + // left deallocating the channel allocation to us. let channel = unsafe { self.channel_ptr.as_ref() }; // Set the channel state to disconnected and read what state the receiver was in - match channel.state.swap(DISCONNECTED, SeqCst) { + match channel.state.swap(DISCONNECTED, Acquire) { // The sender has not sent anything, nor is it dropped. EMPTY => (), // The sender already sent something. We must drop it, and free the channel. MESSAGE => { + // SAFETY: we are in the message state so the message is initialized unsafe { channel.drop_message() }; + + // SAFETY: see safety comment at top of function unsafe { dealloc(self.channel_ptr) }; } // The receiver has been polled. #[cfg(feature = "async")] RECEIVING => { + // TODO: figure this out when async is fixed unsafe { channel.drop_waker() }; } // The sender was already dropped. We are responsible for freeing the channel. DISCONNECTED => { + // SAFETY: see safety comment at top of function unsafe { dealloc(self.channel_ptr) }; } _ => unreachable!(), @@ -701,18 +921,23 @@ impl Drop for Receiver { /// All the values that the `Channel::state` field can have during the lifetime of a channel. mod states { + // These values are very explicitly chosen so that we can replace some cmpxchg calls with + // fetch_* calls. + /// The initial channel state. Active while both endpoints are still alive, no message has been /// sent, and the receiver is not receiving. - pub const EMPTY: u8 = 0; + pub const EMPTY: u8 = 0b011; /// A message has been sent to the channel, but the receiver has not yet read it. - pub const MESSAGE: u8 = 1; + pub const MESSAGE: u8 = 0b100; /// No message has yet been sent on the channel, but the receiver is currently receiving. - pub const RECEIVING: u8 = 2; + pub const RECEIVING: u8 = 0b000; + #[cfg(any(feature = "std", feature = "async"))] + pub const UNPARKING: u8 = 0b001; /// The channel has been closed. This means that either the sender or receiver has been dropped, /// or the message sent to the channel has already been received. Since this is a oneshot /// channel, it is disconnected after the one message it is supposed to hold has been /// transmitted. - pub const DISCONNECTED: u8 = 3; + pub const DISCONNECTED: u8 = 0b010; } use states::*; @@ -833,6 +1058,8 @@ impl Channel { #[cfg(feature = "async")] unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll> { + // FIXME: relax orderings and fix racy access to the waker if necessary + // Write our thread instance to the channel. self.write_waker(ReceiverWaker::task_waker(cx)); @@ -841,7 +1068,7 @@ impl Channel { .compare_exchange(EMPTY, RECEIVING, SeqCst, SeqCst) { // We stored our waker, now we return and let the sender wake us up - Ok(EMPTY) => Poll::Pending, + Ok(_) => Poll::Pending, // The sender was dropped before sending anything while we prepared to park. Err(DISCONNECTED) => { self.drop_waker(); @@ -866,6 +1093,9 @@ enum ReceiverWaker { /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`. #[cfg(feature = "async")] Task(task::Waker), + /// A little hack to not make this enum an uninhibitable type when no features are enabled. + #[cfg(not(any(feature = "async", feature = "std")))] + _Uninhabited, } impl ReceiverWaker { @@ -885,6 +1115,8 @@ impl ReceiverWaker { ReceiverWaker::Thread(thread) => thread.unpark(), #[cfg(feature = "async")] ReceiverWaker::Task(waker) => waker.wake(), + #[cfg(not(any(feature = "async", feature = "std")))] + ReceiverWaker::_Uninhabited => unreachable!(), } } } diff --git a/tests/sync.rs b/tests/sync.rs index d70dafa..c205efa 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -203,6 +203,26 @@ fn try_recv() { }) } +#[cfg(feature = "std")] +#[test] +fn try_recv_then_drop_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + let t1 = thread::spawn(move || { + let _ = sender.send(42); + }); + let t2 = thread::spawn(move || { + assert!(matches!( + receiver.try_recv(), + Ok(42) | Err(TryRecvError::Empty) + )); + mem::drop(receiver); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + #[cfg(feature = "std")] #[test] fn recv_deadline_and_timeout_no_time() { @@ -225,7 +245,8 @@ fn recv_deadline_and_timeout_no_time() { }) } -#[cfg(feature = "std")] +// This test doesn't give meaningful results when run with oneshot_test_delay and loom +#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))] #[test] fn recv_deadline_and_timeout_time_should_elapse() { maybe_loom_model(|| {