diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ceb2a36a18e..e17871f1e77 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # If you copy one of the examples into a new project, you should be using # [dependencies] instead, and delete the **path**. [dev-dependencies] -tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] } +tokio = { version = "1.0.0", path = "../tokio", features = ["full", "taskdump", "tracing"] } tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] } tokio-stream = { version = "0.1", path = "../tokio-stream" } @@ -90,3 +90,7 @@ path = "named-pipe-ready.rs" [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" + +[[example]] +name = "taskdump" +path = "taskdump.rs" diff --git a/examples/taskdump.rs b/examples/taskdump.rs new file mode 100644 index 00000000000..0657fdb9283 --- /dev/null +++ b/examples/taskdump.rs @@ -0,0 +1,59 @@ +//! A service that emits task dumps to stdout upon receipt of a SIGUSR1 signal. + +#![warn(rust_2018_idioms)] + +use tokio::{ + time::{sleep, Duration}, + runtime::Handle, + signal::unix::{signal, SignalKind} +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = tokio::spawn(async { + // Listen for SIGUSR1 + let mut stream = signal(SignalKind::user_defined1()).unwrap(); + loop { + // Wait for receipt of SIGUSR1 + stream.recv().await; + // Write a JSON taskdump to stdout. + println!("{}", Handle::current()); + } + }); + + let sleep = sleep(Duration::from_secs(20)); + + tokio::select! { + _ = tokio::spawn(busy::work()) => {} + _ = tokio::spawn(busy::work()) => {} + _ = sleep => {/* exit the application */} + }; + + Ok(()) +} + +mod busy { + use futures::future::{BoxFuture, FutureExt}; + + #[inline(never)] + pub async fn work() { + loop { + for i in 0..u8::MAX { + recurse(i).await; + } + } + } + + #[inline(never)] + pub fn recurse(depth: u8) -> BoxFuture<'static, ()> { + async move { + tokio::task::yield_now().await; + if depth == 0 { + return; + } else { + recurse(depth - 1).await; + } + } + .boxed() + } +} diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0e96ccb7cd7..67e2dfc819c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -85,6 +85,10 @@ signal = [ "windows-sys/Win32_System_Console", ] sync = [] +taskdump = [ + "serde", + "serde_json", +] test-util = ["rt", "sync", "time"] time = [] @@ -115,6 +119,12 @@ socket2 = { version = "0.4.4", optional = true, features = [ "all" ] } # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full +serde = { version = "1.0", optional = true } +serde_json = { version = "1.0", optional = true } + +[target.'cfg(tokio_unstable)'.dev-dependencies] +jsonschema = { version = "0.16.1" } +schemars = { version = "0.8.11" } [target.'cfg(unix)'.dependencies] libc = { version = "0.2.42", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index aa94ff020da..0a2ac2519fc 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -461,6 +461,12 @@ compile_error!("Tokio's build script has incorrectly detected wasm."); ))] compile_error!("Only features sync,macros,io-util,rt,time are supported on wasm."); +#[cfg(all( + not(tokio_unstable), + feature = "taskdump" +))] +compile_error!("Feature taskdump requires tokio_unstable."); + // Includes re-exports used by macros. // // This module is not intended to be part of the public API. In general, any diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 1c66d24147a..2af605e13a5 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -361,6 +361,25 @@ macro_rules! cfg_not_rt_multi_thread { } } +macro_rules! cfg_taskdump { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "taskdump"))] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable), feature = "taskdump"))] + $item + )* + } +} + +macro_rules! cfg_not_taskdump { + ($($item:item)*) => { + $( + #[cfg(not(all(tokio_unstable, feature = "taskdump")))] + $item + )* + } +} + macro_rules! cfg_test_util { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index c5dc65f6e81..41c9231f2aa 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -321,6 +321,22 @@ cfg_metrics! { } } +cfg_taskdump! { + /// # Task Dumps + /// + /// Display-formatting a `Handle` produces a JSON dump of the runtime's internal state. + /// + /// This feature is in very early development, and its API and output is subject to change. + impl fmt::Display for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use serde_json::to_string_pretty; + let serializable = super::taskdump::Runtime::from(&self); + let json = to_string_pretty(&serializable).map_err(|_| fmt::Error)?; + f.write_str(&json) + } + } +} + /// Error returned by `try_current` when no Runtime has been started #[derive(Debug)] pub struct TryCurrentError { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 8cd99917546..5749c45642d 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -261,3 +261,7 @@ cfg_rt! { /// After thread starts / before thread stops type Callback = std::sync::Arc; } + +cfg_taskdump! { + mod taskdump; +} diff --git a/tokio/src/runtime/taskdump.rs b/tokio/src/runtime/taskdump.rs new file mode 100644 index 00000000000..7040d9b86a7 --- /dev/null +++ b/tokio/src/runtime/taskdump.rs @@ -0,0 +1,121 @@ +//! Serializable structures for task dumps. + +use super::scheduler::{self, current_thread}; +use crate::loom::sync::Arc; +use serde::Serialize; + +#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] +use scheduler::multi_thread; + +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(Serialize)] +#[serde(tag = "flavor")] +/// A taskdump of a Tokio runtime. +pub(super) enum Runtime { + /// A taskdump of a current-thread runtime. + CurrentThread(CurrentThread), + /// A taskdump of a multi-thread runtime. + MultiThread(MultiThread), +} + +impl Runtime { + pub(super) fn from(handle: &super::Handle) -> Self { + let metrics = handle.metrics(); + match &handle.inner { + scheduler::Handle::CurrentThread(h) => { + Runtime::CurrentThread(CurrentThread::from(h.clone(), metrics)) + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(h) => { + Runtime::MultiThread(MultiThread::from(h.clone(), metrics)) + } + } + } +} + +/// A taskdump of a current-thread runtime. +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(Serialize)] +pub(super) struct CurrentThread { + /// Runtime metrics. + metrics: CurrentThreadMetrics, +} + +impl CurrentThread { + fn from(_handle: Arc, metrics: super::RuntimeMetrics) -> Self { + Self { + metrics: CurrentThreadMetrics { + remote_schedule_count: metrics.remote_schedule_count(), + #[cfg(feature = "net")] + io_driver: Some(IODriverMetrics { + fd_registered_count: metrics.io_driver_fd_registered_count(), + fd_deregistered_count: metrics.io_driver_fd_deregistered_count(), + }), + #[cfg(not(feature = "net"))] + io_driver: None, + }, + } + } +} + +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(Serialize)] +pub(super) struct MultiThread { + /// Runtime metrics. + metrics: MultiThreadMetrics, +} + +impl MultiThread { + pub(super) fn from(_handle: Arc, metrics: super::RuntimeMetrics) -> Self { + Self { + metrics: MultiThreadMetrics { + num_workers: metrics.num_workers(), + num_blocking_threads: metrics.num_blocking_threads(), + num_idle_blocking_threads: metrics.num_idle_blocking_threads(), + remote_schedule_count: metrics.remote_schedule_count(), + #[cfg(feature = "net")] + io_driver: Some(IODriverMetrics { + fd_registered_count: metrics.io_driver_fd_registered_count(), + fd_deregistered_count: metrics.io_driver_fd_deregistered_count(), + }), + #[cfg(not(feature = "net"))] + io_driver: None, + }, + } + } +} + +/// Metrics for a current-thread runtime. +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(Serialize)] +struct CurrentThreadMetrics { + /// The total number of tasks scheduled from outside of the runtime. + remote_schedule_count: u64, + /// IO driver metrics. + io_driver: Option, +} + +/// Metrics for a multi-thread runtime. +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(serde::Serialize)] +struct MultiThreadMetrics { + /// The number of worker threads used by the runtime. + num_workers: usize, + /// The number of additional threads spawned by the runtime. + num_blocking_threads: usize, + /// The number of idle threads, which have spawned for spawn_blocking calls. + num_idle_blocking_threads: usize, + /// The total number of tasks scheduled from outside of the runtime. + remote_schedule_count: u64, + /// IO driver metrics. + io_driver: Option, +} + +#[cfg_attr(test, derive(schemars::JsonSchema))] +#[derive(Serialize)] +struct IODriverMetrics { + /// The number of file descriptors that have been registered with the runtime’s I/O driver. + fd_registered_count: u64, + /// The number of file descriptors that have been deregistered by the runtime’s I/O driver + fd_deregistered_count: u64, +} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 4e7c2453f25..115416b553d 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -70,4 +70,8 @@ cfg_not_loom! { #[cfg(miri)] mod task; + + cfg_taskdump! { + mod taskdump; + } } diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 68d2e892ea3..1b36b6e6c95 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -141,7 +141,7 @@ fn stress1() { } cfg_metrics! { - assert_metrics!(metrics, steal_count == n as _); + assert_metrics!(metrics, steal_count == n as u64); } n diff --git a/tokio/src/runtime/tests/taskdump.rs b/tokio/src/runtime/tests/taskdump.rs new file mode 100644 index 00000000000..07003194089 --- /dev/null +++ b/tokio/src/runtime/tests/taskdump.rs @@ -0,0 +1,110 @@ +use crate::runtime::{self, taskdump}; +use std::{ + fs::{File, OpenOptions}, + io::Write, + path::PathBuf, +}; + +/// Tests that generated taskdumps match their schema. +/// +/// Failures of these tasks suggest a buggy implementation of `JsonSchema`. +mod model_test { + use super::*; + + /// Tests that a taskdump generated by the current-thread runtime + /// satisfies the JSON schema associated with the taskdump. + #[test] + fn current_thread() { + let runtime = runtime::Builder::new_current_thread().build().unwrap(); + test_runtime(runtime.handle()); + } + + cfg_taskdump! { + /// Tests that a taskdump generated by the multi-thread runtime + /// satisfies the JSON schema associated with the taskdump. + #[test] + fn multi_thread() { + let runtime = runtime::Builder::new_multi_thread() + .build() + .unwrap(); + test_runtime(runtime.handle()); + } + } + + fn test_runtime(handle: &crate::runtime::Handle) { + let taskdump = taskdump::Runtime::from(handle); + let taskdump = serde_json::to_value(taskdump).unwrap(); + + let schema = { + let value = schema_from_file(); + jsonschema::JSONSchema::compile(&value).unwrap() + }; + + let result = schema.validate(&taskdump); + + if let Err(errors) = result { + let errors: Vec<_> = errors.map(|e| format!("{}", e)).collect(); + let errors = errors.join("\n"); + panic!("Validation Errors:\n{}", errors); + } + } +} + +/// Checks that `taskdump_schema.json` is up-to-date. +/// +/// See [`UI_FAILURE_MESSAGE`] for guidance on how to regenerate this file. +#[test] +fn schema() { + // the schema generated from code. + let actual = { + let schema = schemars::schema_for!(crate::runtime::taskdump::Runtime); + serde_json::to_value(&schema).unwrap() + }; + + if should_bless() { + writeln!(schema_file(), "{:#}", actual).unwrap(); + return; + } + + // the schema from disk. + let expected = schema_from_file(); + + assert_eq!(expected, actual, "{}", UI_FAILURE_MESSAGE); +} + +/// Produces `true` if `--bless` is provided as a CLI argument during testing, +/// otherwise false. +fn should_bless() -> bool { + std::env::args().find(|arg| arg == "--bless").is_some() +} + +const UI_FAILURE_MESSAGE: &str = r"The schema on disk (left) did not match the \ + schema generated from code (right). To overwrite the schema on disk, run: +$ RUSTFLAGS='--cfg tokio_unstable' cargo test --features taskdump,full \ + --lib -- runtime::tests::taskdump::schema -- --bless +"; + +/// A deserialization of the contents of `taskdump_schema.json` at compile time. +fn schema_from_file() -> serde_json::Value { + static SCHEMA_TEXT: &str = include_str!("taskdump_schema.json"); + serde_json::from_str(SCHEMA_TEXT).unwrap() +} + +/// A handle to the file `taskdump_schema.json`, whose contents are writable. +fn schema_file() -> File { + let schema_path = { + let mut buf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + assert!(buf.pop()); + buf.push(file!()); + assert!(buf.pop()); + buf.push("taskdump_schema.json"); + buf + }; + + OpenOptions::new() + .read(false) + .write(true) + .create(false) + .open(schema_path) + .unwrap() +} diff --git a/tokio/src/runtime/tests/taskdump_schema.json b/tokio/src/runtime/tests/taskdump_schema.json new file mode 100644 index 00000000000..4c9cedd4a7b --- /dev/null +++ b/tokio/src/runtime/tests/taskdump_schema.json @@ -0,0 +1,151 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "CurrentThreadMetrics": { + "description": "Metrics for a current-thread runtime.", + "properties": { + "io_driver": { + "anyOf": [ + { + "$ref": "#/definitions/IODriverMetrics" + }, + { + "type": "null" + } + ], + "description": "IO driver metrics." + }, + "remote_schedule_count": { + "description": "The total number of tasks scheduled from outside of the runtime.", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "remote_schedule_count" + ], + "type": "object" + }, + "IODriverMetrics": { + "properties": { + "fd_deregistered_count": { + "description": "The number of file descriptors that have been deregistered by the runtime’s I/O driver", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + }, + "fd_registered_count": { + "description": "The number of file descriptors that have been registered with the runtime’s I/O driver.", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "fd_deregistered_count", + "fd_registered_count" + ], + "type": "object" + }, + "MultiThreadMetrics": { + "description": "Metrics for a multi-thread runtime.", + "properties": { + "io_driver": { + "anyOf": [ + { + "$ref": "#/definitions/IODriverMetrics" + }, + { + "type": "null" + } + ], + "description": "IO driver metrics." + }, + "num_blocking_threads": { + "description": "The number of additional threads spawned by the runtime.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "num_idle_blocking_threads": { + "description": "The number of idle threads, which have spawned for spawn_blocking calls.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "num_workers": { + "description": "The number of worker threads used by the runtime.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "remote_schedule_count": { + "description": "The total number of tasks scheduled from outside of the runtime.", + "format": "uint64", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "num_blocking_threads", + "num_idle_blocking_threads", + "num_workers", + "remote_schedule_count" + ], + "type": "object" + } + }, + "description": "A taskdump of a Tokio runtime.", + "oneOf": [ + { + "description": "A taskdump of a current-thread runtime.", + "properties": { + "flavor": { + "enum": [ + "CurrentThread" + ], + "type": "string" + }, + "metrics": { + "allOf": [ + { + "$ref": "#/definitions/CurrentThreadMetrics" + } + ], + "description": "Runtime metrics." + } + }, + "required": [ + "flavor", + "metrics" + ], + "type": "object" + }, + { + "description": "A taskdump of a multi-thread runtime.", + "properties": { + "flavor": { + "enum": [ + "MultiThread" + ], + "type": "string" + }, + "metrics": { + "allOf": [ + { + "$ref": "#/definitions/MultiThreadMetrics" + } + ], + "description": "Runtime metrics." + } + }, + "required": [ + "flavor", + "metrics" + ], + "type": "object" + } + ], + "title": "Runtime" +} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index e5345fdfccc..cba311efc2c 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -521,6 +521,6 @@ mod tests { #[test] fn into_c_int() { let value: std::os::raw::c_int = SignalKind::interrupt().into(); - assert_eq!(value, libc::SIGINT as _); + assert_eq!(value, libc::SIGINT as i32); } } diff --git a/tokio/tests/io_buf_reader.rs b/tokio/tests/io_buf_reader.rs index 0d3f6bafc20..032bdbd7664 100644 --- a/tokio/tests/io_buf_reader.rs +++ b/tokio/tests/io_buf_reader.rs @@ -87,13 +87,13 @@ async fn test_buffered_reader() { let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 3); assert_eq!(buf, [5, 6, 7]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let mut buf = [0, 0]; let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 2); assert_eq!(buf, [0, 1]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let mut buf = [0]; let nread = reader.read(&mut buf).await.unwrap(); @@ -105,12 +105,12 @@ async fn test_buffered_reader() { let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 1); assert_eq!(buf, [3, 0, 0]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 1); assert_eq!(buf, [4, 0, 0]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); assert_eq!(reader.read(&mut buf).await.unwrap(), 0); } @@ -231,13 +231,13 @@ async fn maybe_pending() { let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 3); assert_eq!(buf, [5, 6, 7]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let mut buf = [0, 0]; let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 2); assert_eq!(buf, [0, 1]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let mut buf = [0]; let nread = reader.read(&mut buf).await.unwrap(); @@ -249,12 +249,12 @@ async fn maybe_pending() { let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 1); assert_eq!(buf, [3, 0, 0]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); let nread = reader.read(&mut buf).await.unwrap(); assert_eq!(nread, 1); assert_eq!(buf, [4, 0, 0]); - assert_eq!(reader.buffer(), []); + assert_eq!(reader.buffer(), [0; 0]); assert_eq!(reader.read(&mut buf).await.unwrap(), 0); } @@ -274,7 +274,7 @@ async fn maybe_pending_buf_read() { assert_eq!(v, [0]); v.clear(); reader.read_until(9, &mut v).await.unwrap(); - assert_eq!(v, []); + assert_eq!(v, [0; 0]); } // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 diff --git a/tokio/tests/io_buf_writer.rs b/tokio/tests/io_buf_writer.rs index d3acf62c784..92c5d67de75 100644 --- a/tokio/tests/io_buf_writer.rs +++ b/tokio/tests/io_buf_writer.rs @@ -71,7 +71,7 @@ async fn buf_writer() { let mut writer = BufWriter::with_capacity(2, Vec::new()); assert_eq!(writer.write(&[0, 1]).await.unwrap(), 2); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(*writer.get_ref(), [0, 1]); assert_eq!(writer.write(&[2]).await.unwrap(), 1); @@ -83,7 +83,7 @@ async fn buf_writer() { assert_eq!(*writer.get_ref(), [0, 1]); writer.flush().await.unwrap(); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); assert_eq!(writer.write(&[4]).await.unwrap(), 1); @@ -96,15 +96,15 @@ async fn buf_writer() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); assert_eq!(writer.write(&[7, 8]).await.unwrap(), 2); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); assert_eq!(writer.write(&[9, 10, 11]).await.unwrap(), 3); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); writer.flush().await.unwrap(); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } @@ -112,7 +112,7 @@ async fn buf_writer() { async fn buf_writer_inner_flushes() { let mut w = BufWriter::with_capacity(3, Vec::new()); assert_eq!(w.write(&[0, 1]).await.unwrap(), 2); - assert_eq!(*w.get_ref(), []); + assert_eq!(*w.get_ref(), [0; 0]); w.flush().await.unwrap(); let w = w.into_inner(); assert_eq!(w, [0, 1]); @@ -136,7 +136,7 @@ async fn maybe_pending_buf_writer() { let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); assert_eq!(writer.write(&[0, 1]).await.unwrap(), 2); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(&writer.get_ref().inner, &[0, 1]); assert_eq!(writer.write(&[2]).await.unwrap(), 1); @@ -148,7 +148,7 @@ async fn maybe_pending_buf_writer() { assert_eq!(&writer.get_ref().inner, &[0, 1]); writer.flush().await.unwrap(); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); assert_eq!(writer.write(&[4]).await.unwrap(), 1); @@ -161,18 +161,18 @@ async fn maybe_pending_buf_writer() { assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); assert_eq!(writer.write(&[7, 8]).await.unwrap(), 2); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); assert_eq!(writer.write(&[9, 10, 11]).await.unwrap(), 3); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!( writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ); writer.flush().await.unwrap(); - assert_eq!(writer.buffer(), []); + assert_eq!(writer.buffer(), [0; 0]); assert_eq!( &writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] @@ -183,7 +183,7 @@ async fn maybe_pending_buf_writer() { async fn maybe_pending_buf_writer_inner_flushes() { let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); assert_eq!(w.write(&[0, 1]).await.unwrap(), 2); - assert_eq!(&w.get_ref().inner, &[]); + assert_eq!(&w.get_ref().inner, &[0; 0]); w.flush().await.unwrap(); let w = w.into_inner().inner; assert_eq!(w, [0, 1]); diff --git a/tokio/tests/io_read_until.rs b/tokio/tests/io_read_until.rs index 61800a0d9c1..ce30a386cb6 100644 --- a/tokio/tests/io_read_until.rs +++ b/tokio/tests/io_read_until.rs @@ -20,7 +20,7 @@ async fn read_until() { buf.clear(); let n = assert_ok!(rd.read_until(b' ', &mut buf).await); assert_eq!(n, 0); - assert_eq!(buf, []); + assert_eq!(buf, [0; 0]); } #[tokio::test] diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index fdb2fb5f551..e807546d0ef 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -216,7 +216,7 @@ fn worker_poll_count() { }); drop(rt); // Account for the `block_on` task - let n = (0..metrics.num_workers()) + let n: u64 = (0..metrics.num_workers()) .map(|i| metrics.worker_poll_count(i)) .sum();