Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio: Add initial io driver metrics #4507

Merged
merged 5 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
target
Cargo.lock

.cargo/config.toml
2 changes: 1 addition & 1 deletion tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ required-features = ["rt-process-signal"]
# For mem check
rt-net = ["tokio/rt", "tokio/rt-multi-thread", "tokio/net"]
# For test-process-signal
rt-process-signal = ["rt", "tokio/process", "tokio/signal"]
rt-process-signal = ["rt-net", "tokio/process", "tokio/signal"]

full = [
"macros",
Expand Down
22 changes: 22 additions & 0 deletions tokio/src/io/driver/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! This file contains mocks of the metrics types used in the I/O driver.
//!
//! The reason these mocks don't live in `src/runtime/mock.rs` is because
//! these need to be available in the case when `net` is enabled but
//! `rt` is not.

cfg_not_rt_and_metrics! {
#[derive(Default)]
pub(crate) struct IoDriverMetrics {}

impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {}
pub(crate) fn dec_fd_count(&self) {}
pub(crate) fn incr_ready_count_by(&self, _amt: u64) {}
}
}

cfg_rt! {
cfg_metrics! {
pub(crate) use crate::runtime::IoDriverMetrics;
}
}
38 changes: 37 additions & 1 deletion tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ pub(crate) use registration::Registration;
mod scheduled_io;
use scheduled_io::ScheduledIo;

mod metrics;

use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};

use metrics::IoDriverMetrics;

use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -74,6 +78,8 @@ pub(super) struct Inner {

/// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,

metrics: IoDriverMetrics,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -130,6 +136,7 @@ impl Driver {
registry,
io_dispatch: allocator,
waker,
metrics: IoDriverMetrics::default(),
}),
})
}
Expand Down Expand Up @@ -167,14 +174,18 @@ impl Driver {
}

// Process all the events that came in, dispatching appropriately
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();

if token != TOKEN_WAKEUP {
self.dispatch(token, Ready::from_mio(event));
ready_count += 1;
}
}

self.inner.metrics.incr_ready_count_by(ready_count);

self.events = Some(events);

Ok(())
Expand Down Expand Up @@ -279,6 +290,25 @@ cfg_not_rt! {
}
}

cfg_metrics! {
impl Handle {
// TODO: Remove this when handle contains `Arc<Inner>` so that we can return
// &IoDriverMetrics instead of using a closure.
//
// Related issue: https://github.com/tokio-rs/tokio/issues/4509
pub(crate) fn with_io_driver_metrics<F, R>(&self, f: F) -> Option<R>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, calling self.inner() does an arc bump anyway. I probably wouldn't bother w/ the closure, but it isn't a bit deal.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, plan to refactor with removal of weak from io driver handle.

where
F: Fn(&IoDriverMetrics) -> R,
{
if let Some(inner) = self.inner() {
Some(f(&inner.metrics))
} else {
None
}
}
}
}

impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
Expand Down Expand Up @@ -335,12 +365,18 @@ impl Inner {
self.registry
.register(source, mio::Token(token), interest.to_mio())?;

self.metrics.incr_fd_count();

Ok(shared)
}

/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)
self.registry.deregister(source)?;

self.metrics.dec_fd_count();

Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ pub(crate) use self::doc::winapi;

#[cfg(all(not(docsrs), windows, feature = "net"))]
#[allow(unused)]
pub(crate) use ::winapi;
pub(crate) use winapi;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think rustfmt did it? I didn't manually change it but I don't think it affects anything since this got updated in 2018 edition iirc


cfg_macros! {
/// Implementation detail of the `select!` macro. This macro is **not**
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ macro_rules! cfg_not_metrics {
}
}

macro_rules! cfg_not_rt_and_metrics {
($($item:item)*) => {
$( #[cfg(not(all(feature = "rt", all(tokio_unstable, not(loom)))))] $item )*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
Expand Down
29 changes: 29 additions & 0 deletions tokio/src/runtime/metrics/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#![cfg_attr(not(feature = "net"), allow(dead_code))]

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

#[derive(Default)]
pub(crate) struct IoDriverMetrics {
pub(super) fd_count: AtomicU64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth tracking registered & deregistered separately? Right now, if an FD is registered then immediately deregistered, this will show up as the counter not moving. What is the value in tracking outstanding FDs? If registered & deregistered ops are tracked separately, churn will be detectable.

Separately, do we want to track fd_count or punt that to tracing? I am not sure. Thoughts @hawkw

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this catch things like leaked fds etc? I think there is value in tracking this internally to the driver in addition to what ever tracing can bring?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking register & deregister separately would be a super set. Total fds == registers - deregisters.

pub(super) ready_count: AtomicU64,
}

impl IoDriverMetrics {
pub(crate) fn incr_fd_count(&self) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_add(1);
self.fd_count.store(new, Relaxed);
}

pub(crate) fn dec_fd_count(&self) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_sub(1);
self.fd_count.store(new, Relaxed);
}

pub(crate) fn incr_ready_count_by(&self, amt: u64) {
let prev = self.fd_count.load(Relaxed);
let new = prev.wrapping_add(amt);
self.ready_count.store(new, Relaxed);
}
}
5 changes: 5 additions & 0 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ cfg_metrics! {

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
}
}

cfg_not_metrics! {
Expand Down
58 changes: 58 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,61 @@ impl RuntimeMetrics {
self.handle.spawner.worker_local_queue_depth(worker)
}
}

cfg_net! {
impl RuntimeMetrics {
/// Returns the number of file descriptors currently registered with the
/// runtime's I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_fd_count();
/// println!("{} fds currently registered with the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_fd_count(&self) -> u64 {
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
self.with_io_driver_metrics(|m| m.fd_count.load(Relaxed))
}

/// Returns the number of ready events processed by the runtime's
/// I/O driver.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.io_driver_ready_count();
/// println!("{} ready events procssed by the runtime's I/O driver.", n);
/// }
/// ```
pub fn io_driver_ready_count(&self) -> u64 {
self.with_io_driver_metrics(|m| m.ready_count.load(Relaxed))
}

fn with_io_driver_metrics<F>(&self, f: F) -> u64
where
F: Fn(&super::IoDriverMetrics) -> u64,
{
// TODO: Investigate if this should return 0, most of our metrics always increase
// thus this breaks that guarantee.
self.handle
.io_handle
.as_ref()
.map(|h| h.with_io_driver_metrics(f))
.flatten()
.unwrap_or(0)
}
}
}
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ cfg_metrics! {
pub use metrics::RuntimeMetrics;

pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics};

cfg_net! {
pub(crate) use metrics::IoDriverMetrics;
}
}

cfg_not_metrics! {
Expand Down
28 changes: 28 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,34 @@ fn worker_local_queue_depth() {
});
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
let rt = basic();
let metrics = rt.metrics();

let stream = tokio::net::TcpStream::connect("google.com:80");
let stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_fd_count(), 2);

drop(stream);

assert_eq!(metrics.io_driver_fd_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
let rt = basic();
let metrics = rt.metrics();

let stream = tokio::net::TcpStream::connect("google.com:80");
let _stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_ready_count(), 3);
}

fn basic() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down