Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace log with tracing #140

Merged
merged 2 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ cfg-if = "1"
concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
log = "0.4.11"
parking = "2.0.0"
polling = "2.6.0"
rustix = { version = "0.37.1", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[build-dependencies]
Expand Down
26 changes: 15 additions & 11 deletions src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub(crate) fn init() {

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
let span = tracing::trace_span!("async_io::main_loop");
let _enter = span.enter();

// The last observed reactor tick.
let mut last_tick = 0;
// Number of sleeps since this thread has called `react()`.
Expand All @@ -61,7 +64,7 @@ fn main_loop(parker: parking::Parker) {
};

if let Some(mut reactor_lock) = reactor_lock {
log::trace!("main_loop: waiting on I/O");
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();
last_tick = Reactor::get().ticker();
sleeps = 0;
Expand All @@ -76,9 +79,9 @@ fn main_loop(parker: parking::Parker) {
.get(sleeps as usize)
.unwrap_or(&10_000);

log::trace!("main_loop: sleeping for {} us", delay_us);
tracing::trace!("sleeping for {} us", delay_us);
if parker.park_timeout(Duration::from_micros(*delay_us)) {
log::trace!("main_loop: notified");
tracing::trace!("notified");

// If notified before timeout, reset the last tick and the sleep counter.
last_tick = Reactor::get().ticker();
Expand All @@ -105,7 +108,8 @@ fn main_loop(parker: parking::Parker) {
/// });
/// ```
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
log::trace!("block_on()");
let span = tracing::trace_span!("async_io::block_on");
let _enter = span.enter();

// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
Expand Down Expand Up @@ -144,13 +148,13 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
loop {
// Poll the future.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
log::trace!("block_on: completed");
tracing::trace!("completed");
return t;
}

// Check if a notification was received.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");

// Try grabbing a lock on the reactor to process I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
Expand Down Expand Up @@ -183,23 +187,23 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Check if a notification has been received before `io_blocked` was updated
// because in that case the reactor won't receive a wakeup.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");
break;
}

// Wait for I/O events.
log::trace!("block_on: waiting on I/O");
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();

// Check if a notification has been received.
if p.park_timeout(Duration::from_secs(0)) {
log::trace!("block_on: notified");
tracing::trace!("notified");
break;
}

// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
log::trace!("block_on: stops hogging the reactor");
tracing::trace!("stops hogging the reactor");

// This thread is clearly processing I/O events for some other threads
// because it didn't get a notification yet. It's best to stop hogging the
Expand All @@ -218,7 +222,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}
} else {
// Wait for an actual notification.
log::trace!("block_on: sleep until notification");
tracing::trace!("sleep until notification");
p.park();
}
}
Expand Down
19 changes: 13 additions & 6 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl Reactor {
///
/// Returns the duration until the next timer before this method was called.
fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
let span = tracing::trace_span!("process_timers");
let _enter = span.enter();

let mut timers = self.timers.lock().unwrap();
self.process_timer_ops(&mut timers);

Expand Down Expand Up @@ -227,7 +230,8 @@ impl Reactor {
drop(timers);

// Add wakers to the list.
log::trace!("process_timers: {} ready wakers", ready.len());
tracing::trace!("{} ready wakers", ready.len());

for (_, waker) in ready {
wakers.push(waker);
}
Expand Down Expand Up @@ -262,6 +266,9 @@ pub(crate) struct ReactorLock<'a> {
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
let span = tracing::trace_span!("react");
let _enter = span.enter();

let mut wakers = Vec::new();

// Process ready timers.
Expand Down Expand Up @@ -339,7 +346,7 @@ impl ReactorLock<'_> {
};

// Wake up ready tasks.
log::trace!("react: {} ready wakers", wakers.len());
tracing::trace!("{} ready wakers", wakers.len());
for waker in wakers {
// Don't let a panicking waker blow everything up.
panic::catch_unwind(|| waker.wake()).ok();
Expand Down Expand Up @@ -502,7 +509,7 @@ impl<T> Future for Readable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("readable: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd=?self.0.handle.source.registration, "readable");
notgull marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(()))
}
}
Expand All @@ -522,7 +529,7 @@ impl<T> Future for ReadableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("readable_owned: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd=?self.0.handle.source.registration, "readable_owned");
notgull marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(()))
}
}
Expand All @@ -542,7 +549,7 @@ impl<T> Future for Writable<'_, T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("writable: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd=?self.0.handle.source.registration, "writable");
notgull marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(()))
}
}
Expand All @@ -562,7 +569,7 @@ impl<T> Future for WritableOwned<T> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(&mut self.0).poll(cx))?;
log::trace!("writable_owned: fd={:?}", self.0.handle.source.registration);
tracing::trace!(fd=?self.0.handle.source.registration, "writable_owned");
notgull marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(()))
}
}
Expand Down