From dd15ab169bd4253902f96e06fc2d14eed1ae989e Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 20 May 2023 11:28:17 +0200 Subject: [PATCH 1/7] taskdump: instrument the remaining leaf futures --- tokio/src/io/util/empty.rs | 2 ++ tokio/src/io/util/mem.rs | 4 ++++ tokio/src/lib.rs | 18 ++++++++++++++++++ tokio/src/process/mod.rs | 1 + tokio/src/runtime/io/registration.rs | 1 + tokio/src/sync/barrier.rs | 2 ++ tokio/src/sync/broadcast.rs | 2 ++ tokio/src/sync/mpsc/bounded.rs | 2 ++ tokio/src/sync/mpsc/chan.rs | 2 ++ tokio/src/sync/mutex.rs | 2 ++ tokio/src/sync/notify.rs | 11 +++++++---- tokio/src/sync/once_cell.rs | 4 ++++ tokio/src/sync/oneshot.rs | 3 +++ tokio/src/sync/watch.rs | 4 ++++ tokio/src/task/consume_budget.rs | 1 + tokio/src/time/sleep.rs | 2 ++ 16 files changed, 57 insertions(+), 4 deletions(-) diff --git a/tokio/src/io/util/empty.rs b/tokio/src/io/util/empty.rs index 9e648f87e62..b96fabbaabe 100644 --- a/tokio/src/io/util/empty.rs +++ b/tokio/src/io/util/empty.rs @@ -53,6 +53,7 @@ impl AsyncRead for Empty { cx: &mut Context<'_>, _: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(())) } @@ -61,6 +62,7 @@ impl AsyncRead for Empty { impl AsyncBufRead for Empty { #[inline] fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(&[])) } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 31884b39614..5b404c21940 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -233,6 +233,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_read_internal(cx, buf); @@ -249,6 +250,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); self.poll_read_internal(cx, buf) } } @@ -261,6 +263,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_write_internal(cx, buf); @@ -277,6 +280,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); self.poll_write_internal(cx, buf) } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index c324ba1d6ae..8e1eabeb55d 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -579,6 +579,24 @@ mod trace { std::task::Poll::Ready(()) } } + + pub(crate) fn async_trace_leaf() -> impl Future { + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct Trace; + + impl Future for Trace { + type Output = (); + + #[inline(always)] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + trace_leaf() + } + } + + Trace + } } mod util; diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 8b22cfbe178..d68a68995ec 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -1011,6 +1011,7 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 140b9240ae4..341fa0539a1 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -144,6 +144,7 @@ impl Registration { cx: &mut Context<'_>, direction: Direction, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 2ce1d731402..29b6d4e4848 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -132,6 +132,8 @@ impl Barrier { return self.wait_internal().await; } async fn wait_internal(&self) -> BarrierWaitResult { + crate::trace::async_trace_leaf().await; + // NOTE: we are taking a _synchronous_ lock here. // It is okay to do so because the critical section is fast and never yields, so it cannot // deadlock even if another future is concurrently holding the lock. diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9f8c45a7caf..4b36452cec3 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1279,6 +1279,8 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + let (receiver, waiter) = self.project(); let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) { diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 3ac82acd0de..e870ae5f42e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -861,6 +861,8 @@ impl Sender { } async fn reserve_inner(&self) -> Result<(), SendError<()>> { + crate::trace::async_trace_leaf().await; + match self.chan.semaphore().semaphore.acquire(1).await { Ok(_) => Ok(()), Err(_) => Err(SendError(())), diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index edd3e9527b0..6f87715dd34 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -242,6 +242,8 @@ impl Rx { pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index a378116f857..549c77b321e 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -629,6 +629,8 @@ impl Mutex { } async fn acquire(&self) { + crate::trace::async_trace_leaf().await; + self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and // we own it exclusively, which means that this can never happen. diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 058b067ea98..92809349902 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -885,6 +885,7 @@ impl Notified<'_> { let (notify, state, notify_waiters_calls, waiter) = self.project(); + 'outer_loop: loop { match *state { Init => { @@ -901,7 +902,7 @@ impl Notified<'_> { if res.is_ok() { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Clone the waker before locking, a waker clone can be @@ -919,7 +920,7 @@ impl Notified<'_> { // was created, then we are done if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Transition the state to WAITING. @@ -955,7 +956,7 @@ impl Notified<'_> { Ok(_) => { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } Err(actual) => { assert_eq!(get_state(actual), EMPTY); @@ -990,6 +991,8 @@ impl Notified<'_> { return Poll::Pending; } Waiting => { + ready!(crate::trace::trace_leaf(cx)); + if waiter.notification.load(Acquire).is_some() { // Safety: waiter is already unlinked and will not be shared again, // so we have an exclusive access to `waker`. @@ -1078,7 +1081,7 @@ impl Notified<'_> { drop(old_waker); } Done => { - return Poll::Ready(()); + return crate::trace::trace_leaf(cx); } } } diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 081efae3472..90ea5cd6862 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -301,6 +301,8 @@ impl OnceCell { F: FnOnce() -> Fut, Fut: Future, { + crate::trace::async_trace_leaf().await; + if self.initialized() { // SAFETY: The OnceCell has been fully initialized. unsafe { self.get_unchecked() } @@ -349,6 +351,8 @@ impl OnceCell { F: FnOnce() -> Fut, Fut: Future>, { + crate::trace::async_trace_leaf().await; + if self.initialized() { // SAFETY: The OnceCell has been fully initialized. unsafe { Ok(self.get_unchecked()) } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index da64899d095..0ff3ea28177 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -790,6 +790,8 @@ impl Sender { /// } /// ``` pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); @@ -1130,6 +1132,7 @@ impl Inner { } fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 8213da6dea4..61049b71ef7 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -740,6 +740,8 @@ async fn changed_impl( shared: &Shared, version: &mut Version, ) -> Result<(), error::RecvError> { + crate::trace::async_trace_leaf().await; + loop { // In order to avoid a race condition, we first request a notification, // **then** check the current value's version. If a new version exists, @@ -1038,6 +1040,8 @@ impl Sender { /// } /// ``` pub async fn closed(&self) { + crate::trace::async_trace_leaf().await; + while self.receiver_count() > 0 { let notified = self.shared.notify_tx.notified(); diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/consume_budget.rs index 1212cfccd77..e7432ffe7d1 100644 --- a/tokio/src/task/consume_budget.rs +++ b/tokio/src/task/consume_budget.rs @@ -33,6 +33,7 @@ pub async fn consume_budget() { let mut status = Poll::Pending; crate::future::poll_fn(move |cx| { + ready!(crate::trace::trace_leaf(cx)); if status.is_ready() { return status; } diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index ccf53ef8ca2..6c9b3379366 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -400,6 +400,8 @@ impl Sleep { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { let me = self.project(); + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( From 22f64a3a9e4b85e4ba4143de584de209d56b0a5a Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 20 May 2023 11:35:31 +0200 Subject: [PATCH 2/7] fix build --- tokio/src/lib.rs | 9 +++++---- tokio/src/sync/notify.rs | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 8e1eabeb55d..3a350e4e497 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -568,6 +568,10 @@ cfg_time! { } mod trace { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + cfg_taskdump! { pub(crate) use crate::runtime::task::trace::trace_leaf; } @@ -581,9 +585,6 @@ mod trace { } pub(crate) fn async_trace_leaf() -> impl Future { - use std::pin::Pin; - use std::task::{Context, Poll}; - struct Trace; impl Future for Trace { @@ -591,7 +592,7 @@ mod trace { #[inline(always)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - trace_leaf() + trace_leaf(cx) } } diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 92809349902..b26be6be59f 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -885,8 +885,7 @@ impl Notified<'_> { let (notify, state, notify_waiters_calls, waiter) = self.project(); - 'outer_loop: - loop { + 'outer_loop: loop { match *state { Init => { let curr = notify.state.load(SeqCst); @@ -991,7 +990,11 @@ impl Notified<'_> { return Poll::Pending; } Waiting => { - ready!(crate::trace::trace_leaf(cx)); + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } if waiter.notification.load(Acquire).is_some() { // Safety: waiter is already unlinked and will not be shared again, @@ -1081,7 +1084,12 @@ impl Notified<'_> { drop(old_waker); } Done => { - return crate::trace::trace_leaf(cx); + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } + return Poll::Pending; } } } From 29b5649f4dfcf7126d748d17bf853bd79db8684d Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 20 May 2023 12:04:58 +0200 Subject: [PATCH 3/7] fix notify --- tokio/src/sync/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index b26be6be59f..0f104b71aa2 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -1089,7 +1089,7 @@ impl Notified<'_> { let mut ctx = Context::from_waker(waker); ready!(crate::trace::trace_leaf(&mut ctx)); } - return Poll::Pending; + return Poll::Ready(()); } } } From 3ede7bf3f3b048164194b6e5fa1f072e981281da Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 20 May 2023 12:22:31 +0200 Subject: [PATCH 4/7] fix unused warning --- tokio/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 3a350e4e497..c712945a480 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -584,6 +584,7 @@ mod trace { } } + #[cfg_attr(not(feature = "sync"), allow(dead_code))] pub(crate) fn async_trace_leaf() -> impl Future { struct Trace; From 0e86e9aa3763805fe4744d49618766b5965bb791 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 23 May 2023 20:23:10 +0200 Subject: [PATCH 5/7] Fix thread-local shutdown failure --- tokio/src/runtime/context.rs | 4 ++-- tokio/src/runtime/task/trace/mod.rs | 21 ++++++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 48e3f6087bc..b356445b4ca 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -408,8 +408,8 @@ cfg_rt! { cfg_taskdump! { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. - pub(crate) unsafe fn with_trace(f: impl FnOnce(&trace::Context) -> R) -> R { - CONTEXT.with(|c| f(&c.trace)) + pub(crate) unsafe fn with_trace(f: impl FnOnce(&trace::Context) -> R) -> Option { + CONTEXT.try_with(|c| f(&c.trace)).ok() } } } diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 5a21527551a..50e0386e11c 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -59,6 +59,11 @@ pin_project_lite::pin_project! { } } +const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \ + as part of shutting down the curren \ + thread, so collecting a taskdump is not \ + possible."; + impl Context { pub(crate) const fn new() -> Self { Context { @@ -69,7 +74,7 @@ impl Context { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. - unsafe fn with_current(f: F) -> R + unsafe fn try_with_current(f: F) -> Option where F: FnOnce(&Self) -> R, { @@ -80,14 +85,20 @@ impl Context { where F: FnOnce(&Cell>>) -> R, { - Self::with_current(|context| f(&context.active_frame)) + Self::try_with_current(|context| f(&context.active_frame)) + .expect(FAIL_NO_THREAD_LOCAL) } fn with_current_collector(f: F) -> R where F: FnOnce(&Cell>) -> R, { - unsafe { Self::with_current(|context| f(&context.collector)) } + // SAFETY: This call can only access the collector field, so it cannot + // break the trace frame linked list. + unsafe { + Self::try_with_current(|context| f(&context.collector)) + .expect(FAIL_NO_THREAD_LOCAL) + } } } @@ -131,7 +142,7 @@ impl Trace { pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { // Safety: We don't manipulate the current context's active frame. let did_trace = unsafe { - Context::with_current(|context_cell| { + Context::try_with_current(|context_cell| { if let Some(mut collector) = context_cell.collector.take() { let mut frames = vec![]; let mut above_leaf = false; @@ -162,7 +173,7 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { } else { false } - }) + }).unwrap_or(false) }; if did_trace { From 5136ee47a35dffa4114d19b23cf5ad4a94fe83eb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 23 May 2023 20:24:12 +0200 Subject: [PATCH 6/7] fmt --- tokio/src/runtime/task/trace/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 50e0386e11c..fd6c8dc72c7 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -85,8 +85,7 @@ impl Context { where F: FnOnce(&Cell>>) -> R, { - Self::try_with_current(|context| f(&context.active_frame)) - .expect(FAIL_NO_THREAD_LOCAL) + Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL) } fn with_current_collector(f: F) -> R @@ -96,8 +95,7 @@ impl Context { // SAFETY: This call can only access the collector field, so it cannot // break the trace frame linked list. unsafe { - Self::try_with_current(|context| f(&context.collector)) - .expect(FAIL_NO_THREAD_LOCAL) + Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL) } } } @@ -173,7 +171,8 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { } else { false } - }).unwrap_or(false) + }) + .unwrap_or(false) }; if did_trace { From b3776b03b8cc292a7621aab7516e832c94a80179 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 27 May 2023 13:46:52 +0200 Subject: [PATCH 7/7] Fix error message Co-authored-by: Jack Wrenn --- tokio/src/runtime/task/trace/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 5070b4a0717..2f1d7dba4f0 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -61,7 +61,7 @@ pin_project_lite::pin_project! { } const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \ - as part of shutting down the curren \ + as part of shutting down the current \ thread, so collecting a taskdump is not \ possible.";