diff --git a/.cirrus.yml b/.cirrus.yml index 8aea3efa74b..bdf3af74098 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -4,7 +4,7 @@ freebsd_instance: image_family: freebsd-14-0 env: RUST_STABLE: stable - RUST_NIGHTLY: nightly-2023-10-21 + RUST_NIGHTLY: nightly-2024-05-05 RUSTFLAGS: -D warnings # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ff7da915bd..1d799a54083 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ env: RUST_BACKTRACE: 1 # Change to specific Rust release to pin rust_stable: stable - rust_nightly: nightly-2023-10-21 + rust_nightly: nightly-2024-05-05 rust_clippy: '1.77' # When updating this, also update: # - README.md @@ -67,7 +67,7 @@ jobs: - x86_64-fortanix-unknown-sgx - check-redox - wasm32-unknown-unknown - - wasm32-wasi + - wasm32-wasip1 - check-external-types - check-fuzzing - check-unstable-mt-counters @@ -908,17 +908,22 @@ jobs: run: wasm-pack test --node -- --features "macros sync" working-directory: tokio - wasm32-wasi: - name: wasm32-wasi + wasm32-wasip1: + name: ${{ matrix.target }} needs: basics runs-on: ubuntu-latest + strategy: + matrix: + target: + - wasm32-wasip1 + - wasm32-wasip1-threads steps: - uses: actions/checkout@v4 - name: Install Rust ${{ env.rust_stable }} uses: dtolnay/rust-toolchain@stable with: toolchain: ${{ env.rust_stable }} - targets: wasm32-wasi + targets: ${{ matrix.target }} # Install dependencies - name: Install cargo-hack, wasmtime, and cargo-wasi @@ -928,28 +933,40 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: WASI test tokio full - run: cargo test -p tokio --target wasm32-wasi --features full + run: cargo test -p tokio --target ${{ matrix.target }} --features full env: - CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --" - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --" + CARGO_TARGET_WASM32_WASIP1_THREADS_RUNNER: "wasmtime run -W bulk-memory=y -W threads=y -S threads=y --" + RUSTFLAGS: --cfg tokio_unstable -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864 - name: WASI test tokio-util full - run: cargo test -p tokio-util --target wasm32-wasi --features full + run: cargo test -p tokio-util --target ${{ matrix.target }} --features full env: - CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --" - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --" + CARGO_TARGET_WASM32_WASIP1_THREADS_RUNNER: "wasmtime run -W bulk-memory=y -W threads=y -S threads=y --" + RUSTFLAGS: --cfg tokio_unstable -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864 - name: WASI test tokio-stream - run: cargo test -p tokio-stream --target wasm32-wasi --features time,net,io-util,sync + run: cargo test -p tokio-stream --target ${{ matrix.target }} --features time,net,io-util,sync env: - CARGO_TARGET_WASM32_WASI_RUNNER: "wasmtime run --" - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --" + CARGO_TARGET_WASM32_WASIP1_THREADS_RUNNER: "wasmtime run -W bulk-memory=y -W threads=y -S threads=y --" + RUSTFLAGS: --cfg tokio_unstable -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864 - name: test tests-integration --features wasi-rt # TODO: this should become: `cargo hack wasi test --each-feature` run: cargo wasi test --test rt_yield --features wasi-rt + if: matrix.target == 'wasm32-wasip1' working-directory: tests-integration + - name: test tests-integration --features wasi-threads-rt + run: cargo test --target ${{ matrix.target }} --features wasi-threads-rt + if: matrix.target == 'wasm32-wasip1-threads' + working-directory: tests-integration + env: + CARGO_TARGET_WASM32_WASIP1_THREADS_RUNNER: "wasmtime run -W bulk-memory=y -W threads=y -S threads=y --" + RUSTFLAGS: --cfg tokio_unstable -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864 + check-external-types: name: check-external-types (${{ matrix.os }}) needs: basics @@ -978,6 +995,23 @@ jobs: run: cargo check-external-types --all-features working-directory: tokio + check-unexpected-lints-cfgs: + name: check unexpected lints and cfgs + needs: basics + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust ${{ env.rust_nightly }} + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ env.rust_nightly }} + - name: don't allow warnings + run: sed -i '/#!\[allow(unknown_lints, unexpected_cfgs)\]/d' */src/lib.rs */tests/*.rs + - name: check for unknown lints and cfgs + run: cargo check --all-features --tests + env: + RUSTFLAGS: -Dwarnings --check-cfg=cfg(loom,tokio_unstable,tokio_taskdump,fuzzing,mio_unsupported_force_poll_poll,tokio_internal_mt_counters,fs,tokio_no_parking_lot,tokio_no_tuning_tests) -Funexpected_cfgs -Funknown_lints + check-fuzzing: name: check-fuzzing needs: basics diff --git a/benches/Cargo.toml b/benches/Cargo.toml index c581055cf65..c1d13bac279 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -90,3 +90,8 @@ harness = false name = "time_now" path = "time_now.rs" harness = false + +[[bench]] +name = "time_timeout" +path = "time_timeout.rs" +harness = false diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs new file mode 100644 index 00000000000..c961477562c --- /dev/null +++ b/benches/time_timeout.rs @@ -0,0 +1,109 @@ +use std::time::{Duration, Instant}; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use tokio::{ + runtime::Runtime, + time::{sleep, timeout}, +}; + +// a very quick async task, but might timeout +async fn quick_job() -> usize { + 1 +} + +fn build_run_time(workers: usize) -> Runtime { + if workers == 1 { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } else { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap() + } +} + +fn single_thread_scheduler_timeout(c: &mut Criterion) { + do_timeout_test(c, 1, "single_thread_timeout"); +} + +fn multi_thread_scheduler_timeout(c: &mut Criterion) { + do_timeout_test(c, 8, "multi_thread_timeout-8"); +} + +fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = build_run_time(workers); + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_timeout_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_timeout_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let h = timeout(Duration::from_secs(1), quick_job()); + assert_eq!(black_box(h.await.unwrap()), 1); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +fn single_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 1, "single_thread_sleep"); +} + +fn multi_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 8, "multi_thread_sleep-8"); +} + +fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = build_run_time(workers); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_sleep_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_sleep_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let _h = black_box(sleep(Duration::from_secs(1))); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +criterion_group!( + timeout_benchmark, + single_thread_scheduler_timeout, + multi_thread_scheduler_timeout, + single_thread_scheduler_sleep, + multi_thread_scheduler_sleep +); + +criterion_main!(timeout_benchmark); diff --git a/target-specs/README.md b/target-specs/README.md new file mode 100644 index 00000000000..f5db16b78c5 --- /dev/null +++ b/target-specs/README.md @@ -0,0 +1,9 @@ +This is used for the `no-atomic-u64-test` ci check that verifies that Tokio +works even if the `AtomicU64` type is missing. + +When increasing the nightly compiler version, you may need to regenerate this +target using the following command: +``` +rustc +nightly -Z unstable-options --print target-spec-json --target i686-unknown-linux-gnu | grep -v 'is-builtin' | sed 's/"max-atomic-width": 64/"max-atomic-width": 32/' > target-specs/i686-unknown-linux-gnu.json +``` + diff --git a/target-specs/i686-unknown-linux-gnu.json b/target-specs/i686-unknown-linux-gnu.json index 4eebe7afb57..7a70e7474f3 100644 --- a/target-specs/i686-unknown-linux-gnu.json +++ b/target-specs/i686-unknown-linux-gnu.json @@ -1,29 +1,35 @@ { "arch": "x86", "cpu": "pentium4", + "crt-objects-fallback": "false", "crt-static-respected": true, - "data-layout": "e-m:e-p:32:32-p270:32:32-p271:32:32-p272:64:64-f64:32:64-f80:32-n8:16:32-S128", + "data-layout": "e-m:e-p:32:32-p270:32:32-p271:32:32-p272:64:64-i128:128-f64:32:64-f80:32-n8:16:32-S128", "dynamic-linking": true, "env": "gnu", "has-rpath": true, "has-thread-local": true, + "linker-flavor": "gnu-cc", "llvm-target": "i686-unknown-linux-gnu", "max-atomic-width": 32, + "metadata": { + "description": null, + "host_tools": null, + "std": null, + "tier": null + }, "os": "linux", "position-independent-executables": true, "pre-link-args": { - "gcc": [ + "gnu-cc": [ + "-m32" + ], + "gnu-lld-cc": [ "-m32" ] }, "relro-level": "full", "stack-probes": { - "kind": "inline-or-call", - "min-llvm-version-for-inline": [ - 16, - 0, - 0 - ] + "kind": "inline" }, "supported-sanitizers": [ "address" diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 76b9956b8fd..4852f7d23ae 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -39,6 +39,7 @@ rt-process-signal = ["rt-net", "tokio/process", "tokio/signal"] # This is an explicit feature so we can use `cargo hack` testing single features # instead of all possible permutations. wasi-rt = ["rt", "macros", "sync"] +wasi-threads-rt = ["wasi-rt", "rt-multi-thread"] full = [ "macros", diff --git a/tests-integration/tests/macros_main.rs b/tests-integration/tests/macros_main.rs index e34387e5ec1..31442805141 100644 --- a/tests-integration/tests/macros_main.rs +++ b/tests-integration/tests/macros_main.rs @@ -1,8 +1,4 @@ -#![cfg(all( - feature = "macros", - feature = "rt-multi-thread", - not(target_os = "wasi") -))] +#![cfg(all(feature = "macros", feature = "rt-multi-thread"))] #[tokio::main] async fn basic_main() -> usize { diff --git a/tests-integration/tests/macros_select.rs b/tests-integration/tests/macros_select.rs index a1a242c0f4e..18338445603 100644 --- a/tests-integration/tests/macros_select.rs +++ b/tests-integration/tests/macros_select.rs @@ -4,7 +4,10 @@ use futures::channel::oneshot; use futures::executor::block_on; use std::thread; -#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")] +#[cfg_attr( + not(feature = "rt-multi-thread"), + ignore = "WASI: std::thread::spawn not supported" +)] #[test] fn join_with_select() { block_on(async { diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index f1c39fa6e70..8858c8a1674 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -418,7 +418,6 @@ fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenS tokens } -#[cfg(not(test))] // Work around for rust-lang/rust#62127 pub(crate) fn main(args: TokenStream, item: TokenStream, rt_multi_thread: bool) -> TokenStream { // If any of the steps for this macro fail, we still want to expand to an item that is as close // to the expected output as possible. This helps out IDEs such that completions and other diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 919c4ac0ba9..c108d8c46a2 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, @@ -202,7 +203,6 @@ use proc_macro::TokenStream; /// } /// ``` #[proc_macro_attribute] -#[cfg(not(test))] // Work around for rust-lang/rust#62127 pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { entry::main(args.into(), item.into(), true).into() } @@ -267,7 +267,6 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { /// } /// ``` #[proc_macro_attribute] -#[cfg(not(test))] // Work around for rust-lang/rust#62127 pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { entry::main(args.into(), item.into(), false).into() } diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index 6ff1085a552..11ccd8c6aee 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 87e63861210..9f60faf7952 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn( missing_debug_implementations, missing_docs, diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index b98092c4eb0..729c0352df6 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,36 @@ +# 0.7.11 (May 4th, 2024) + +This release updates the MSRV to 1.63. ([#6126]) + +### Added + +- either: implement `Sink` for `Either` ([#6239]) +- time: add `DelayQueue::deadline` ([#6163]) +- time: add `FutureExt::timeout` ([#6276]) + +### Changed + +- codec: assert compatibility between `LengthDelimitedCodec` options ([#6414]) +- codec: make tracing feature optional for codecs ([#6434]) +- io: add `T: ?Sized` to `tokio_util::io::poll_read_buf` ([#6441]) +- sync: remove `'static` bound on `impl Sink for PollSender` ([#6397]) + +### Documented + +- codec: add examples for `FramedRead` and `FramedWrite` ([#6310]) +- codec: document cancel safety of `SinkExt::send` and `StreamExt::next` ([#6417]) + +[#6126]: https://github.com/tokio-rs/tokio/pull/6126 +[#6163]: https://github.com/tokio-rs/tokio/pull/6163 +[#6239]: https://github.com/tokio-rs/tokio/pull/6239 +[#6276]: https://github.com/tokio-rs/tokio/pull/6276 +[#6310]: https://github.com/tokio-rs/tokio/pull/6310 +[#6397]: https://github.com/tokio-rs/tokio/pull/6397 +[#6414]: https://github.com/tokio-rs/tokio/pull/6414 +[#6417]: https://github.com/tokio-rs/tokio/pull/6417 +[#6434]: https://github.com/tokio-rs/tokio/pull/6434 +[#6441]: https://github.com/tokio-rs/tokio/pull/6441 + # 0.7.10 (October 24th, 2023) ### Added diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 47f443aeee7..a33e9c9cff7 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-util" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-util-0.7.x" git tag. -version = "0.7.10" +version = "0.7.11" edition = "2021" rust-version = "1.63" authors = ["Tokio Contributors "] @@ -45,7 +45,7 @@ slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } [target.'cfg(tokio_unstable)'.dependencies] -hashbrown = { version = "0.14.0", optional = true } +hashbrown = { version = "0.14.0", default-features = false, optional = true } [dev-dependencies] tokio = { version = "1.0.0", path = "../tokio", features = ["full"] } diff --git a/tokio-util/src/codec/length_delimited.rs b/tokio-util/src/codec/length_delimited.rs index 92d76b2cd28..c37f5863bd4 100644 --- a/tokio-util/src/codec/length_delimited.rs +++ b/tokio-util/src/codec/length_delimited.rs @@ -639,7 +639,6 @@ mod builder { impl LengthFieldType for u64 {} #[cfg(any( - target_pointer_width = "8", target_pointer_width = "16", target_pointer_width = "32", target_pointer_width = "64", diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 1df4de1b459..34f69fd14e3 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, diff --git a/tokio-util/src/task/mod.rs b/tokio-util/src/task/mod.rs index e37015a4e3c..1ab3ff13dbe 100644 --- a/tokio-util/src/task/mod.rs +++ b/tokio-util/src/task/mod.rs @@ -2,9 +2,7 @@ #[cfg(tokio_unstable)] mod join_map; -#[cfg(not(target_os = "wasi"))] mod spawn_pinned; -#[cfg(not(target_os = "wasi"))] pub use spawn_pinned::LocalPoolHandle; #[cfg(tokio_unstable)] diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 1ab5f9ba832..8757f8b5c6e 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "rt", tokio_unstable))] diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 7dab413ceb6..b35a20dd35b 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -271,7 +271,7 @@ cfg_io_util! { pub(crate) mod seek; pub(crate) mod util; pub use util::{ - copy, copy_bidirectional, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, }; } diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs index 2602929cdd1..453508eda55 100644 --- a/tokio/src/io/split.rs +++ b/tokio/src/io/split.rs @@ -79,8 +79,7 @@ impl ReadHalf { /// /// If this `ReadHalf` and the given `WriteHalf` do not originate from the /// same `split` operation this method will panic. - /// This can be checked ahead of time by comparing the stream ID - /// of the two halves. + /// This can be checked ahead of time by calling [`is_pair_of()`](Self::is_pair_of). #[track_caller] pub fn unsplit(self, wr: WriteHalf) -> T where diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index 56310c86f59..47f8d4ebec6 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -16,14 +16,14 @@ pub(super) struct CopyBuffer { } impl CopyBuffer { - pub(super) fn new() -> Self { + pub(super) fn new(buf_size: usize) -> Self { Self { read_done: false, need_flush: false, pos: 0, cap: 0, amt: 0, - buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(), + buf: vec![0; buf_size].into_boxed_slice(), } } @@ -269,7 +269,7 @@ cfg_io_util! { Copy { reader, writer, - buf: CopyBuffer::new() + buf: CopyBuffer::new(super::DEFAULT_BUF_SIZE) }.await } } diff --git a/tokio/src/io/util/copy_bidirectional.rs b/tokio/src/io/util/copy_bidirectional.rs index e1a7db127a7..ce90141e5a5 100644 --- a/tokio/src/io/util/copy_bidirectional.rs +++ b/tokio/src/io/util/copy_bidirectional.rs @@ -57,6 +57,9 @@ where /// it will return a tuple of the number of bytes copied from a to b /// and the number of bytes copied from b to a, in that order. /// +/// It uses two 8 KB buffers for transferring bytes between `a` and `b` by default. +/// To set your own buffers sizes use [`copy_bidirectional_with_sizes()`]. +/// /// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown /// /// # Errors @@ -69,13 +72,56 @@ where /// /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] -pub async fn copy_bidirectional(a: &mut A, b: &mut B) -> Result<(u64, u64), std::io::Error> +pub async fn copy_bidirectional(a: &mut A, b: &mut B) -> io::Result<(u64, u64)> +where + A: AsyncRead + AsyncWrite + Unpin + ?Sized, + B: AsyncRead + AsyncWrite + Unpin + ?Sized, +{ + copy_bidirectional_impl( + a, + b, + CopyBuffer::new(super::DEFAULT_BUF_SIZE), + CopyBuffer::new(super::DEFAULT_BUF_SIZE), + ) + .await +} + +/// Copies data in both directions between `a` and `b` using buffers of the specified size. +/// +/// This method is the same as the [`copy_bidirectional()`], except that it allows you to set the +/// size of the internal buffers used when copying data. +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] +pub async fn copy_bidirectional_with_sizes( + a: &mut A, + b: &mut B, + a_to_b_buf_size: usize, + b_to_a_buf_size: usize, +) -> io::Result<(u64, u64)> +where + A: AsyncRead + AsyncWrite + Unpin + ?Sized, + B: AsyncRead + AsyncWrite + Unpin + ?Sized, +{ + copy_bidirectional_impl( + a, + b, + CopyBuffer::new(a_to_b_buf_size), + CopyBuffer::new(b_to_a_buf_size), + ) + .await +} + +async fn copy_bidirectional_impl( + a: &mut A, + b: &mut B, + a_to_b_buffer: CopyBuffer, + b_to_a_buffer: CopyBuffer, +) -> io::Result<(u64, u64)> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized, { - let mut a_to_b = TransferState::Running(CopyBuffer::new()); - let mut b_to_a = TransferState::Running(CopyBuffer::new()); + let mut a_to_b = TransferState::Running(a_to_b_buffer); + let mut b_to_a = TransferState::Running(b_to_a_buffer); poll_fn(|cx| { let a_to_b = transfer_one_direction(cx, &mut a_to_b, a, b)?; let b_to_a = transfer_one_direction(cx, &mut b_to_a, b, a)?; diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 47b951f2b83..5010fc01d29 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -28,7 +28,7 @@ cfg_io_util! { pub use copy::copy; mod copy_bidirectional; - pub use copy_bidirectional::copy_bidirectional; + pub use copy_bidirectional::{copy_bidirectional, copy_bidirectional_with_sizes}; mod copy_buf; pub use copy_buf::copy_buf; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 57b6560bf0d..f15f8763e36 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, @@ -446,13 +447,9 @@ // least 32 bits, which a lot of components in Tokio currently assumes. // // TODO: improve once we have MSRV access to const eval to make more flexible. -#[cfg(not(any( - target_pointer_width = "32", - target_pointer_width = "64", - target_pointer_width = "128" -)))] +#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] compile_error! { - "Tokio requires the platform pointer width to be 32, 64, or 128 bits" + "Tokio requires the platform pointer width to be at least 32 bits" } #[cfg(all( @@ -654,7 +651,6 @@ cfg_macros! { cfg_rt! { #[cfg(feature = "rt-multi-thread")] - #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] #[doc(inline)] pub use tokio_macros::main; @@ -665,7 +661,6 @@ cfg_macros! { pub use tokio_macros::test; cfg_not_rt_multi_thread! { - #[cfg(not(test))] // Work around for rust-lang/rust#62127 #[doc(inline)] pub use tokio_macros::main_rt as main; @@ -676,7 +671,6 @@ cfg_macros! { // Always fail if rt is not enabled. cfg_not_rt! { - #[cfg(not(test))] #[doc(inline)] pub use tokio_macros::main_fail as main; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index d2f7b42bf60..c67e0e8379f 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -384,7 +384,7 @@ macro_rules! cfg_not_rt { macro_rules! cfg_rt_multi_thread { ($($item:item)*) => { $( - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] $item )* diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index b6ca7da238b..a89555f5876 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -8,6 +8,7 @@ use std::process::ExitStatus; /// An interface for waiting on a process to exit. pub(crate) trait Wait { /// Get the identifier for this process or diagnostics. + #[allow(dead_code)] fn id(&self) -> u32; /// Try waiting for a process to exit in a non-blocking manner. fn try_wait(&mut self) -> io::Result>; diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 8dfe5fd10f2..6c9fdf3f8e3 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -23,9 +23,9 @@ impl BlockingSchedule { scheduler::Handle::CurrentThread(handle) => { handle.driver.clock.inhibit_auto_advance(); } - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] scheduler::Handle::MultiThread(_) => {} - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] scheduler::Handle::MultiThreadAlt(_) => {} } } @@ -45,9 +45,9 @@ impl task::Schedule for BlockingSchedule { handle.driver.clock.allow_auto_advance(); handle.driver.unpark(); } - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] scheduler::Handle::MultiThread(_) => {} - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] scheduler::Handle::MultiThreadAlt(_) => {} } } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 27ae4c80167..3b09c0d4b10 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -199,9 +199,9 @@ pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + #[derive(Clone, Copy)] pub(crate) enum Kind { CurrentThread, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] MultiThread, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] MultiThreadAlt, } @@ -224,35 +224,33 @@ impl Builder { Builder::new(Kind::CurrentThread, EVENT_INTERVAL) } - cfg_not_wasi! { - /// Returns a new builder with the multi thread scheduler selected. + /// Returns a new builder with the multi thread scheduler selected. + /// + /// Configuration methods can be chained on the return value. + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new_multi_thread() -> Builder { + // The number `61` is fairly arbitrary. I believe this value was copied from golang. + Builder::new(Kind::MultiThread, 61) + } + + cfg_unstable! { + /// Returns a new builder with the alternate multi thread scheduler + /// selected. + /// + /// The alternate multi threaded scheduler is an in-progress + /// candidate to replace the existing multi threaded scheduler. It + /// currently does not scale as well to 16+ processors. + /// + /// This runtime flavor is currently **not considered production + /// ready**. /// /// Configuration methods can be chained on the return value. #[cfg(feature = "rt-multi-thread")] #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] - pub fn new_multi_thread() -> Builder { + pub fn new_multi_thread_alt() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThread, 61) - } - - cfg_unstable! { - /// Returns a new builder with the alternate multi thread scheduler - /// selected. - /// - /// The alternate multi threaded scheduler is an in-progress - /// candidate to replace the existing multi threaded scheduler. It - /// currently does not scale as well to 16+ processors. - /// - /// This runtime flavor is currently **not considered production - /// ready**. - /// - /// Configuration methods can be chained on the return value. - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] - pub fn new_multi_thread_alt() -> Builder { - // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThreadAlt, 61) - } + Builder::new(Kind::MultiThreadAlt, 61) } } @@ -697,9 +695,9 @@ impl Builder { pub fn build(&mut self) -> io::Result { match &self.kind { Kind::CurrentThread => self.build_current_thread_runtime(), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Kind::MultiThread => self.build_threaded_runtime(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Kind::MultiThreadAlt => self.build_alt_threaded_runtime(), } } @@ -708,9 +706,9 @@ impl Builder { driver::Cfg { enable_pause_time: match self.kind { Kind::CurrentThread => true, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Kind::MultiThread => false, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Kind::MultiThreadAlt => false, }, enable_io: self.enable_io, diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 7e7e5636c80..01d210cd36f 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -355,9 +355,9 @@ impl Handle { pub fn runtime_flavor(&self) -> RuntimeFlavor { match self.inner { scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt, } } @@ -385,9 +385,9 @@ impl Handle { pub fn id(&self) -> runtime::Id { let owned_id = match &self.inner { scheduler::Handle::CurrentThread(handle) => handle.owned_id(), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] scheduler::Handle::MultiThread(handle) => handle.owned_id(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(), }; owned_id.into() diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 917c3f8ce91..7cf2cebeffc 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -126,11 +126,11 @@ pub(super) enum Scheduler { CurrentThread(CurrentThread), /// Execute tasks across multiple threads. - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] MultiThread(MultiThread), /// Execute tasks across multiple threads. - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] MultiThreadAlt(MultiThreadAlt), } @@ -147,40 +147,38 @@ impl Runtime { } } - cfg_not_wasi! { - /// Creates a new runtime instance with default configuration values. - /// - /// This results in the multi threaded scheduler, I/O driver, and time driver being - /// initialized. - /// - /// Most applications will not need to call this function directly. Instead, - /// they will use the [`#[tokio::main]` attribute][main]. When a more complex - /// configuration is necessary, the [runtime builder] may be used. - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// Creating a new `Runtime` with default configuration values. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// ``` - /// - /// [mod]: index.html - /// [main]: ../attr.main.html - /// [threaded scheduler]: index.html#threaded-scheduler - /// [runtime builder]: crate::runtime::Builder - #[cfg(feature = "rt-multi-thread")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] - pub fn new() -> std::io::Result { - Builder::new_multi_thread().enable_all().build() - } + /// Creates a new runtime instance with default configuration values. + /// + /// This results in the multi threaded scheduler, I/O driver, and time driver being + /// initialized. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When a more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [runtime builder]: crate::runtime::Builder + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new() -> std::io::Result { + Builder::new_multi_thread().enable_all().build() } /// Returns a handle to the runtime's spawner. @@ -347,9 +345,9 @@ impl Runtime { match &self.scheduler { Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future), } } @@ -469,13 +467,13 @@ impl Drop for Runtime { let _guard = context::try_set_current(&self.handle.inner); current_thread.shutdown(&self.handle.inner); } - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Scheduler::MultiThread(multi_thread) => { // The threaded scheduler drops its tasks on its worker threads, which is // already in the runtime's context. multi_thread.shutdown(&self.handle.inner); } - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Scheduler::MultiThreadAlt(multi_thread) => { // The threaded scheduler drops its tasks on its worker threads, which is // already in the runtime's context. @@ -491,7 +489,8 @@ impl std::panic::RefUnwindSafe for Runtime {} cfg_metrics! { impl Runtime { - /// TODO + /// Returns a view that lets you get information about how the runtime + /// is performing. pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { self.handle.metrics() } diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs index 2d29486db73..e32c2e4d719 100644 --- a/tokio/src/runtime/scheduler/inject/shared.rs +++ b/tokio/src/runtime/scheduler/inject/shared.rs @@ -38,10 +38,7 @@ impl Shared { } // Kind of annoying to have to include the cfg here - #[cfg(any( - tokio_taskdump, - all(feature = "rt-multi-thread", not(target_os = "wasi")) - ))] + #[cfg(any(tokio_taskdump, feature = "rt-multi-thread"))] pub(crate) fn is_closed(&self, synced: &Synced) -> bool { synced.is_closed } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 42368e5bed8..04fbff39e47 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -32,10 +32,10 @@ pub(crate) enum Handle { #[cfg(feature = "rt")] CurrentThread(Arc), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] MultiThread(Arc), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] MultiThreadAlt(Arc), // TODO: This is to avoid triggering "dead code" warnings many other places @@ -49,10 +49,10 @@ pub(crate) enum Handle { pub(super) enum Context { CurrentThread(current_thread::Context), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] MultiThread(multi_thread::Context), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] MultiThreadAlt(multi_thread_alt::Context), } @@ -63,10 +63,10 @@ impl Handle { #[cfg(feature = "rt")] Handle::CurrentThread(ref h) => &h.driver, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Handle::MultiThread(ref h) => &h.driver, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Handle::MultiThreadAlt(ref h) => &h.driver, #[cfg(not(feature = "rt"))] @@ -89,10 +89,10 @@ cfg_rt! { match $self { $ty::CurrentThread($h) => $e, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] $ty::MultiThread($h) => $e, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] $ty::MultiThreadAlt($h) => $e, } } @@ -119,10 +119,10 @@ cfg_rt! { match self { Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id), } } @@ -131,10 +131,10 @@ cfg_rt! { match *self { Handle::CurrentThread(_) => {}, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Handle::MultiThread(ref h) => h.shutdown(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Handle::MultiThreadAlt(ref h) => h.shutdown(), } } @@ -146,7 +146,7 @@ cfg_rt! { pub(crate) fn as_current_thread(&self) -> &Arc { match self { Handle::CurrentThread(handle) => handle, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] _ => panic!("not a CurrentThread handle"), } } @@ -170,9 +170,9 @@ cfg_rt! { pub(crate) fn num_workers(&self) -> usize { match self { Handle::CurrentThread(_) => 1, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] Handle::MultiThread(handle) => handle.num_workers(), - #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Handle::MultiThreadAlt(handle) => handle.num_workers(), } } @@ -216,7 +216,7 @@ cfg_rt! { pub(crate) fn expect_current_thread(&self) -> ¤t_thread::Context { match self { Context::CurrentThread(context) => context, - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] _ => panic!("expected `CurrentThread::Context`") } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index f07fb8568cd..83e70795f4f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -450,6 +450,7 @@ impl Launch { } fn run(worker: Arc) { + #[allow(dead_code)] struct AbortOnPanic; impl Drop for AbortOnPanic { diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index cf19eea83bb..8479becd80a 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -249,11 +249,11 @@ where } pub(super) fn dealloc(self) { - // Release the join waker, if there is one. - self.trailer().waker.with_mut(drop); - - // Check causality - self.core().stage.with_mut(drop); + // Observe that we expect to have mutable access to these objects + // because we are going to drop them. This only matters when running + // under loom. + self.trailer().waker.with_mut(|_| ()); + self.core().stage.with_mut(|_| ()); // Safety: The caller of this method just transitioned our ref-count to // zero, so it is our responsibility to release the allocation. diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index ec2e8432216..bb411f42d72 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -195,13 +195,9 @@ pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> { if let Some(scheduler) = scheduler { match scheduler { scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()), - #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + #[cfg(feature = "rt-multi-thread")] scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()), - #[cfg(all( - tokio_unstable, - feature = "rt-multi-thread", - not(target_os = "wasi") - ))] + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] scheduler::Context::MultiThreadAlt(_) => unimplemented!(), } } diff --git a/tokio/src/runtime/tests/loom_current_thread/yield_now.rs b/tokio/src/runtime/tests/loom_current_thread/yield_now.rs index ba506e5a408..3d454209b24 100644 --- a/tokio/src/runtime/tests/loom_current_thread/yield_now.rs +++ b/tokio/src/runtime/tests/loom_current_thread/yield_now.rs @@ -1,5 +1,4 @@ use crate::runtime::park; -use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; #[test] @@ -8,10 +7,9 @@ fn yield_calls_park_before_scheduling_again() { let mut loom = loom::model::Builder::default(); loom.max_permutations = Some(1); loom.check(|| { - let rt = mk_runtime(2); - let (tx, rx) = oneshot::channel::<()>(); + let rt = mk_runtime(); - rt.spawn(async { + let jh = rt.spawn(async { let tid = loom::thread::current().id(); let park_count = park::current_thread_park_count(); @@ -21,17 +19,12 @@ fn yield_calls_park_before_scheduling_again() { let new_park_count = park::current_thread_park_count(); assert_eq!(park_count + 1, new_park_count); } - - tx.send(()); }); - rx.recv(); + rt.block_on(jh).unwrap(); }); } -fn mk_runtime(num_threads: usize) -> Runtime { - runtime::Builder::new_multi_thread() - .worker_threads(num_threads) - .build() - .unwrap() +fn mk_runtime() -> Runtime { + runtime::Builder::new_current_thread().build().unwrap() } diff --git a/tokio/src/runtime/tests/loom_multi_thread/yield_now.rs b/tokio/src/runtime/tests/loom_multi_thread/yield_now.rs index ba506e5a408..f078669ddfa 100644 --- a/tokio/src/runtime/tests/loom_multi_thread/yield_now.rs +++ b/tokio/src/runtime/tests/loom_multi_thread/yield_now.rs @@ -3,6 +3,7 @@ use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; #[test] +#[ignore] fn yield_calls_park_before_scheduling_again() { // Don't need to check all permutations let mut loom = loom::model::Builder::default(); diff --git a/tokio/src/runtime/tests/loom_multi_thread_alt/yield_now.rs b/tokio/src/runtime/tests/loom_multi_thread_alt/yield_now.rs index ba506e5a408..f078669ddfa 100644 --- a/tokio/src/runtime/tests/loom_multi_thread_alt/yield_now.rs +++ b/tokio/src/runtime/tests/loom_multi_thread_alt/yield_now.rs @@ -3,6 +3,7 @@ use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; #[test] +#[ignore] fn yield_calls_park_before_scheduling_again() { // Don't need to check all permutations let mut loom = loom::model::Builder::default(); diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 0998b53011d..a6be0e62a13 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -75,7 +75,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; /// The largest safe integer to use for ticks. /// /// This value should be updated if any other signal values are added above. -pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = u64::MAX - 2; +pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1; /// This structure holds the current shared state of the timer - its scheduled /// time (if registered), or otherwise the result of the timer completing, as @@ -187,18 +187,14 @@ impl StateCell { break Err(cur_state); } - match self.state.compare_exchange( + match self.state.compare_exchange_weak( cur_state, STATE_PENDING_FIRE, Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => { - break Ok(()); - } - Err(actual_state) => { - cur_state = actual_state; - } + Ok(_) => break Ok(()), + Err(actual_state) => cur_state = actual_state, } } } @@ -266,12 +262,8 @@ impl StateCell { Ordering::AcqRel, Ordering::Acquire, ) { - Ok(_) => { - return Ok(()); - } - Err(true_prior) => { - prior = true_prior; - } + Ok(_) => return Ok(()), + Err(true_prior) => prior = true_prior, } } } @@ -301,7 +293,7 @@ pub(crate) struct TimerEntry { /// /// This is manipulated only under the inner mutex. TODO: Can we use loom /// cells for this? - inner: StdUnsafeCell, + inner: StdUnsafeCell>, /// Deadline for the timer. This is used to register on the first /// poll, as we can't register prior to being pinned. deadline: Instant, @@ -477,23 +469,32 @@ unsafe impl linked_list::Link for TimerShared { impl TimerEntry { #[track_caller] - pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self { + pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { // Panic if the time driver is not enabled let _ = handle.driver().time(); - let driver = handle.clone(); - Self { - driver, - inner: StdUnsafeCell::new(TimerShared::new()), + driver: handle, + inner: StdUnsafeCell::new(None), deadline, registered: false, _m: std::marker::PhantomPinned, } } + fn is_inner_init(&self) -> bool { + unsafe { &*self.inner.get() }.is_some() + } + + // This lazy initialization is for performance purposes. fn inner(&self) -> &TimerShared { - unsafe { &*self.inner.get() } + let inner = unsafe { &*self.inner.get() }; + if inner.is_none() { + unsafe { + *self.inner.get() = Some(TimerShared::new()); + } + } + return inner.as_ref().unwrap(); } pub(crate) fn deadline(&self) -> Instant { @@ -501,11 +502,15 @@ impl TimerEntry { } pub(crate) fn is_elapsed(&self) -> bool { - !self.inner().state.might_be_registered() && self.registered + self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered } /// Cancels and deregisters the timer. This operation is irreversible. pub(crate) fn cancel(self: Pin<&mut Self>) { + // Avoid calling the `clear_entry` method, because it has not been initialized yet. + if !self.is_inner_init() { + return; + } // We need to perform an acq/rel fence with the driver thread, and the // simplest way to do so is to grab the driver lock. // @@ -532,8 +537,9 @@ impl TimerEntry { } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { - unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time; - unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister; + let this = unsafe { self.as_mut().get_unchecked_mut() }; + this.deadline = new_time; + this.registered = reregister; let tick = self.driver().time_source().deadline_to_tick(new_time); @@ -564,9 +570,7 @@ impl TimerEntry { self.as_mut().reset(deadline, true); } - let this = unsafe { self.get_unchecked_mut() }; - - this.inner().state.poll(cx.waker()) + self.inner().state.poll(cx.waker()) } pub(crate) fn driver(&self) -> &super::Handle { diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index a30393a02b2..8cd51c5cb4a 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -23,9 +23,10 @@ use crate::loom::sync::Mutex; use crate::runtime::driver::{self, IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; +use crate::util::WakeList; use std::fmt; -use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; +use std::{num::NonZeroU64, ptr::NonNull}; /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// @@ -253,8 +254,7 @@ impl Handle { } pub(self) fn process_at_time(&self, mut now: u64) { - let mut waker_list: [Option; 32] = Default::default(); - let mut waker_idx = 0; + let mut waker_list = WakeList::new(); let mut lock = self.inner.lock(); @@ -273,19 +273,13 @@ impl Handle { // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. if let Some(waker) = unsafe { entry.fire(Ok(())) } { - waker_list[waker_idx] = Some(waker); + waker_list.push(waker); - waker_idx += 1; - - if waker_idx == waker_list.len() { + if !waker_list.can_push() { // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. drop(lock); - for waker in waker_list.iter_mut() { - waker.take().unwrap().wake(); - } - - waker_idx = 0; + waker_list.wake_all(); lock = self.inner.lock(); } @@ -299,9 +293,7 @@ impl Handle { drop(lock); - for waker in &mut waker_list[0..waker_idx] { - waker.take().unwrap().wake(); - } + waker_list.wake_all(); } /// Removes a registered timer from the driver. diff --git a/tokio/src/runtime/time/source.rs b/tokio/src/runtime/time/source.rs index c709dc5380f..e371c207cdb 100644 --- a/tokio/src/runtime/time/source.rs +++ b/tokio/src/runtime/time/source.rs @@ -21,9 +21,7 @@ impl TimeSource { pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 { // round up - let dur: Duration = t - .checked_duration_since(self.start_time) - .unwrap_or_else(|| Duration::from_secs(0)); + let dur: Duration = t.saturating_duration_since(self.start_time); let ms = dur.as_millis(); ms.try_into().unwrap_or(MAX_SAFE_MILLIS_DURATION) diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index e7ab222ef63..520dc00a462 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -49,7 +49,7 @@ fn single_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -83,7 +83,7 @@ fn drop_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -117,7 +117,7 @@ fn change_waker() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -157,7 +157,7 @@ fn reset_future() { let start = handle.inner.driver().clock().now(); let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1)); + let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -219,7 +219,7 @@ fn poll_process_levels() { for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(i), )); @@ -253,7 +253,7 @@ fn poll_process_levels_targeted() { let handle = rt.handle(); let e1 = TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(193), ); pin!(e1); diff --git a/tokio/src/runtime/time/wheel/level.rs b/tokio/src/runtime/time/wheel/level.rs index a828c0067ef..d31eaf46879 100644 --- a/tokio/src/runtime/time/wheel/level.rs +++ b/tokio/src/runtime/time/wheel/level.rs @@ -1,6 +1,6 @@ use crate::runtime::time::{EntryList, TimerHandle, TimerShared}; -use std::{fmt, ptr::NonNull}; +use std::{array, fmt, ptr::NonNull}; /// Wheel for a single level in the timer. This wheel contains 64 slots. pub(crate) struct Level { @@ -39,89 +39,10 @@ const LEVEL_MULT: usize = 64; impl Level { pub(crate) fn new(level: usize) -> Level { - // A value has to be Copy in order to use syntax like: - // let stack = Stack::default(); - // ... - // slots: [stack; 64], - // - // Alternatively, since Stack is Default one can - // use syntax like: - // let slots: [Stack; 64] = Default::default(); - // - // However, that is only supported for arrays of size - // 32 or fewer. So in our case we have to explicitly - // invoke the constructor for each array element. - let ctor = EntryList::default; - Level { level, occupied: 0, - slot: [ - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ctor(), - ], + slot: array::from_fn(|_| EntryList::default()), } } @@ -130,10 +51,7 @@ impl Level { pub(crate) fn next_expiration(&self, now: u64) -> Option { // Use the `occupied` bit field to get the index of the next slot that // needs to be processed. - let slot = match self.next_occupied_slot(now) { - Some(slot) => slot, - None => return None, - }; + let slot = self.next_occupied_slot(now)?; // From the slot index, calculate the `Instant` at which it needs to be // processed. This value *must* be in the future with respect to `now`. @@ -196,7 +114,7 @@ impl Level { let now_slot = (now / slot_range(self.level)) as usize; let occupied = self.occupied.rotate_right(now_slot as u32); let zeros = occupied.trailing_zeros() as usize; - let slot = (zeros + now_slot) % 64; + let slot = (zeros + now_slot) % LEVEL_MULT; Some(slot) } diff --git a/tokio/src/runtime/time/wheel/mod.rs b/tokio/src/runtime/time/wheel/mod.rs index bf13b7b2415..f2b4228514c 100644 --- a/tokio/src/runtime/time/wheel/mod.rs +++ b/tokio/src/runtime/time/wheel/mod.rs @@ -5,7 +5,7 @@ mod level; pub(crate) use self::level::Expiration; use self::level::Level; -use std::ptr::NonNull; +use std::{array, ptr::NonNull}; use super::EntryList; @@ -35,7 +35,7 @@ pub(crate) struct Wheel { /// * ~ 4 min slots / ~ 4 hr range /// * ~ 4 hr slots / ~ 12 day range /// * ~ 12 day slots / ~ 2 yr range - levels: Vec, + levels: Box<[Level; NUM_LEVELS]>, /// Entries queued for firing pending: EntryList, @@ -52,11 +52,9 @@ pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; impl Wheel { /// Creates a new timing wheel. pub(crate) fn new() -> Wheel { - let levels = (0..NUM_LEVELS).map(Level::new).collect(); - Wheel { elapsed: 0, - levels, + levels: Box::new(array::from_fn(Level::new)), pending: EntryList::new(), } } @@ -130,7 +128,6 @@ impl Wheel { ); let level = self.level_for(when); - self.levels[level].remove_entry(item); } } @@ -180,11 +177,11 @@ impl Wheel { } // Check all levels - for level in 0..NUM_LEVELS { - if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) { + for (level_num, level) in self.levels.iter().enumerate() { + if let Some(expiration) = level.next_expiration(self.elapsed) { // There cannot be any expirations at a higher level that happen // before this one. - debug_assert!(self.no_expirations_before(level + 1, expiration.deadline)); + debug_assert!(self.no_expirations_before(level_num + 1, expiration.deadline)); return Some(expiration); } @@ -203,8 +200,8 @@ impl Wheel { fn no_expirations_before(&self, start_level: usize, before: u64) -> bool { let mut res = true; - for l2 in start_level..NUM_LEVELS { - if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) { + for level in &self.levels[start_level..] { + if let Some(e2) = level.next_expiration(self.elapsed) { if e2.deadline < before { res = false; } @@ -267,7 +264,6 @@ impl Wheel { } /// Obtains the list of entries that need processing for the given expiration. - /// fn take_entries(&mut self, expiration: &Expiration) -> EntryList { self.levels[expiration.level].take_slot(expiration.slot) } @@ -292,7 +288,7 @@ fn level_for(elapsed: u64, when: u64) -> usize { let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; - significant / 6 + significant / NUM_LEVELS } #[cfg(all(test, not(loom)))] diff --git a/tokio/src/signal/mod.rs b/tokio/src/signal/mod.rs index 59f71db0e46..5778f22ed12 100644 --- a/tokio/src/signal/mod.rs +++ b/tokio/src/signal/mod.rs @@ -45,7 +45,9 @@ use crate::sync::watch::Receiver; use std::task::{Context, Poll}; +#[cfg(feature = "signal")] mod ctrl_c; +#[cfg(feature = "signal")] pub use ctrl_c::ctrl_c; pub(crate) mod registry; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 52a9cbaac40..c4a196a660f 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -485,10 +485,12 @@ impl Signal { } // Work around for abstracting streams internally +#[cfg(feature = "process")] pub(crate) trait InternalStream { fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; } +#[cfg(feature = "process")] impl InternalStream for Signal { fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index d7eb1d6b77e..def5cbc9f51 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -575,6 +575,8 @@ impl Future for Acquire<'_> { type Output = Result<(), AcquireError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(crate::trace::trace_leaf(cx)); + #[cfg(all(tokio_unstable, feature = "tracing"))] let _resource_span = self.node.ctx.resource_span.clone().entered(); #[cfg(all(tokio_unstable, feature = "tracing"))] diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 6ac97591fea..a4f98060b19 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -481,7 +481,7 @@ impl Receiver { /// assert!(!rx.is_closed()); /// /// rx.close(); - /// + /// /// assert!(rx.is_closed()); /// } /// ``` @@ -530,6 +530,86 @@ impl Receiver { self.chan.len() } + /// Returns the current capacity of the channel. + /// + /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving + /// capacity with [`Sender::reserve`]. The capacity goes up when values are received. + /// This is distinct from [`max_capacity`], which always returns buffer capacity initially + /// specified when calling [`channel`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel::<()>(5); + /// + /// assert_eq!(rx.capacity(), 5); + /// + /// // Making a reservation drops the capacity by one. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(rx.capacity(), 4); + /// assert_eq!(rx.len(), 0); + /// + /// // Sending and receiving a value increases the capacity by one. + /// permit.send(()); + /// assert_eq!(rx.len(), 1); + /// rx.recv().await.unwrap(); + /// assert_eq!(rx.capacity(), 5); + /// + /// // Directly sending a message drops the capacity by one. + /// tx.send(()).await.unwrap(); + /// assert_eq!(rx.capacity(), 4); + /// assert_eq!(rx.len(), 1); + /// + /// // Receiving the message increases the capacity by one. + /// rx.recv().await.unwrap(); + /// assert_eq!(rx.capacity(), 5); + /// assert_eq!(rx.len(), 0); + /// } + /// ``` + /// [`capacity`]: Receiver::capacity + /// [`max_capacity`]: Receiver::max_capacity + pub fn capacity(&self) -> usize { + self.chan.semaphore().semaphore.available_permits() + } + + /// Returns the maximum buffer capacity of the channel. + /// + /// The maximum capacity is the buffer capacity initially specified when calling + /// [`channel`]. This is distinct from [`capacity`], which returns the *current* + /// available buffer capacity: as messages are sent and received, the value + /// returned by [`capacity`] will go up or down, whereas the value + /// returned by [`max_capacity`] will remain constant. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel::<()>(5); + /// + /// // both max capacity and capacity are the same at first + /// assert_eq!(rx.max_capacity(), 5); + /// assert_eq!(rx.capacity(), 5); + /// + /// // Making a reservation doesn't change the max capacity. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(rx.max_capacity(), 5); + /// // but drops the capacity by one + /// assert_eq!(rx.capacity(), 4); + /// } + /// ``` + /// [`capacity`]: Receiver::capacity + /// [`max_capacity`]: Receiver::max_capacity + pub fn max_capacity(&self) -> usize { + self.chan.semaphore().bound + } + /// Polls to receive the next message on this channel. /// /// This method returns: @@ -1059,7 +1139,7 @@ impl Sender { /// /// // The iterator should now be exhausted /// assert!(permit.next().is_none()); - /// + /// /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); @@ -1274,7 +1354,7 @@ impl Sender { /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); - /// + /// /// // Trying to call try_reserve_many with 0 will return an empty iterator /// let mut permit = tx.try_reserve_many(0).unwrap(); /// assert!(permit.next().is_none()); @@ -1447,7 +1527,7 @@ impl Sender { /// [`channel`]. This is distinct from [`capacity`], which returns the *current* /// available buffer capacity: as messages are sent and received, the /// value returned by [`capacity`] will go up or down, whereas the value - /// returned by `max_capacity` will remain constant. + /// returned by [`max_capacity`] will remain constant. /// /// # Examples /// diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index ae378d7ecb2..d8838242a39 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -465,6 +465,10 @@ impl Rx { } }) } + + pub(super) fn semaphore(&self) -> &S { + &self.inner.semaphore + } } impl Drop for Rx { diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index f45df10a982..32a87c93c55 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -318,10 +318,8 @@ cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle}; - cfg_not_wasi! { - mod blocking; - pub use blocking::spawn_blocking; - } + mod blocking; + pub use blocking::spawn_blocking; mod spawn; pub use spawn::spawn; diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index dee28793a32..2b5246acfa4 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -480,7 +480,9 @@ impl Interval { self.missed_tick_behavior .next_timeout(timeout, now, self.period) } else { - timeout + self.period + timeout + .checked_add(self.period) + .unwrap_or_else(Instant::far_future) }; // When we arrive here, the internal delay returned `Poll::Ready`. diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index a1f27b839e9..c0cd7c62856 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -86,8 +86,9 @@ mod clock; pub(crate) use self::clock::Clock; -#[cfg(feature = "test-util")] -pub use clock::{advance, pause, resume}; +cfg_test_util! { + pub use clock::{advance, pause, resume}; +} pub mod error; diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 36f6e83c6b1..9223396fe54 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -254,12 +254,11 @@ impl Sleep { location: Option<&'static Location<'static>>, ) -> Sleep { use crate::runtime::scheduler; - let handle = scheduler::Handle::current(); - let entry = TimerEntry::new(&handle, deadline); - + let entry = TimerEntry::new(handle, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { + let handle = scheduler::Handle::current(); let clock = handle.driver().clock(); let handle = &handle.driver().time(); let time_source = handle.time_source(); diff --git a/tokio/src/util/markers.rs b/tokio/src/util/markers.rs index c16ebdf0bc6..fee1d1ccc78 100644 --- a/tokio/src/util/markers.rs +++ b/tokio/src/util/markers.rs @@ -1,4 +1,5 @@ /// Marker for types that are `Sync` but not `Send` +#[allow(dead_code)] pub(crate) struct SyncNotSend(#[allow(dead_code)] *mut ()); unsafe impl Sync for SyncNotSend {} diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index abdb70406d2..d821ec897cf 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -19,6 +19,8 @@ pub(crate) mod once_cell; // rt and signal use `Notify`, which requires `WakeList`. feature = "rt", feature = "signal", + // time driver uses `WakeList` in `Handle::process_at_time`. + feature = "time", ))] mod wake_list; #[cfg(any( @@ -28,6 +30,7 @@ mod wake_list; feature = "fs", feature = "rt", feature = "signal", + feature = "time", ))] pub(crate) use wake_list::WakeList; diff --git a/tokio/src/util/wake_list.rs b/tokio/src/util/wake_list.rs index c5f432b0bb8..23a559d02be 100644 --- a/tokio/src/util/wake_list.rs +++ b/tokio/src/util/wake_list.rs @@ -37,12 +37,37 @@ impl WakeList { } pub(crate) fn wake_all(&mut self) { - assert!(self.curr <= NUM_WAKERS); - while self.curr > 0 { - self.curr -= 1; - // SAFETY: The first `curr` elements of `WakeList` are initialized, so by decrementing - // `curr`, we can take ownership of the last item. - let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) }; + struct DropGuard { + start: *mut Waker, + end: *mut Waker, + } + + impl Drop for DropGuard { + fn drop(&mut self) { + // SAFETY: Both pointers are part of the same object, with `start <= end`. + let len = unsafe { self.end.offset_from(self.start) } as usize; + let slice = ptr::slice_from_raw_parts_mut(self.start, len); + // SAFETY: All elements in `start..len` are initialized, so we can drop them. + unsafe { ptr::drop_in_place(slice) }; + } + } + + debug_assert!(self.curr <= NUM_WAKERS); + + let mut guard = { + let start = self.inner.as_mut_ptr().cast::(); + // SAFETY: The resulting pointer is in bounds or one after the length of the same object. + let end = unsafe { start.add(self.curr) }; + // Transfer ownership of the wakers in `inner` to `DropGuard`. + self.curr = 0; + DropGuard { start, end } + }; + while !ptr::eq(guard.start, guard.end) { + // SAFETY: `start` is always initialized if `start != end`. + let waker = unsafe { ptr::read(guard.start) }; + // SAFETY: The resulting pointer is in bounds or one after the length of the same object. + guard.start = unsafe { guard.start.add(1) }; + // If this panics, then `guard` will clean up the remaining wakers. waker.wake(); } } diff --git a/tokio/tests/_require_full.rs b/tokio/tests/_require_full.rs index d33943a960d..81c25179615 100644 --- a/tokio/tests/_require_full.rs +++ b/tokio/tests/_require_full.rs @@ -1,3 +1,5 @@ +#![allow(unknown_lints, unexpected_cfgs)] + #[cfg(not(any(feature = "full", target_family = "wasm")))] compile_error!("run main Tokio tests with `--features full`"); diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index c946f38436c..68b53aaf291 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all( tokio_unstable, tokio_taskdump, diff --git a/tokio/tests/fs_open_options.rs b/tokio/tests/fs_open_options.rs index 41cfb45460c..84b63a504cf 100644 --- a/tokio/tests/fs_open_options.rs +++ b/tokio/tests/fs_open_options.rs @@ -55,8 +55,13 @@ async fn open_options_create_new() { #[tokio::test] #[cfg(unix)] async fn open_options_mode() { + let mode = format!("{:?}", OpenOptions::new().mode(0o644)); // TESTING HACK: use Debug output to check the stored data - assert!(format!("{:?}", OpenOptions::new().mode(0o644)).contains("mode: 420 ")); + assert!( + mode.contains("mode: 420 ") || mode.contains("mode: 0o000644 "), + "mode is: {}", + mode + ); } #[tokio::test] diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index f65cbdf2267..cbad971ab1f 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![cfg(feature = "macros")] #![allow(clippy::disallowed_names)] diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 47bf2dfdc12..a5204bd83f7 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index a71fc4a735e..75a20057166 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_range_loop)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/rt_handle.rs b/tokio/tests/rt_handle.rs index 92fa777e321..9efe9b4bde9 100644 --- a/tokio/tests/rt_handle.rs +++ b/tokio/tests/rt_handle.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 4dfed06fed4..58869c530ae 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))] diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 6e769fc831f..26690550f93 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi")))] diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 8b7143b2f97..9eed1fe78b6 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi")))] #![cfg(tokio_unstable)] diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 481cc96f2ef..09414f094ed 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -7,8 +7,10 @@ use tokio::time::Duration; use tokio::runtime::Builder; +#[cfg(panic = "unwind")] struct PanicOnDrop; +#[cfg(panic = "unwind")] impl Drop for PanicOnDrop { fn drop(&mut self) { panic!("Well what did you expect would happen..."); diff --git a/tokio/tests/task_builder.rs b/tokio/tests/task_builder.rs index 78329ff26a4..4d1248500ab 100644 --- a/tokio/tests/task_builder.rs +++ b/tokio/tests/task_builder.rs @@ -1,81 +1,81 @@ -#[cfg(all(tokio_unstable, feature = "tracing"))] -mod tests { - use std::rc::Rc; - use tokio::{ - task::{Builder, LocalSet}, - test, - }; +#![allow(unknown_lints, unexpected_cfgs)] +#![cfg(all(tokio_unstable, feature = "tracing"))] - #[test] - async fn spawn_with_name() { - let result = Builder::new() - .name("name") - .spawn(async { "task executed" }) - .unwrap() - .await; +use std::rc::Rc; +use tokio::{ + task::{Builder, LocalSet}, + test, +}; - assert_eq!(result.unwrap(), "task executed"); - } +#[test] +async fn spawn_with_name() { + let result = Builder::new() + .name("name") + .spawn(async { "task executed" }) + .unwrap() + .await; - #[test] - async fn spawn_blocking_with_name() { - let result = Builder::new() - .name("name") - .spawn_blocking(|| "task executed") - .unwrap() - .await; + assert_eq!(result.unwrap(), "task executed"); +} + +#[test] +async fn spawn_blocking_with_name() { + let result = Builder::new() + .name("name") + .spawn_blocking(|| "task executed") + .unwrap() + .await; - assert_eq!(result.unwrap(), "task executed"); - } + assert_eq!(result.unwrap(), "task executed"); +} - #[test] - async fn spawn_local_with_name() { - let unsend_data = Rc::new("task executed"); - let result = LocalSet::new() - .run_until(async move { - Builder::new() - .name("name") - .spawn_local(async move { unsend_data }) - .unwrap() - .await - }) - .await; +#[test] +async fn spawn_local_with_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { + Builder::new() + .name("name") + .spawn_local(async move { unsend_data }) + .unwrap() + .await + }) + .await; - assert_eq!(*result.unwrap(), "task executed"); - } + assert_eq!(*result.unwrap(), "task executed"); +} - #[test] - async fn spawn_without_name() { - let result = Builder::new() - .spawn(async { "task executed" }) - .unwrap() - .await; +#[test] +async fn spawn_without_name() { + let result = Builder::new() + .spawn(async { "task executed" }) + .unwrap() + .await; - assert_eq!(result.unwrap(), "task executed"); - } + assert_eq!(result.unwrap(), "task executed"); +} - #[test] - async fn spawn_blocking_without_name() { - let result = Builder::new() - .spawn_blocking(|| "task executed") - .unwrap() - .await; +#[test] +async fn spawn_blocking_without_name() { + let result = Builder::new() + .spawn_blocking(|| "task executed") + .unwrap() + .await; - assert_eq!(result.unwrap(), "task executed"); - } + assert_eq!(result.unwrap(), "task executed"); +} - #[test] - async fn spawn_local_without_name() { - let unsend_data = Rc::new("task executed"); - let result = LocalSet::new() - .run_until(async move { - Builder::new() - .spawn_local(async move { unsend_data }) - .unwrap() - .await - }) - .await; +#[test] +async fn spawn_local_without_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { + Builder::new() + .spawn_local(async move { unsend_data }) + .unwrap() + .await + }) + .await; - assert_eq!(*result.unwrap(), "task executed"); - } + assert_eq!(*result.unwrap(), "task executed"); } diff --git a/tokio/tests/task_id.rs b/tokio/tests/task_id.rs index 95e48f4901d..574a050b0c1 100644 --- a/tokio/tests/task_id.rs +++ b/tokio/tests/task_id.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable))] diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 8a42be17b49..e87135337ba 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index d965eb341eb..ac46291a36c 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/task_yield_now.rs b/tokio/tests/task_yield_now.rs index b16bca52819..3cb8cb16e70 100644 --- a/tokio/tests/task_yield_now.rs +++ b/tokio/tests/task_yield_now.rs @@ -1,3 +1,4 @@ +#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all(feature = "full", tokio_unstable))] use tokio::task; diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index 4f3e95b0d2a..7472a37123c 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use futures::{Stream, StreamExt}; use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior}; -use tokio_test::{assert_pending, assert_ready_eq, task}; +use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task}; // Takes the `Interval` task, `start` variable, and optional time deltas // For each time delta, it polls the `Interval` and asserts that the result is @@ -469,3 +469,9 @@ async fn stream_with_interval_poll_tick_no_waking() { // task when returning `Poll::Ready`. assert_eq!(items, vec![]); } + +#[tokio::test(start_paused = true)] +async fn interval_doesnt_panic_max_duration_when_polling() { + let mut timer = task::spawn(time::interval(Duration::MAX)); + assert_ready!(timer.enter(|cx, mut timer| timer.poll_tick(cx))); +}