diff --git a/README.md b/README.md index 4a6671ad814..1fdbdc7fc2f 100644 --- a/README.md +++ b/README.md @@ -218,16 +218,17 @@ releases are: * `1.20.x` - LTS release until September 2023. (MSRV 1.49) * `1.25.x` - LTS release until March 2024. (MSRV 1.49) + * `1.32.x` - LTS release until September 2024 (MSRV 1.63) Each LTS release will continue to receive backported fixes for at least a year. If you wish to use a fixed minor release in your project, we recommend that you use an LTS release. To use a fixed minor version, you can specify the version with a tilde. For -example, to specify that you wish to use the newest `1.18.x` patch release, you +example, to specify that you wish to use the newest `1.25.x` patch release, you can use the following dependency specification: ```text -tokio = { version = "~1.18", features = [...] } +tokio = { version = "~1.25", features = [...] } ``` ### Previous LTS releases diff --git a/tokio-util/src/codec/lines_codec.rs b/tokio-util/src/codec/lines_codec.rs index 7a0a8f04541..5a6035d13b9 100644 --- a/tokio-util/src/codec/lines_codec.rs +++ b/tokio-util/src/codec/lines_codec.rs @@ -6,6 +6,8 @@ use std::{cmp, fmt, io, str, usize}; /// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines. /// +/// This uses the `\n` character as the line ending on all platforms. +/// /// [`Decoder`]: crate::codec::Decoder /// [`Encoder`]: crate::codec::Encoder #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] diff --git a/tokio/README.md b/tokio/README.md index 4a6671ad814..1fdbdc7fc2f 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -218,16 +218,17 @@ releases are: * `1.20.x` - LTS release until September 2023. (MSRV 1.49) * `1.25.x` - LTS release until March 2024. (MSRV 1.49) + * `1.32.x` - LTS release until September 2024 (MSRV 1.63) Each LTS release will continue to receive backported fixes for at least a year. If you wish to use a fixed minor release in your project, we recommend that you use an LTS release. To use a fixed minor version, you can specify the version with a tilde. For -example, to specify that you wish to use the newest `1.18.x` patch release, you +example, to specify that you wish to use the newest `1.25.x` patch release, you can use the following dependency specification: ```text -tokio = { version = "~1.18", features = [...] } +tokio = { version = "~1.25", features = [...] } ``` ### Previous LTS releases diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 5b404c21940..96676e64cff 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -124,6 +124,18 @@ impl AsyncWrite for DuplexStream { Pin::new(&mut *self.write.lock()).poll_write(cx, buf) } + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut *self.write.lock()).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + true + } + #[allow(unused_mut)] fn poll_flush( mut self: Pin<&mut Self>, @@ -224,6 +236,37 @@ impl Pipe { } Poll::Ready(Ok(len)) } + + fn poll_write_vectored_internal( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + if self.is_closed { + return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); + } + let avail = self.max_buf_size - self.buffer.len(); + if avail == 0 { + self.write_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + let mut rem = avail; + for buf in bufs { + if rem == 0 { + break; + } + + let len = buf.len().min(rem); + self.buffer.extend_from_slice(&buf[..len]); + rem -= len; + } + + if let Some(waker) = self.read_waker.take() { + waker.wake(); + } + Poll::Ready(Ok(avail - rem)) + } } impl AsyncRead for Pipe { @@ -285,6 +328,38 @@ impl AsyncWrite for Pipe { } } + cfg_coop! { + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + + let ret = self.poll_write_vectored_internal(cx, bufs); + if ret.is_ready() { + coop.made_progress(); + } + ret + } + } + + cfg_not_coop! { + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + self.poll_write_vectored_internal(cx, bufs) + } + } + + fn is_write_vectored(&self) -> bool { + true + } + fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 5cf06fe6087..60c34304796 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -1166,6 +1166,10 @@ impl Child { /// If the caller wishes to explicitly control when the child's stdin /// handle is closed, they may `.take()` it before calling `.wait()`: /// + /// # Cancel safety + /// + /// This function is cancel safe. + /// /// ``` /// # #[cfg(not(unix))]fn main(){} /// # #[cfg(unix)] diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 95b517c94eb..66a3e51bb97 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -474,7 +474,7 @@ impl RuntimeMetrics { /// /// This metric only applies to the **multi-threaded** scheduler. /// - /// The worker steal count starts at zero when the runtime is created and + /// The worker overflow count starts at zero when the runtime is created and /// increases by one each time the worker attempts to schedule a task /// locally, but its local queue is full. When this happens, half of the /// local queue is moved to the injection queue. diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 59b68b8d2cf..11dfa274ee3 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -575,6 +575,7 @@ impl LocalSet { run_until.await } + #[track_caller] pub(in crate::task) fn spawn_named( &self, future: F, diff --git a/tokio/tests/duplex_stream.rs b/tokio/tests/duplex_stream.rs new file mode 100644 index 00000000000..64111fb960d --- /dev/null +++ b/tokio/tests/duplex_stream.rs @@ -0,0 +1,47 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::io::IoSlice; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +const HELLO: &[u8] = b"hello world..."; + +#[tokio::test] +async fn write_vectored() { + let (mut client, mut server) = tokio::io::duplex(64); + + let ret = client + .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + .await + .unwrap(); + assert_eq!(ret, HELLO.len() * 2); + + client.flush().await.unwrap(); + drop(client); + + let mut buf = Vec::with_capacity(HELLO.len() * 2); + let bytes_read = server.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(bytes_read, HELLO.len() * 2); + assert_eq!(buf, [HELLO, HELLO].concat()); +} + +#[tokio::test] +async fn write_vectored_and_shutdown() { + let (mut client, mut server) = tokio::io::duplex(64); + + let ret = client + .write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)]) + .await + .unwrap(); + assert_eq!(ret, HELLO.len() * 2); + + client.shutdown().await.unwrap(); + drop(client); + + let mut buf = Vec::with_capacity(HELLO.len() * 2); + let bytes_read = server.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(bytes_read, HELLO.len() * 2); + assert_eq!(buf, [HELLO, HELLO].concat()); +}