diff --git a/tokio/src/io/util/empty.rs b/tokio/src/io/util/empty.rs index 9e648f87e62..b96fabbaabe 100644 --- a/tokio/src/io/util/empty.rs +++ b/tokio/src/io/util/empty.rs @@ -53,6 +53,7 @@ impl AsyncRead for Empty { cx: &mut Context<'_>, _: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(())) } @@ -61,6 +62,7 @@ impl AsyncRead for Empty { impl AsyncBufRead for Empty { #[inline] fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); ready!(poll_proceed_and_make_progress(cx)); Poll::Ready(Ok(&[])) } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 31884b39614..5b404c21940 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -233,6 +233,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_read_internal(cx, buf); @@ -249,6 +250,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); self.poll_read_internal(cx, buf) } } @@ -261,6 +263,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ret = self.poll_write_internal(cx, buf); @@ -277,6 +280,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); self.poll_write_internal(cx, buf) } } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index c324ba1d6ae..c712945a480 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -568,6 +568,10 @@ cfg_time! { } mod trace { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + cfg_taskdump! { pub(crate) use crate::runtime::task::trace::trace_leaf; } @@ -579,6 +583,22 @@ mod trace { std::task::Poll::Ready(()) } } + + #[cfg_attr(not(feature = "sync"), allow(dead_code))] + pub(crate) fn async_trace_leaf() -> impl Future { + struct Trace; + + impl Future for Trace { + type Output = (); + + #[inline(always)] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + trace_leaf(cx) + } + } + + Trace + } } mod util; diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 8b22cfbe178..d68a68995ec 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -1011,6 +1011,7 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 0c3ee5f1089..e9011b66fff 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -433,8 +433,8 @@ cfg_rt! { cfg_taskdump! { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. - pub(crate) unsafe fn with_trace(f: impl FnOnce(&trace::Context) -> R) -> R { - CONTEXT.with(|c| f(&c.trace)) + pub(crate) unsafe fn with_trace(f: impl FnOnce(&trace::Context) -> R) -> Option { + CONTEXT.try_with(|c| f(&c.trace)).ok() } } } diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 140b9240ae4..341fa0539a1 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -144,6 +144,7 @@ impl Registration { cx: &mut Context<'_>, direction: Direction, ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index fb1909c3574..2f1d7dba4f0 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -60,6 +60,11 @@ pin_project_lite::pin_project! { } } +const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \ + as part of shutting down the current \ + thread, so collecting a taskdump is not \ + possible."; + impl Context { pub(crate) const fn new() -> Self { Context { @@ -70,7 +75,7 @@ impl Context { /// SAFETY: Callers of this function must ensure that trace frames always /// form a valid linked list. - unsafe fn with_current(f: F) -> R + unsafe fn try_with_current(f: F) -> Option where F: FnOnce(&Self) -> R, { @@ -81,14 +86,18 @@ impl Context { where F: FnOnce(&Cell>>) -> R, { - Self::with_current(|context| f(&context.active_frame)) + Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL) } fn with_current_collector(f: F) -> R where F: FnOnce(&Cell>) -> R, { - unsafe { Self::with_current(|context| f(&context.collector)) } + // SAFETY: This call can only access the collector field, so it cannot + // break the trace frame linked list. + unsafe { + Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL) + } } } @@ -132,7 +141,7 @@ impl Trace { pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { // Safety: We don't manipulate the current context's active frame. let did_trace = unsafe { - Context::with_current(|context_cell| { + Context::try_with_current(|context_cell| { if let Some(mut collector) = context_cell.collector.take() { let mut frames = vec![]; let mut above_leaf = false; @@ -164,6 +173,7 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { false } }) + .unwrap_or(false) }; if did_trace { diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 2ce1d731402..29b6d4e4848 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -132,6 +132,8 @@ impl Barrier { return self.wait_internal().await; } async fn wait_internal(&self) -> BarrierWaitResult { + crate::trace::async_trace_leaf().await; + // NOTE: we are taking a _synchronous_ lock here. // It is okay to do so because the critical section is fast and never yields, so it cannot // deadlock even if another future is concurrently holding the lock. diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9f8c45a7caf..4b36452cec3 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1279,6 +1279,8 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + let (receiver, waiter) = self.project(); let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) { diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 3ac82acd0de..e870ae5f42e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -861,6 +861,8 @@ impl Sender { } async fn reserve_inner(&self) -> Result<(), SendError<()>> { + crate::trace::async_trace_leaf().await; + match self.chan.semaphore().semaphore.acquire(1).await { Ok(_) => Ok(()), Err(_) => Err(SendError(())), diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index edd3e9527b0..6f87715dd34 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -242,6 +242,8 @@ impl Rx { pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index a378116f857..549c77b321e 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -629,6 +629,8 @@ impl Mutex { } async fn acquire(&self) { + crate::trace::async_trace_leaf().await; + self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and // we own it exclusively, which means that this can never happen. diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 058b067ea98..0f104b71aa2 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -885,7 +885,7 @@ impl Notified<'_> { let (notify, state, notify_waiters_calls, waiter) = self.project(); - loop { + 'outer_loop: loop { match *state { Init => { let curr = notify.state.load(SeqCst); @@ -901,7 +901,7 @@ impl Notified<'_> { if res.is_ok() { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Clone the waker before locking, a waker clone can be @@ -919,7 +919,7 @@ impl Notified<'_> { // was created, then we are done if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Transition the state to WAITING. @@ -955,7 +955,7 @@ impl Notified<'_> { Ok(_) => { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } Err(actual) => { assert_eq!(get_state(actual), EMPTY); @@ -990,6 +990,12 @@ impl Notified<'_> { return Poll::Pending; } Waiting => { + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } + if waiter.notification.load(Acquire).is_some() { // Safety: waiter is already unlinked and will not be shared again, // so we have an exclusive access to `waker`. @@ -1078,6 +1084,11 @@ impl Notified<'_> { drop(old_waker); } Done => { + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } return Poll::Ready(()); } } diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 081efae3472..90ea5cd6862 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -301,6 +301,8 @@ impl OnceCell { F: FnOnce() -> Fut, Fut: Future, { + crate::trace::async_trace_leaf().await; + if self.initialized() { // SAFETY: The OnceCell has been fully initialized. unsafe { self.get_unchecked() } @@ -349,6 +351,8 @@ impl OnceCell { F: FnOnce() -> Fut, Fut: Future>, { + crate::trace::async_trace_leaf().await; + if self.initialized() { // SAFETY: The OnceCell has been fully initialized. unsafe { Ok(self.get_unchecked()) } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index da64899d095..0ff3ea28177 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -790,6 +790,8 @@ impl Sender { /// } /// ``` pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); @@ -1130,6 +1132,7 @@ impl Inner { } fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); // Keep track of task budget let coop = ready!(crate::runtime::coop::poll_proceed(cx)); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 8213da6dea4..61049b71ef7 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -740,6 +740,8 @@ async fn changed_impl( shared: &Shared, version: &mut Version, ) -> Result<(), error::RecvError> { + crate::trace::async_trace_leaf().await; + loop { // In order to avoid a race condition, we first request a notification, // **then** check the current value's version. If a new version exists, @@ -1038,6 +1040,8 @@ impl Sender { /// } /// ``` pub async fn closed(&self) { + crate::trace::async_trace_leaf().await; + while self.receiver_count() > 0 { let notified = self.shared.notify_tx.notified(); diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/consume_budget.rs index 1212cfccd77..e7432ffe7d1 100644 --- a/tokio/src/task/consume_budget.rs +++ b/tokio/src/task/consume_budget.rs @@ -33,6 +33,7 @@ pub async fn consume_budget() { let mut status = Poll::Pending; crate::future::poll_fn(move |cx| { + ready!(crate::trace::trace_leaf(cx)); if status.is_ready() { return status; } diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index ccf53ef8ca2..6c9b3379366 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -400,6 +400,8 @@ impl Sleep { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { let me = self.project(); + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!(