diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 1bb4e261f73..1d0f3dea30a 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -73,7 +73,8 @@ impl MetricsBatch { } } - pub(crate) fn submit(&mut self, worker: &WorkerMetrics) { + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 8f8345c08b4..e4bb3a99d0c 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -37,7 +37,7 @@ impl MetricsBatch { Self {} } - pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} + pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} pub(crate) fn about_to_park(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} pub(crate) fn start_processing_scheduled_tasks(&mut self) {} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 1f990a1f852..95b517c94eb 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -769,6 +769,47 @@ impl RuntimeMetrics { .unwrap_or_default() } + /// Returns the mean duration of task polls, in nanoseconds. + /// + /// This is an exponentially weighted moving average. Currently, this metric + /// is only provided by the multi-threaded runtime. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_mean_poll_time(0); + /// println!("worker 0 has a mean poll time of {:?}", n); + /// } + /// ``` + #[track_caller] + pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .mean_poll_time + .load(Relaxed); + Duration::from_nanos(nanos) + } + /// Returns the number of tasks currently scheduled in the blocking /// thread pool, spawned using `spawn_blocking`. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index e0f23e6a08e..b53a86bcc87 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -28,6 +28,9 @@ pub(crate) struct WorkerMetrics { /// Number of tasks the worker polled. pub(crate) poll_count: AtomicU64, + /// EWMA task poll time, in nanoseconds. + pub(crate) mean_poll_time: AtomicU64, + /// Amount of time the worker spent doing work vs. parking. pub(crate) busy_duration_total: AtomicU64, @@ -62,6 +65,7 @@ impl WorkerMetrics { steal_count: AtomicU64::new(0), steal_operations: AtomicU64::new(0), poll_count: AtomicU64::new(0), + mean_poll_time: AtomicU64::new(0), overflow_count: AtomicU64::new(0), busy_duration_total: AtomicU64::new(0), local_schedule_count: AtomicU64::new(0), diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 1100147d5cf..30b17c0e8ed 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -321,7 +321,7 @@ impl Core { } fn submit_metrics(&mut self, handle: &Handle) { - self.metrics.submit(&handle.shared.worker_metrics); + self.metrics.submit(&handle.shared.worker_metrics, 0); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index f01daaa1bff..3b8c5020e49 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -74,7 +74,7 @@ impl Stats { } pub(crate) fn submit(&mut self, to: &WorkerMetrics) { - self.batch.submit(to); + self.batch.submit(to, self.task_poll_time_ewma as u64); } pub(crate) fn about_to_park(&mut self) { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index 57657bb0391..228e797714b 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -93,7 +93,7 @@ impl Stats { } pub(crate) fn submit(&mut self, to: &WorkerMetrics) { - self.batch.submit(to); + self.batch.submit(to, self.task_poll_time_ewma as u64); } pub(crate) fn about_to_park(&mut self) { diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 7b7a957c419..42faab37f9b 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -215,18 +215,25 @@ fn worker_steal_count() { } #[test] -fn worker_poll_count() { +fn worker_poll_count_and_time() { const N: u64 = 5; + async fn task() { + // Sync sleep + std::thread::sleep(std::time::Duration::from_micros(10)); + } + let rt = current_thread(); let metrics = rt.metrics(); rt.block_on(async { for _ in 0..N { - tokio::spawn(async {}).await.unwrap(); + tokio::spawn(task()).await.unwrap(); } }); drop(rt); assert_eq!(N, metrics.worker_poll_count(0)); + // Not currently supported for current-thread runtime + assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0)); // Does not populate the histogram assert!(!metrics.poll_count_histogram_enabled()); @@ -238,7 +245,7 @@ fn worker_poll_count() { let metrics = rt.metrics(); rt.block_on(async { for _ in 0..N { - tokio::spawn(async {}).await.unwrap(); + tokio::spawn(task()).await.unwrap(); } }); drop(rt); @@ -249,6 +256,12 @@ fn worker_poll_count() { assert_eq!(N, n); + let n: Duration = (0..metrics.num_workers()) + .map(|i| metrics.worker_mean_poll_time(i)) + .sum(); + + assert!(n > Duration::default()); + // Does not populate the histogram assert!(!metrics.poll_count_histogram_enabled()); for n in 0..metrics.num_workers() {