Skip to content

Commit

Permalink
sync: add mpsc::Receiver::blocking_recv_many (#6867)
Browse files Browse the repository at this point in the history
Fixes: #6865
Co-authored-by: Rafael Bachmann <[email protected]>
  • Loading branch information
barafael and Rafael Bachmann authored Oct 17, 2024
1 parent c9e998e commit 1656d8e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
10 changes: 10 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ impl<T> Receiver<T> {
crate::future::block_on(self.recv())
}

/// Variant of [`Self::recv_many`] for blocking contexts.
///
/// The same conditions as in [`Self::blocking_recv`] apply.
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
crate::future::block_on(self.recv_many(buffer, limit))
}

/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,16 @@ impl<T> UnboundedReceiver<T> {
crate::future::block_on(self.recv())
}

/// Variant of [`Self::recv_many`] for blocking contexts.
///
/// The same conditions as in [`Self::blocking_recv`] apply.
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
crate::future::block_on(self.recv_many(buffer, limit))
}

/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand Down
33 changes: 33 additions & 0 deletions tokio/tests/sync_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ fn mpsc_bounded_receiver_blocking_recv_panic_caller() -> Result<(), Box<dyn Erro
Ok(())
}

#[test]
fn mpsc_bounded_receiver_blocking_recv_many_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = current_thread();
let (_tx, mut rx) = mpsc::channel::<u8>(1);
rt.block_on(async {
let _ = rx.blocking_recv();
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn mpsc_bounded_sender_blocking_send_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
Expand Down Expand Up @@ -161,6 +177,23 @@ fn mpsc_unbounded_receiver_blocking_recv_panic_caller() -> Result<(), Box<dyn Er
Ok(())
}

#[test]
fn mpsc_unbounded_receiver_blocking_recv_many_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = current_thread();
let (_tx, mut rx) = mpsc::unbounded_channel::<u8>();
let mut vec = vec![];
rt.block_on(async {
let _ = rx.blocking_recv_many(&mut vec, 1);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn semaphore_merge_unrelated_owned_permits() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
Expand Down

0 comments on commit 1656d8e

Please sign in to comment.