From 9e52e64ff596897d944232ef79a836c813e0739b Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 17:14:14 +0200 Subject: [PATCH 1/6] Bump tokio (and related dependencies) --- Cargo.toml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fcde338e71..0b528a096b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["log", "std"] } pin-project = "1.0" tower-service = "0.3" -tokio = { version = "0.2.11", features = ["sync"] } +tokio = { version = "0.3", features = ["sync"] } want = "0.3" # Optional @@ -49,9 +49,9 @@ spmc = "0.3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -tokio = { version = "0.2.2", features = ["fs", "macros", "io-std", "rt-util", "sync", "time", "test-util"] } -tokio-test = "0.2" -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "0.3", features = ["fs", "macros", "io-std", "rt", "sync", "time", "test-util"] } +tokio-test = "0.3" +tokio-util = { version = "0.4", features = ["codec"] } tower-util = "0.3" url = "1.0" @@ -65,12 +65,12 @@ default = [ ] runtime = [ "tcp", - "tokio/rt-core", + "tokio/rt", ] tcp = [ "socket2", - "tokio/blocking", - "tokio/tcp", + "tokio/net", + "tokio/rt", "tokio/time", ] From 9c75fc74322842659d016c736bbe945db9a78f24 Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 16:46:49 +0200 Subject: [PATCH 2/6] WIP: update h2 (git dependency) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0b528a096b..79108c72e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ http = "0.2" http-body = "0.3.1" httpdate = "0.3" httparse = "1.0" -h2 = "0.2.2" +h2 = { git = "https://github.com/hyperium/h2" } itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["log", "std"] } pin-project = "1.0" From 842ad6e4c83598077ed2d5eae3e93bb785c187b4 Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 17:12:39 +0200 Subject: [PATCH 3/6] Sleep changes --- src/client/connect/http.rs | 6 +++--- src/client/pool.rs | 4 ++-- src/proto/h2/ping.rs | 6 +++--- src/server/tcp.rs | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index c1cdf4e129..342d4397ed 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -13,7 +13,7 @@ use futures_util::future::Either; use http::uri::{Scheme, Uri}; use pin_project::pin_project; use tokio::net::TcpStream; -use tokio::time::Delay; +use tokio::time::Sleep; use super::dns::{self, resolve, GaiResolver, Resolve}; use super::{Connected, Connection}; @@ -510,7 +510,7 @@ impl ConnectingTcp { local_addr_ipv6, preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout), fallback: Some(ConnectingTcpFallback { - delay: tokio::time::delay_for(fallback_timeout), + delay: tokio::time::sleep(fallback_timeout), remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout), }), reuse_address, @@ -528,7 +528,7 @@ impl ConnectingTcp { } struct ConnectingTcpFallback { - delay: Delay, + delay: Sleep, remote: ConnectingTcpRemote, } diff --git a/src/client/pool.rs b/src/client/pool.rs index 8c1ee24c0d..933bb9f592 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -850,7 +850,7 @@ mod tests { let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); - tokio::time::delay_for(pool.locked().timeout.unwrap()).await; + tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key); let poll_once = PollOnce(&mut checkout); let is_not_ready = poll_once.await.is_none(); @@ -871,7 +871,7 @@ mod tests { pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3) ); - tokio::time::delay_for(pool.locked().timeout.unwrap()).await; + tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key.clone()); let poll_once = PollOnce(&mut checkout); diff --git a/src/proto/h2/ping.rs b/src/proto/h2/ping.rs index c4fe2dd15c..7b1a5eb9dd 100644 --- a/src/proto/h2/ping.rs +++ b/src/proto/h2/ping.rs @@ -33,7 +33,7 @@ use std::time::Instant; use h2::{Ping, PingPong}; #[cfg(feature = "runtime")] -use tokio::time::{Delay, Instant}; +use tokio::time::{Instant, Sleep}; type WindowSize = u32; @@ -60,7 +60,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) interval, timeout: config.keep_alive_timeout, while_idle: config.keep_alive_while_idle, - timer: tokio::time::delay_for(interval), + timer: tokio::time::sleep(interval), state: KeepAliveState::Init, }); @@ -156,7 +156,7 @@ struct KeepAlive { while_idle: bool, state: KeepAliveState, - timer: Delay, + timer: Sleep, } #[cfg(feature = "runtime")] diff --git a/src/server/tcp.rs b/src/server/tcp.rs index e526303429..61a6ff6818 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -4,7 +4,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener}; use std::time::Duration; use tokio::net::TcpListener; -use tokio::time::Delay; +use tokio::time::Sleep; use crate::common::{task, Future, Pin, Poll}; @@ -19,7 +19,7 @@ pub struct AddrIncoming { sleep_on_errors: bool, tcp_keepalive_timeout: Option, tcp_nodelay: bool, - timeout: Option, + timeout: Option, } impl AddrIncoming { @@ -119,7 +119,7 @@ impl AddrIncoming { error!("accept error: {}", e); // Sleep 1s. - let mut timeout = tokio::time::delay_for(Duration::from_secs(1)); + let mut timeout = tokio::time::sleep(Duration::from_secs(1)); match Pin::new(&mut timeout).poll(cx) { Poll::Ready(()) => { From 538f70ffe73ffcf129480d4a7425437067df9bc6 Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 17:17:54 +0200 Subject: [PATCH 4/6] Adapt AsyncRead, AsynWrite --- src/common/io/rewind.rs | 30 ++++++++---------------------- src/server/tcp.rs | 33 ++++----------------------------- src/upgrade.rs | 40 ++++++++-------------------------------- 3 files changed, 20 insertions(+), 83 deletions(-) diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index 14650697c3..ad5263d703 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -2,7 +2,7 @@ use std::marker::Unpin; use std::{cmp, io}; use bytes::{Buf, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::common::{task, Pin, Poll}; @@ -46,27 +46,22 @@ impl AsyncRead for Rewind where T: AsyncRead + Unpin, { - #[inline] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { if let Some(mut prefix) = self.pre.take() { // If there are no remaining bytes, let the bytes get dropped. if !prefix.is_empty() { - let copy_len = cmp::min(prefix.len(), buf.len()); - prefix.copy_to_slice(&mut buf[..copy_len]); + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(prefix.to_vec().as_slice()); + prefix.advance(copy_len); // Put back whats left if !prefix.is_empty() { self.pre = Some(prefix); } - - return Poll::Ready(Ok(copy_len)); } } Pin::new(&mut self.inner).poll_read(cx, buf) @@ -92,15 +87,6 @@ where fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } - - #[inline] - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_buf(cx, buf) - } } #[cfg(test)] diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 61a6ff6818..20e584a078 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -186,7 +186,7 @@ mod addr_stream { use std::net::SocketAddr; #[cfg(unix)] use std::os::unix::io::{AsRawFd, RawFd}; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use crate::common::{task, Pin, Poll}; @@ -231,30 +231,14 @@ mod addr_stream { } impl AsyncRead for AddrStream { - unsafe fn prepare_uninitialized_buffer( - &self, - buf: &mut [std::mem::MaybeUninit], - ) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - #[inline] fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.inner).poll_read(cx, buf) } - - #[inline] - fn poll_read_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_read_buf(cx, buf) - } } impl AsyncWrite for AddrStream { @@ -267,15 +251,6 @@ mod addr_stream { Pin::new(&mut self.inner).poll_write(cx, buf) } - #[inline] - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_buf(cx, buf) - } - #[inline] fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { // TCP flush is a noop diff --git a/src/upgrade.rs b/src/upgrade.rs index 55f390431f..d5b4816cca 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -12,7 +12,7 @@ use std::io; use std::marker::Unpin; use bytes::{Buf, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::oneshot; use crate::common::io::Rewind; @@ -105,15 +105,11 @@ impl Upgraded { } impl AsyncRead for Upgraded { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.io.prepare_uninitialized_buffer(buf) - } - fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.io).poll_read(cx, buf) } } @@ -127,14 +123,6 @@ impl AsyncWrite for Upgraded { Pin::new(&mut self.io).poll_write(cx, buf) } - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(self.io.get_mut()).poll_write_dyn_buf(cx, buf) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } @@ -247,15 +235,11 @@ impl dyn Io + Send { } impl AsyncRead for ForwardsWriteBuf { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } @@ -269,14 +253,6 @@ impl AsyncWrite for ForwardsWriteBuf { Pin::new(&mut self.0).poll_write(cx, buf) } - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.0).poll_write_buf(cx, buf) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } @@ -292,7 +268,7 @@ impl Io for ForwardsWriteBuf { cx: &mut task::Context<'_>, mut buf: &mut dyn Buf, ) -> Poll> { - Pin::new(&mut self.0).poll_write_buf(cx, &mut buf) + Pin::new(&mut self.0).poll_write(cx, buf.bytes()) } } From 8b0a3215bea008cb55005816d5e1ea0b5096699b Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 17:18:51 +0200 Subject: [PATCH 5/6] TcpStream::connect_std -> from_std --- src/client/connect/http.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 342d4397ed..133240b97d 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -640,17 +640,7 @@ fn connect( let std_tcp = socket.into_tcp_stream(); - Ok(async move { - let connect = TcpStream::connect_std(std_tcp, &addr); - match connect_timeout { - Some(dur) => match tokio::time::timeout(dur, connect).await { - Ok(Ok(s)) => Ok(s), - Ok(Err(e)) => Err(e), - Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)), - }, - None => connect.await, - } - }) + Ok(async move { TcpStream::from_std(std_tcp) }) } impl ConnectingTcp { From d60d8a35acbb013c329be876fe9a555527bb81ca Mon Sep 17 00:00:00 2001 From: Urhengulas Date: Sat, 24 Oct 2020 17:23:50 +0200 Subject: [PATCH 6/6] WIP: adapt tests --- src/upgrade.rs | 4 ++-- tests/client.rs | 52 ++++++++++++++++++++++++------------------------- tests/server.rs | 20 +++++++++---------- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/src/upgrade.rs b/src/upgrade.rs index d5b4816cca..344af29dda 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -302,8 +302,8 @@ mod tests { fn poll_read( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, - _buf: &mut [u8], - ) -> Poll> { + _buf: &mut ReadBuf<'_>, + ) -> Poll> { unreachable!("Mock::poll_read") } } diff --git a/tests/client.rs b/tests/client.rs index 576423768f..3eadf369d4 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -963,7 +963,7 @@ mod dispatch_impl { use futures_util::future::{FutureExt, TryFutureExt}; use futures_util::stream::StreamExt; use http::Uri; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use tokio::runtime::Runtime; @@ -1016,7 +1016,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1075,7 +1075,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1147,7 +1147,7 @@ mod dispatch_impl { drop(client); // and wait a few ticks for the connections to close - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1195,7 +1195,7 @@ mod dispatch_impl { future::select(res, rx1).await; // res now dropped - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1250,7 +1250,7 @@ mod dispatch_impl { res.unwrap(); // and wait a few ticks to see the connection drop - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1300,7 +1300,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1346,7 +1346,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1544,7 +1544,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); let delayed_body = rx1 - .then(|_| tokio::time::delay_for(Duration::from_millis(200))) + .then(|_| tokio::time::sleep(Duration::from_millis(200))) .map(|_| Ok::<_, ()>("hello a")) .map_err(|_| -> hyper::Error { panic!("rx1") }) .into_stream(); @@ -1559,7 +1559,7 @@ mod dispatch_impl { // req 1 let fut = future::join(client.request(req), rx) - .then(|_| tokio::time::delay_for(Duration::from_millis(200))) + .then(|_| tokio::time::sleep(Duration::from_millis(200))) // req 2 .then(move |()| { let rx = rx3.expect("thread panicked"); @@ -1646,7 +1646,7 @@ mod dispatch_impl { // sleep real quick to let the threadpool put connection in ready // state and back into client pool - tokio::time::delay_for(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; let rx = rx2.expect("thread panicked"); let req = Request::builder() @@ -1961,10 +1961,10 @@ mod dispatch_impl { impl AsyncRead for DebugStream { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.tcp).poll_read(cx, buf) } } @@ -2090,7 +2090,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(chunk.len(), 5); } @@ -2185,7 +2185,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2231,7 +2231,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2283,7 +2283,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(res1, res2, rx).map(|r| r.0)) .unwrap(); } @@ -2346,7 +2346,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_upgrade, res, rx).map(|r| r.0)) .unwrap(); @@ -2439,7 +2439,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_tunneled, res, rx).map(|r| r.0)) .unwrap(); @@ -2529,7 +2529,7 @@ mod conn { let _ = shdn_tx.send(()); // Allow time for graceful shutdown roundtrips... - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; // After graceful shutdown roundtrips, the client should be closed... future::poll_fn(|ctx| client.poll_ready(ctx)) @@ -2606,7 +2606,7 @@ mod conn { }); // sleep longer than keepalive would trigger - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await @@ -2711,7 +2711,7 @@ mod conn { let _resp = client.send_request(req1).await.expect("send_request"); // sleep longer than keepalive would trigger - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await @@ -2761,10 +2761,10 @@ mod conn { impl AsyncRead for DebugStream { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.tcp).poll_read(cx, buf) } } diff --git a/tests/server.rs b/tests/server.rs index 19b4cac25b..36906c4f30 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -821,7 +821,7 @@ async fn expect_continue_waits_for_body_poll() { service_fn(|req| { assert_eq!(req.headers()["expect"], "100-continue"); // But! We're never going to poll the body! - tokio::time::delay_for(Duration::from_millis(50)).map(move |_| { + tokio::time::sleep(Duration::from_millis(50)).map(move |_| { // Move and drop the req, so we don't auto-close drop(req); Response::builder() @@ -1100,7 +1100,7 @@ async fn http1_allow_half_close() { .serve_connection( socket, service_fn(|_| { - tokio::time::delay_for(Duration::from_millis(500)) + tokio::time::sleep(Duration::from_millis(500)) .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) }), ) @@ -1127,7 +1127,7 @@ async fn disconnect_after_reading_request_before_responding() { .serve_connection( socket, service_fn(|_| { - tokio::time::delay_for(Duration::from_secs(2)).map( + tokio::time::sleep(Duration::from_secs(2)).map( |_| -> Result, hyper::Error> { panic!("response future should have been dropped"); }, @@ -1897,7 +1897,7 @@ async fn http2_keep_alive_with_responsive_client() { conn.await.expect("client conn"); }); - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; let req = http::Request::new(hyper::Body::empty()); client.send_request(req).await.expect("client.send_request"); @@ -2319,7 +2319,7 @@ impl Write for DebugStream { impl AsyncWrite for DebugStream { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.stream).poll_write(cx, buf) @@ -2331,7 +2331,7 @@ impl AsyncWrite for DebugStream { fn poll_shutdown( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut task::Context<'_>, ) -> Poll> { Pin::new(&mut self.stream).poll_shutdown(cx) } @@ -2339,10 +2339,10 @@ impl AsyncWrite for DebugStream { impl AsyncRead for DebugStream { fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.stream).poll_read(cx, buf) } }