Skip to content

Commit

Permalink
sync: remove try_recv() from mpsc types (tokio-rs#3263)
Browse files Browse the repository at this point in the history
The mpsc `try_recv()` functions have an issue where a sent message
happens-before a call to `try_recv()` but `try_recv()` returns `None`.
Fixing this is non-trivial, so the function is removed for 1.0. When the
bug is fixed, the function can be added back.

Closes tokio-rs#2020
  • Loading branch information
carllerche authored Dec 14, 2020
1 parent 1f862d2 commit a7833e3
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 94 deletions.
9 changes: 7 additions & 2 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
#[cfg(unix)]
#[cfg(any(feature = "signal", feature = "process"))]
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::error::{SendError, TrySendError};

cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
Expand Down Expand Up @@ -194,7 +197,9 @@ impl<T> Receiver<T> {
///
/// Compared with recv, this function has two failure cases instead of
/// one (one for disconnection, one for an empty buffer).
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
#[cfg(unix)]
#[cfg(any(feature = "signal", feature = "process"))]
pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.chan.try_recv()
}

Expand Down
35 changes: 21 additions & 14 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::notify::Notify;

Expand Down Expand Up @@ -259,21 +258,29 @@ impl<T, S: Semaphore> Rx<T, S> {
}
})
}
}

/// Receives the next value without blocking
pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
use super::block::Read::*;
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
Ok(value)
feature! {
#![all(unix, any(feature = "signal", feature = "process"))]

use crate::sync::mpsc::error::TryRecvError;

impl<T, S: Semaphore> Rx<T, S> {
/// Receives the next value without blocking
pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
use super::block::Read::*;
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
Ok(value)
}
Some(Closed) => Err(TryRecvError::Closed),
None => Err(TryRecvError::Empty),
}
Some(Closed) => Err(TryRecvError::Closed),
None => Err(TryRecvError::Empty),
}
})
})
}
}
}

Expand Down
50 changes: 27 additions & 23 deletions tokio/src/sync/mpsc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,36 @@ impl Error for RecvError {}

// ===== TryRecvError =====

/// This enumeration is the list of the possible reasons that try_recv
/// could not return data when called.
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
/// This channel is currently empty, but the Sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// The channel's sending half has been closed, and there will
/// never be any more data received on it.
Closed,
}
feature! {
#![all(unix, any(feature = "signal", feature = "process"))]

/// This enumeration is the list of the possible reasons that try_recv
/// could not return data when called.
#[derive(Debug, PartialEq)]
pub(crate) enum TryRecvError {
/// This channel is currently empty, but the Sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// The channel's sending half has been closed, and there will
/// never be any more data received on it.
Closed,
}

impl fmt::Display for TryRecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"{}",
match self {
TryRecvError::Empty => "channel empty",
TryRecvError::Closed => "channel closed",
}
)
impl fmt::Display for TryRecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"{}",
match self {
TryRecvError::Empty => "channel empty",
TryRecvError::Closed => "channel closed",
}
)
}
}
}

impl Error for TryRecvError {}
impl Error for TryRecvError {}
}

cfg_time! {
// ===== SendTimeoutError =====
Expand Down
17 changes: 1 addition & 16 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError};
use crate::sync::mpsc::error::SendError;

use std::fmt;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -152,21 +152,6 @@ impl<T> UnboundedReceiver<T> {
crate::future::block_on(self.recv())
}

/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
/// become available. Instead, this will always return immediately with
/// a possible option of pending data on the channel.
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// block on a receiver.
///
/// Compared with recv, this function has two failure cases instead of
/// one (one for disconnection, one for an empty buffer).
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.chan.try_recv()
}

/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand Down
40 changes: 1 addition & 39 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio::sync::mpsc::error::TrySendError;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
Expand Down Expand Up @@ -385,44 +385,6 @@ fn unconsumed_messages_are_dropped() {
assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn try_recv() {
let (tx, mut rx) = mpsc::channel(1);
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
_ => panic!(),
}
tx.try_send(42).unwrap();
match rx.try_recv() {
Ok(42) => {}
_ => panic!(),
}
drop(tx);
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => panic!(),
}
}

#[test]
fn try_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel();
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
_ => panic!(),
}
tx.send(42).unwrap();
match rx.try_recv() {
Ok(42) => {}
_ => panic!(),
}
drop(tx);
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => panic!(),
}
}

#[test]
fn blocking_recv() {
let (tx, mut rx) = mpsc::channel::<u8>(1);
Expand Down

0 comments on commit a7833e3

Please sign in to comment.