From e4f76688a00fa2ce81ab6c074700995095c29e1e Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Tue, 16 Mar 2021 18:31:46 +0000 Subject: [PATCH] runtime: fix memory leak/growth when creating many runtimes (#3564) --- .github/workflows/ci.yml | 13 +- tests-integration/Cargo.toml | 6 + .../src/bin/test-process-signal.rs | 11 + tokio/src/io/driver/interest.rs | 2 +- tokio/src/io/driver/registration.rs | 2 + tokio/src/io/poll_evented.rs | 5 + tokio/src/process/unix/driver.rs | 60 ++--- tokio/src/process/unix/reap.rs | 17 +- tokio/src/signal/mod.rs | 40 +++ tokio/src/signal/registry.rs | 131 ++++------ tokio/src/signal/reusable_box.rs | 227 ++++++++++++++++++ tokio/src/signal/unix.rs | 56 ++--- tokio/src/signal/windows.rs | 24 +- tokio/src/sync/mod.rs | 18 +- tokio/src/sync/mpsc/bounded.rs | 20 -- tokio/src/sync/mpsc/chan.rs | 24 -- tokio/src/sync/mpsc/error.rs | 33 --- tokio/src/sync/watch.rs | 33 ++- 18 files changed, 437 insertions(+), 285 deletions(-) create mode 100644 tests-integration/src/bin/test-process-signal.rs create mode 100644 tokio/src/signal/reusable_box.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b68bfb2be06..2806a61669a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,14 +82,23 @@ jobs: sudo apt-get install -y valgrind # Compile tests - - name: cargo build + - name: cargo build test-mem run: cargo build --features rt-net --bin test-mem working-directory: tests-integration # Run with valgrind - - name: Run valgrind + - name: Run valgrind test-mem run: valgrind --leak-check=full --show-leak-kinds=all ./target/debug/test-mem + # Compile tests + - name: cargo build test-process-signal + run: cargo build --features rt-process-signal --bin test-process-signal + working-directory: tests-integration + + # Run with valgrind + - name: Run valgrind test-process-signal + run: valgrind --leak-check=full --show-leak-kinds=all ./target/debug/test-process-signal + test-unstable: name: test tokio full --unstable runs-on: ${{ matrix.os }} diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 0b98ddc54f9..5ab8c15db3d 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -12,9 +12,15 @@ name = "test-cat" name = "test-mem" required-features = ["rt-net"] +[[bin]] +name = "test-process-signal" +required-features = ["rt-process-signal"] + [features] # For mem check rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"] +# For test-process-signal +rt-process-signal = ["rt", "tokio/process", "tokio/signal"] full = [ "macros", diff --git a/tests-integration/src/bin/test-process-signal.rs b/tests-integration/src/bin/test-process-signal.rs new file mode 100644 index 00000000000..af1b4c49074 --- /dev/null +++ b/tests-integration/src/bin/test-process-signal.rs @@ -0,0 +1,11 @@ +// https://github.com/tokio-rs/tokio/issues/3550 +fn main() { + for _ in 0..1000 { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + drop(rt); + } +} diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index 8c8049dfb53..9eead082f39 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] +#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))] use crate::io::driver::Ready; diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 1451224598c..db911b3e1aa 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "net"), allow(dead_code))] + use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; use crate::util::slab; diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 27a4cb7c1c2..47ae5583dad 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -121,6 +121,11 @@ impl PollEvented { } /// Returns a reference to the registration + #[cfg(any( + feature = "net", + all(unix, feature = "process"), + all(unix, feature = "signal"), + ))] pub(crate) fn registration(&self) -> &Registration { &self.registration } diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs index 9a16cad5499..110b4846b03 100644 --- a/tokio/src/process/unix/driver.rs +++ b/tokio/src/process/unix/driver.rs @@ -6,8 +6,8 @@ use crate::park::Park; use crate::process::unix::orphan::ReapOrphanQueue; use crate::process::unix::GlobalOrphanQueue; use crate::signal::unix::driver::Driver as SignalDriver; -use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind}; -use crate::sync::mpsc::error::TryRecvError; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; use std::io; use std::time::Duration; @@ -16,7 +16,7 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Driver { park: SignalDriver, - inner: CoreDriver, + inner: CoreDriver, GlobalOrphanQueue>, } #[derive(Debug)] @@ -25,27 +25,25 @@ struct CoreDriver { orphan_queue: Q, } +trait HasChanged { + fn has_changed(&mut self) -> bool; +} + +impl HasChanged for watch::Receiver { + fn has_changed(&mut self) -> bool { + self.try_has_changed().and_then(Result::ok).is_some() + } +} + // ===== impl CoreDriver ===== impl CoreDriver where - S: InternalStream, + S: HasChanged, Q: ReapOrphanQueue, { - fn got_signal(&mut self) -> bool { - match self.sigchild.try_recv() { - Ok(()) => true, - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Closed) => panic!("signal was deregistered"), - } - } - fn process(&mut self) { - if self.got_signal() { - // Drain all notifications which may have been buffered - // so we can try to reap all orphans in one batch - while self.got_signal() {} - + if self.sigchild.has_changed() { self.orphan_queue.reap_orphans(); } } @@ -97,8 +95,6 @@ impl Park for Driver { mod test { use super::*; use crate::process::unix::orphan::test::MockQueue; - use crate::sync::mpsc::error::TryRecvError; - use std::task::{Context, Poll}; struct MockStream { total_try_recv: usize, @@ -114,17 +110,10 @@ mod test { } } - impl InternalStream for MockStream { - fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll> { - unimplemented!(); - } - - fn try_recv(&mut self) -> Result<(), TryRecvError> { + impl HasChanged for MockStream { + fn has_changed(&mut self) -> bool { self.total_try_recv += 1; - match self.values.remove(0) { - Some(()) => Ok(()), - None => Err(TryRecvError::Empty), - } + self.values.remove(0).is_some() } } @@ -140,17 +129,4 @@ mod test { assert_eq!(1, driver.sigchild.total_try_recv); assert_eq!(0, driver.orphan_queue.total_reaps.get()); } - - #[test] - fn coalesce_signals_before_reaping() { - let mut driver = CoreDriver { - sigchild: MockStream::new(vec![Some(()), Some(()), None]), - orphan_queue: MockQueue::<()>::new(), - }; - - driver.process(); - - assert_eq!(3, driver.sigchild.total_try_recv); - assert_eq!(1, driver.orphan_queue.total_reaps.get()); - } } diff --git a/tokio/src/process/unix/reap.rs b/tokio/src/process/unix/reap.rs index de483c44b41..5dc95e5b8aa 100644 --- a/tokio/src/process/unix/reap.rs +++ b/tokio/src/process/unix/reap.rs @@ -15,7 +15,7 @@ use std::task::Poll; #[derive(Debug)] pub(crate) struct Reaper where - W: Wait + Unpin, + W: Wait, Q: OrphanQueue, { inner: Option, @@ -25,7 +25,7 @@ where impl Deref for Reaper where - W: Wait + Unpin, + W: Wait, Q: OrphanQueue, { type Target = W; @@ -37,7 +37,7 @@ where impl Reaper where - W: Wait + Unpin, + W: Wait, Q: OrphanQueue, { pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { @@ -61,7 +61,7 @@ impl Future for Reaper where W: Wait + Unpin, Q: OrphanQueue + Unpin, - S: InternalStream, + S: InternalStream + Unpin, { type Output = io::Result; @@ -106,7 +106,7 @@ where impl Kill for Reaper where - W: Kill + Wait + Unpin, + W: Kill + Wait, Q: OrphanQueue, { fn kill(&mut self) -> io::Result<()> { @@ -116,7 +116,7 @@ where impl Drop for Reaper where - W: Wait + Unpin, + W: Wait, Q: OrphanQueue, { fn drop(&mut self) { @@ -134,7 +134,6 @@ mod test { use super::*; use crate::process::unix::orphan::test::MockQueue; - use crate::sync::mpsc::error::TryRecvError; use futures::future::FutureExt; use std::os::unix::process::ExitStatusExt; use std::process::ExitStatus; @@ -206,10 +205,6 @@ mod test { None => Poll::Pending, } } - - fn try_recv(&mut self) -> Result<(), TryRecvError> { - unimplemented!(); - } } #[test] diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs index d347e6ed016..fe572f041f3 100644 --- a/tokio/src/signal/mod.rs +++ b/tokio/src/signal/mod.rs @@ -42,6 +42,8 @@ //! } //! # } //! ``` +use crate::sync::watch::Receiver; +use std::task::{Context, Poll}; mod ctrl_c; pub use ctrl_c::ctrl_c; @@ -58,3 +60,41 @@ mod os { pub mod unix; pub mod windows; + +mod reusable_box; +use self::reusable_box::ReusableBoxFuture; + +#[derive(Debug)] +struct RxFuture { + inner: ReusableBoxFuture>, +} + +async fn make_future(mut rx: Receiver<()>) -> Receiver<()> { + match rx.changed().await { + Ok(()) => rx, + Err(_) => panic!("signal sender went away"), + } +} + +impl RxFuture { + fn new(rx: Receiver<()>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } + + async fn recv(&mut self) -> Option<()> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_recv(cx)).await + } + + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(rx) => { + self.inner.set(make_future(rx)); + Poll::Ready(Some(())) + } + } + } +} diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 55ee8c53263..8b89108a62d 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -2,22 +2,32 @@ use crate::signal::os::{OsExtraData, OsStorage}; -use crate::sync::mpsc::Sender; +use crate::sync::watch; use once_cell::sync::Lazy; use std::ops; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; pub(crate) type EventId = usize; /// State for a specific event, whether a notification is pending delivery, /// and what listeners are registered. -#[derive(Default, Debug)] +#[derive(Debug)] pub(crate) struct EventInfo { pending: AtomicBool, - recipients: Mutex>>, + tx: watch::Sender<()>, +} + +impl Default for EventInfo { + fn default() -> Self { + let (tx, _rx) = watch::channel(()); + + Self { + pending: AtomicBool::new(false), + tx, + } + } } /// An interface for retrieving the `EventInfo` for a particular eventId. @@ -67,14 +77,12 @@ impl Registry { impl Registry { /// Registers a new listener for `event_id`. - fn register_listener(&self, event_id: EventId, listener: Sender<()>) { + fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { self.storage .event_info(event_id) .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) - .recipients - .lock() - .unwrap() - .push(listener); + .tx + .subscribe() } /// Marks `event_id` as having been delivered, without broadcasting it to @@ -89,8 +97,6 @@ impl Registry { /// /// Returns `true` if an event was delivered to at least one listener. fn broadcast(&self) -> bool { - use crate::sync::mpsc::error::TrySendError; - let mut did_notify = false; self.storage.for_each(|event_info| { // Any signal of this kind arrived since we checked last? @@ -98,23 +104,9 @@ impl Registry { return; } - let mut recipients = event_info.recipients.lock().unwrap(); - - // Notify all waiters on this signal that the signal has been - // received. If we can't push a message into the queue then we don't - // worry about it as everything is coalesced anyway. If the channel - // has gone away then we can remove that slot. - for i in (0..recipients.len()).rev() { - match recipients[i].try_send(()) { - Ok(()) => did_notify = true, - Err(TrySendError::Closed(..)) => { - recipients.swap_remove(i); - } - - // Channel is full, ignore the error since the - // receiver has already been woken up - Err(_) => {} - } + // Ignore errors if there are no listeners + if event_info.tx.send(()).is_ok() { + did_notify = true; } }); @@ -137,8 +129,8 @@ impl ops::Deref for Globals { impl Globals { /// Registers a new listener for `event_id`. - pub(crate) fn register_listener(&self, event_id: EventId, listener: Sender<()>) { - self.registry.register_listener(event_id, listener); + pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { + self.registry.register_listener(event_id) } /// Marks `event_id` as having been delivered, without broadcasting it to @@ -179,7 +171,7 @@ where mod tests { use super::*; use crate::runtime::{self, Runtime}; - use crate::sync::{mpsc, oneshot}; + use crate::sync::{oneshot, watch}; use futures::future; @@ -193,13 +185,9 @@ mod tests { EventInfo::default(), ]); - let (first_tx, first_rx) = mpsc::channel(3); - let (second_tx, second_rx) = mpsc::channel(3); - let (third_tx, third_rx) = mpsc::channel(3); - - registry.register_listener(0, first_tx); - registry.register_listener(1, second_tx); - registry.register_listener(2, third_tx); + let first = registry.register_listener(0); + let second = registry.register_listener(1); + let third = registry.register_listener(2); let (fire, wait) = oneshot::channel(); @@ -213,6 +201,9 @@ mod tests { registry.record_event(1); registry.broadcast(); + // Yield so the previous broadcast can get received + crate::time::sleep(std::time::Duration::from_millis(10)).await; + // Send subsequent signal registry.record_event(0); registry.broadcast(); @@ -221,7 +212,7 @@ mod tests { }); let _ = fire.send(()); - let all = future::join3(collect(first_rx), collect(second_rx), collect(third_rx)); + let all = future::join3(collect(first), collect(second), collect(third)); let (first_results, second_results, third_results) = all.await; assert_eq!(2, first_results.len()); @@ -235,8 +226,7 @@ mod tests { fn register_panics_on_invalid_input() { let registry = Registry::new(vec![EventInfo::default()]); - let (tx, _) = mpsc::channel(1); - registry.register_listener(1, tx); + registry.register_listener(1); } #[test] @@ -245,74 +235,37 @@ mod tests { registry.record_event(42); } - #[test] - fn broadcast_cleans_up_disconnected_listeners() { - let rt = Runtime::new().unwrap(); - - rt.block_on(async { - let registry = Registry::new(vec![EventInfo::default()]); - - let (first_tx, first_rx) = mpsc::channel(1); - let (second_tx, second_rx) = mpsc::channel(1); - let (third_tx, third_rx) = mpsc::channel(1); - - registry.register_listener(0, first_tx); - registry.register_listener(0, second_tx); - registry.register_listener(0, third_tx); - - drop(first_rx); - drop(second_rx); - - let (fire, wait) = oneshot::channel(); - - crate::spawn(async { - wait.await.expect("wait failed"); - - registry.record_event(0); - registry.broadcast(); - - assert_eq!(1, registry.storage[0].recipients.lock().unwrap().len()); - drop(registry); - }); - - let _ = fire.send(()); - let results = collect(third_rx).await; - - assert_eq!(1, results.len()); - }); - } - #[test] fn broadcast_returns_if_at_least_one_event_fired() { - let registry = Registry::new(vec![EventInfo::default()]); + let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); registry.record_event(0); assert_eq!(false, registry.broadcast()); - let (first_tx, first_rx) = mpsc::channel(1); - let (second_tx, second_rx) = mpsc::channel(1); - - registry.register_listener(0, first_tx); - registry.register_listener(0, second_tx); + let first = registry.register_listener(0); + let second = registry.register_listener(1); registry.record_event(0); assert_eq!(true, registry.broadcast()); - drop(first_rx); + drop(first); registry.record_event(0); assert_eq!(false, registry.broadcast()); - drop(second_rx); + drop(second); } fn rt() -> Runtime { - runtime::Builder::new_current_thread().build().unwrap() + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() } - async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> { + async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { let mut ret = vec![]; - while let Some(v) = rx.recv().await { + while let Ok(v) = rx.changed().await { ret.push(v); } diff --git a/tokio/src/signal/reusable_box.rs b/tokio/src/signal/reusable_box.rs new file mode 100644 index 00000000000..426ecb06fb9 --- /dev/null +++ b/tokio/src/signal/reusable_box.rs @@ -0,0 +1,227 @@ +use std::alloc::Layout; +use std::future::Future; +use std::panic::AssertUnwindSafe; +use std::pin::Pin; +use std::ptr::{self, NonNull}; +use std::task::{Context, Poll}; +use std::{fmt, panic}; + +/// A reusable `Pin + Send>>`. +/// +/// This type lets you replace the future stored in the box without +/// reallocating when the size and alignment permits this. +pub(crate) struct ReusableBoxFuture { + boxed: NonNull + Send>, +} + +impl ReusableBoxFuture { + /// Create a new `ReusableBoxFuture` containing the provided future. + pub(crate) fn new(future: F) -> Self + where + F: Future + Send + 'static, + { + let boxed: Box + Send> = Box::new(future); + + let boxed = Box::into_raw(boxed); + + // SAFETY: Box::into_raw does not return null pointers. + let boxed = unsafe { NonNull::new_unchecked(boxed) }; + + Self { boxed } + } + + /// Replace the future currently stored in this box. + /// + /// This reallocates if and only if the layout of the provided future is + /// different from the layout of the currently stored future. + pub(crate) fn set(&mut self, future: F) + where + F: Future + Send + 'static, + { + if let Err(future) = self.try_set(future) { + *self = Self::new(future); + } + } + + /// Replace the future currently stored in this box. + /// + /// This function never reallocates, but returns an error if the provided + /// future has a different size or alignment from the currently stored + /// future. + pub(crate) fn try_set(&mut self, future: F) -> Result<(), F> + where + F: Future + Send + 'static, + { + // SAFETY: The pointer is not dangling. + let self_layout = { + let dyn_future: &(dyn Future + Send) = unsafe { self.boxed.as_ref() }; + Layout::for_value(dyn_future) + }; + + if Layout::new::() == self_layout { + // SAFETY: We just checked that the layout of F is correct. + unsafe { + self.set_same_layout(future); + } + + Ok(()) + } else { + Err(future) + } + } + + /// Set the current future. + /// + /// # Safety + /// + /// This function requires that the layout of the provided future is the + /// same as `self.layout`. + unsafe fn set_same_layout(&mut self, future: F) + where + F: Future + Send + 'static, + { + // Drop the existing future, catching any panics. + let result = panic::catch_unwind(AssertUnwindSafe(|| { + ptr::drop_in_place(self.boxed.as_ptr()); + })); + + // Overwrite the future behind the pointer. This is safe because the + // allocation was allocated with the same size and alignment as the type F. + let self_ptr: *mut F = self.boxed.as_ptr() as *mut F; + ptr::write(self_ptr, future); + + // Update the vtable of self.boxed. The pointer is not null because we + // just got it from self.boxed, which is not null. + self.boxed = NonNull::new_unchecked(self_ptr); + + // If the old future's destructor panicked, resume unwinding. + match result { + Ok(()) => {} + Err(payload) => { + panic::resume_unwind(payload); + } + } + } + + /// Get a pinned reference to the underlying future. + pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future + Send)> { + // SAFETY: The user of this box cannot move the box, and we do not move it + // either. + unsafe { Pin::new_unchecked(self.boxed.as_mut()) } + } + + /// Poll the future stored inside this box. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + self.get_pin().poll(cx) + } +} + +impl Future for ReusableBoxFuture { + type Output = T; + + /// Poll the future stored inside this box. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::into_inner(self).get_pin().poll(cx) + } +} + +// The future stored inside ReusableBoxFuture must be Send. +unsafe impl Send for ReusableBoxFuture {} + +// The only method called on self.boxed is poll, which takes &mut self, so this +// struct being Sync does not permit any invalid access to the Future, even if +// the future is not Sync. +unsafe impl Sync for ReusableBoxFuture {} + +// Just like a Pin> is always Unpin, so is this type. +impl Unpin for ReusableBoxFuture {} + +impl Drop for ReusableBoxFuture { + fn drop(&mut self) { + unsafe { + drop(Box::from_raw(self.boxed.as_ptr())); + } + } +} + +impl fmt::Debug for ReusableBoxFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxFuture").finish() + } +} + +#[cfg(test)] +mod test { + use super::ReusableBoxFuture; + use futures::future::FutureExt; + use std::alloc::Layout; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + #[test] + fn test_different_futures() { + let fut = async move { 10 }; + // Not zero sized! + assert_eq!(Layout::for_value(&fut).size(), 1); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(10)); + + b.try_set(async move { 20 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(20)); + + b.try_set(async move { 30 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(30)); + } + + #[test] + fn test_different_sizes() { + let fut1 = async move { 10 }; + let val = [0u32; 1000]; + let fut2 = async move { val[0] }; + let fut3 = ZeroSizedFuture {}; + + assert_eq!(Layout::for_value(&fut1).size(), 1); + assert_eq!(Layout::for_value(&fut2).size(), 4004); + assert_eq!(Layout::for_value(&fut3).size(), 0); + + let mut b = ReusableBoxFuture::new(fut1); + assert_eq!(b.get_pin().now_or_never(), Some(10)); + b.set(fut2); + assert_eq!(b.get_pin().now_or_never(), Some(0)); + b.set(fut3); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } + + struct ZeroSizedFuture {} + impl Future for ZeroSizedFuture { + type Output = u32; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Ready(5) + } + } + + #[test] + fn test_zero_sized() { + let fut = ZeroSizedFuture {}; + // Zero sized! + assert_eq!(Layout::for_value(&fut).size(), 0); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + + b.try_set(ZeroSizedFuture {}) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } +} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 0de875adb21..cb1d1cc9286 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -6,8 +6,8 @@ #![cfg(unix)] use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; -use crate::sync::mpsc::error::TryRecvError; -use crate::sync::mpsc::{channel, Receiver}; +use crate::signal::RxFuture; +use crate::sync::watch; use libc::c_int; use mio::net::UnixStream; @@ -222,7 +222,8 @@ fn action(globals: Pin<&'static Globals>, signal: c_int) { /// /// This will register the signal handler if it hasn't already been registered, /// returning any error along the way if that fails. -fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> { +fn signal_enable(signal: SignalKind, handle: Handle) -> io::Result<()> { + let signal = signal.0; if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { return Err(Error::new( ErrorKind::Other, @@ -325,7 +326,7 @@ fn signal_enable(signal: c_int, handle: Handle) -> io::Result<()> { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - rx: Receiver<()>, + inner: RxFuture, } /// Creates a new stream which will receive notifications when the current @@ -351,21 +352,21 @@ pub struct Signal { /// * If the signal is one of /// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) pub fn signal(kind: SignalKind) -> io::Result { - signal_with_handle(kind, Handle::current()) -} + let rx = signal_with_handle(kind, Handle::current())?; -pub(crate) fn signal_with_handle(kind: SignalKind, handle: Handle) -> io::Result { - let signal = kind.0; + Ok(Signal { + inner: RxFuture::new(rx), + }) +} +pub(crate) fn signal_with_handle( + kind: SignalKind, + handle: Handle, +) -> io::Result> { // Turn the signal delivery on once we are ready for it - signal_enable(signal, handle)?; + signal_enable(kind, handle)?; - // One wakeup in a queue is enough, no need for us to buffer up any - // more. - let (tx, rx) = channel(1); - globals().register_listener(signal as EventId, tx); - - Ok(Signal { rx }) + Ok(globals().register_listener(kind.0 as EventId)) } impl Signal { @@ -393,8 +394,7 @@ impl Signal { /// } /// ``` pub async fn recv(&mut self) -> Option<()> { - use crate::future::poll_fn; - poll_fn(|cx| self.poll_recv(cx)).await + self.inner.recv().await } /// Polls to receive the next signal notification event, outside of an @@ -432,29 +432,19 @@ impl Signal { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.rx.poll_recv(cx) - } - - /// Try to receive a signal notification without blocking or registering a waker. - pub(crate) fn try_recv(&mut self) -> Result<(), TryRecvError> { - self.rx.try_recv() + self.inner.poll_recv(cx) } } // Work around for abstracting streams internally -pub(crate) trait InternalStream: Unpin { +pub(crate) trait InternalStream { fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; - fn try_recv(&mut self) -> Result<(), TryRecvError>; } impl InternalStream for Signal { fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) } - - fn try_recv(&mut self) -> Result<(), TryRecvError> { - self.try_recv() - } } pub(crate) fn ctrl_c() -> io::Result { @@ -467,11 +457,15 @@ mod tests { #[test] fn signal_enable_error_on_invalid_input() { - signal_enable(-1, Handle::default()).unwrap_err(); + signal_enable(SignalKind::from_raw(-1), Handle::default()).unwrap_err(); } #[test] fn signal_enable_error_on_forbidden_input() { - signal_enable(signal_hook_registry::FORBIDDEN[0], Handle::default()).unwrap_err(); + signal_enable( + SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]), + Handle::default(), + ) + .unwrap_err(); } } diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index 43af2906c70..c231d6268b4 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -8,7 +8,7 @@ #![cfg(windows)] use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; -use crate::sync::mpsc::{channel, Receiver}; +use crate::signal::RxFuture; use std::convert::TryFrom; use std::io; @@ -76,22 +76,18 @@ impl Init for OsExtraData { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub(crate) struct Event { - rx: Receiver<()>, + inner: RxFuture, } impl Event { fn new(signum: DWORD) -> io::Result { global_init()?; - let (tx, rx) = channel(1); - globals().register_listener(signum as EventId, tx); + let rx = globals().register_listener(signum as EventId); - Ok(Event { rx }) - } - - pub(crate) async fn recv(&mut self) -> Option<()> { - use crate::future::poll_fn; - poll_fn(|cx| self.rx.poll_recv(cx)).await + Ok(Self { + inner: RxFuture::new(rx), + }) } } @@ -195,7 +191,7 @@ impl CtrlC { /// } /// ``` pub async fn recv(&mut self) -> Option<()> { - self.inner.recv().await + self.inner.inner.recv().await } /// Polls to receive the next signal notification event, outside of an @@ -227,7 +223,7 @@ impl CtrlC { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.rx.poll_recv(cx) + self.inner.inner.poll_recv(cx) } } @@ -267,7 +263,7 @@ impl CtrlBreak { /// } /// ``` pub async fn recv(&mut self) -> Option<()> { - self.inner.recv().await + self.inner.inner.recv().await } /// Polls to receive the next signal notification event, outside of an @@ -299,7 +295,7 @@ impl CtrlBreak { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.rx.poll_recv(cx) + self.inner.inner.poll_recv(cx) } } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index fdffecb837c..5d8c82ea070 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -462,10 +462,8 @@ cfg_sync! { } cfg_not_sync! { - #[cfg(any(feature = "fs", feature = "signal", all(unix, feature = "process")))] - pub(crate) mod batch_semaphore; - cfg_fs! { + pub(crate) mod batch_semaphore; mod mutex; pub(crate) use mutex::Mutex; } @@ -473,20 +471,16 @@ cfg_not_sync! { #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))] pub(crate) mod notify; + #[cfg(any(feature = "rt", all(windows, feature = "process")))] + pub(crate) mod oneshot; + cfg_atomic_waker_impl! { mod task; pub(crate) use task::AtomicWaker; } - #[cfg(any( - feature = "rt", - feature = "process", - feature = "signal"))] - pub(crate) mod oneshot; - - cfg_signal_internal! { - pub(crate) mod mpsc; - } + #[cfg(any(feature = "signal", all(unix, feature = "process")))] + pub(crate) mod watch; } /// Unit tests diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 760aca367d0..5d17c1e199b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,8 +1,5 @@ use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; -#[cfg(unix)] -#[cfg(any(feature = "signal", feature = "process"))] -use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::error::{SendError, TrySendError}; cfg_time! { @@ -224,23 +221,6 @@ impl Receiver { crate::future::block_on(self.recv()) } - /// Attempts to return a pending value on this receiver without blocking. - /// - /// This method will never block the caller in order to wait for data to - /// become available. Instead, this will always return immediately with - /// a possible option of pending data on the channel. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a receiver. - /// - /// Compared with recv, this function has two failure cases instead of - /// one (one for disconnection, one for an empty buffer). - #[cfg(unix)] - #[cfg(any(feature = "signal", feature = "process"))] - pub(crate) fn try_recv(&mut self) -> Result { - self.chan.try_recv() - } - /// Closes the receiving half of a channel without dropping it. /// /// This prevents any further messages from being sent on the channel while diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index df12744f3fc..554d0228478 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -265,30 +265,6 @@ impl Rx { } } -feature! { - #![all(unix, any(feature = "signal", feature = "process"))] - - use crate::sync::mpsc::error::TryRecvError; - - impl Rx { - /// Receives the next value without blocking - pub(crate) fn try_recv(&mut self) -> Result { - use super::block::Read::*; - self.inner.rx_fields.with_mut(|rx_fields_ptr| { - let rx_fields = unsafe { &mut *rx_fields_ptr }; - match rx_fields.list.pop(&self.inner.tx) { - Some(Value(value)) => { - self.inner.semaphore.add_permit(); - Ok(value) - } - Some(Closed) => Err(TryRecvError::Closed), - None => Err(TryRecvError::Empty), - } - }) - } - } -} - impl Drop for Rx { fn drop(&mut self) { use super::block::Read::Value; diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs index d23255b5aab..a2d28240dae 100644 --- a/tokio/src/sync/mpsc/error.rs +++ b/tokio/src/sync/mpsc/error.rs @@ -65,39 +65,6 @@ impl fmt::Display for RecvError { impl Error for RecvError {} -// ===== TryRecvError ===== - -feature! { - #![all(unix, any(feature = "signal", feature = "process"))] - - /// This enumeration is the list of the possible reasons that try_recv - /// could not return data when called. - #[derive(Debug, PartialEq)] - pub(crate) enum TryRecvError { - /// This channel is currently empty, but the Sender(s) have not yet - /// disconnected, so data may yet become available. - Empty, - /// The channel's sending half has been closed, and there will - /// never be any more data received on it. - Closed, - } - - impl fmt::Display for TryRecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - fmt, - "{}", - match self { - TryRecvError::Empty => "channel empty", - TryRecvError::Closed => "channel closed", - } - ) - } - } - - impl Error for TryRecvError {} -} - cfg_time! { // ===== SendTimeoutError ===== diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 1ccb040e040..bf6f0ac8605 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))] + //! A single-producer, multi-consumer channel that only retains the *last* sent //! value. //! @@ -51,7 +53,7 @@ //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed //! [`Sender::closed`]: crate::sync::watch::Sender::closed -use crate::sync::Notify; +use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst}; @@ -198,6 +200,14 @@ pub fn channel(init: T) -> (Sender, Receiver) { } impl Receiver { + fn from_shared(version: usize, shared: Arc>) -> Self { + // No synchronization necessary as this is only used as a counter and + // not memory access. + shared.ref_count_rx.fetch_add(1, Relaxed); + + Self { version, shared } + } + /// Returns a reference to the most recently sent value /// /// Outstanding borrows hold a read lock. This means that long lived borrows @@ -260,6 +270,12 @@ impl Receiver { // loop around again in case the wake-up was spurious } } + + cfg_process_driver! { + pub(crate) fn try_has_changed(&mut self) -> Option> { + maybe_changed(&self.shared, &mut self.version) + } + } } fn maybe_changed( @@ -289,11 +305,7 @@ impl Clone for Receiver { let version = self.version; let shared = self.shared.clone(); - // No synchronization necessary as this is only used as a counter and - // not memory access. - shared.ref_count_rx.fetch_add(1, Relaxed); - - Receiver { shared, version } + Self::from_shared(version, shared) } } @@ -396,6 +408,15 @@ impl Sender { notified.await; debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); } + + cfg_signal_internal! { + pub(crate) fn subscribe(&self) -> Receiver { + let shared = self.shared.clone(); + let version = shared.version.load(SeqCst); + + Receiver::from_shared(version, shared) + } + } } impl Drop for Sender {