diff --git a/Cargo.toml b/Cargo.toml index 2dda7ca..d8eaacc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,12 @@ cfg-if = "1" concurrent-queue = "2.2.0" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } futures-lite = { version = "1.11.0", default-features = false } -log = "0.4.11" parking = "2.0.0" polling = "2.6.0" rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] } slab = "0.4.2" socket2 = { version = "0.5.3", features = ["all"] } +tracing = { version = "0.1.37", default-features = false } waker-fn = "1.1.0" [build-dependencies] diff --git a/src/driver.rs b/src/driver.rs index 3122f1f..168d55d 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -43,6 +43,9 @@ pub(crate) fn init() { /// The main loop for the "async-io" thread. fn main_loop(parker: parking::Parker) { + let span = tracing::trace_span!("async_io::main_loop"); + let _enter = span.enter(); + // The last observed reactor tick. let mut last_tick = 0; // Number of sleeps since this thread has called `react()`. @@ -61,7 +64,7 @@ fn main_loop(parker: parking::Parker) { }; if let Some(mut reactor_lock) = reactor_lock { - log::trace!("main_loop: waiting on I/O"); + tracing::trace!("waiting on I/O"); reactor_lock.react(None).ok(); last_tick = Reactor::get().ticker(); sleeps = 0; @@ -76,9 +79,9 @@ fn main_loop(parker: parking::Parker) { .get(sleeps as usize) .unwrap_or(&10_000); - log::trace!("main_loop: sleeping for {} us", delay_us); + tracing::trace!("sleeping for {} us", delay_us); if parker.park_timeout(Duration::from_micros(*delay_us)) { - log::trace!("main_loop: notified"); + tracing::trace!("notified"); // If notified before timeout, reset the last tick and the sleep counter. last_tick = Reactor::get().ticker(); @@ -105,7 +108,8 @@ fn main_loop(parker: parking::Parker) { /// }); /// ``` pub fn block_on(future: impl Future) -> T { - log::trace!("block_on()"); + let span = tracing::trace_span!("async_io::block_on"); + let _enter = span.enter(); // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive. BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst); @@ -144,13 +148,13 @@ pub fn block_on(future: impl Future) -> T { loop { // Poll the future. if let Poll::Ready(t) = future.as_mut().poll(cx) { - log::trace!("block_on: completed"); + tracing::trace!("completed"); return t; } // Check if a notification was received. if p.park_timeout(Duration::from_secs(0)) { - log::trace!("block_on: notified"); + tracing::trace!("notified"); // Try grabbing a lock on the reactor to process I/O events. if let Some(mut reactor_lock) = Reactor::get().try_lock() { @@ -183,23 +187,23 @@ pub fn block_on(future: impl Future) -> T { // Check if a notification has been received before `io_blocked` was updated // because in that case the reactor won't receive a wakeup. if p.park_timeout(Duration::from_secs(0)) { - log::trace!("block_on: notified"); + tracing::trace!("notified"); break; } // Wait for I/O events. - log::trace!("block_on: waiting on I/O"); + tracing::trace!("waiting on I/O"); reactor_lock.react(None).ok(); // Check if a notification has been received. if p.park_timeout(Duration::from_secs(0)) { - log::trace!("block_on: notified"); + tracing::trace!("notified"); break; } // Check if this thread been handling I/O events for a long time. if start.elapsed() > Duration::from_micros(500) { - log::trace!("block_on: stops hogging the reactor"); + tracing::trace!("stops hogging the reactor"); // This thread is clearly processing I/O events for some other threads // because it didn't get a notification yet. It's best to stop hogging the @@ -218,7 +222,7 @@ pub fn block_on(future: impl Future) -> T { } } else { // Wait for an actual notification. - log::trace!("block_on: sleep until notification"); + tracing::trace!("sleep until notification"); p.park(); } } diff --git a/src/reactor.rs b/src/reactor.rs index c2958f0..40604e4 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -199,6 +199,9 @@ impl Reactor { /// /// Returns the duration until the next timer before this method was called. fn process_timers(&self, wakers: &mut Vec) -> Option { + let span = tracing::trace_span!("process_timers"); + let _enter = span.enter(); + let mut timers = self.timers.lock().unwrap(); self.process_timer_ops(&mut timers); @@ -227,7 +230,8 @@ impl Reactor { drop(timers); // Add wakers to the list. - log::trace!("process_timers: {} ready wakers", ready.len()); + tracing::trace!("{} ready wakers", ready.len()); + for (_, waker) in ready { wakers.push(waker); } @@ -262,6 +266,9 @@ pub(crate) struct ReactorLock<'a> { impl ReactorLock<'_> { /// Processes new events, blocking until the first event or the timeout. pub(crate) fn react(&mut self, timeout: Option) -> io::Result<()> { + let span = tracing::trace_span!("react"); + let _enter = span.enter(); + let mut wakers = Vec::new(); // Process ready timers. @@ -339,7 +346,7 @@ impl ReactorLock<'_> { }; // Wake up ready tasks. - log::trace!("react: {} ready wakers", wakers.len()); + tracing::trace!("{} ready wakers", wakers.len()); for waker in wakers { // Don't let a panicking waker blow everything up. panic::catch_unwind(|| waker.wake()).ok(); @@ -502,7 +509,7 @@ impl Future for Readable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable: fd={:?}", self.0.handle.source.registration); + tracing::trace!(fd = ?self.0.handle.source.registration, "readable"); Poll::Ready(Ok(())) } } @@ -522,7 +529,7 @@ impl Future for ReadableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("readable_owned: fd={:?}", self.0.handle.source.registration); + tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned"); Poll::Ready(Ok(())) } } @@ -542,7 +549,7 @@ impl Future for Writable<'_, T> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable: fd={:?}", self.0.handle.source.registration); + tracing::trace!(fd = ?self.0.handle.source.registration, "writable"); Poll::Ready(Ok(())) } } @@ -562,7 +569,7 @@ impl Future for WritableOwned { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(&mut self.0).poll(cx))?; - log::trace!("writable_owned: fd={:?}", self.0.handle.source.registration); + tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned"); Poll::Ready(Ok(())) } }