From d88ec30932d6788984c4a85304a37b49abe4bbb7 Mon Sep 17 00:00:00 2001 From: ruben Date: Thu, 9 Mar 2023 08:25:16 +0100 Subject: [PATCH 01/21] create example --- Cargo.toml | 5 + examples/client_http2_single_threaded.rs | 132 +++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 examples/client_http2_single_threaded.rs diff --git a/Cargo.toml b/Cargo.toml index 16d8e585be..5259d1b6a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,11 @@ name = "client" path = "examples/client.rs" required-features = ["full"] +[[example]] +name = "client_single_thread" +path = "examples/client_http2_single_threaded.rs" +required-features = ["full"] + [[example]] name = "client_json" path = "examples/client_json.rs" diff --git a/examples/client_http2_single_threaded.rs b/examples/client_http2_single_threaded.rs new file mode 100644 index 0000000000..de27171ea6 --- /dev/null +++ b/examples/client_http2_single_threaded.rs @@ -0,0 +1,132 @@ +#![deny(warnings)] +#![warn(rust_2018_idioms)] +use std::env; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::body::{Body as HttpBody, Frame}; +use hyper::Error; +use hyper::Request; +use tokio::io::{self, AsyncWriteExt as _}; +use tokio::net::TcpStream; + +struct Body { + // Our Body type is !Send and !Sync: + _marker: PhantomData<*const ()>, + data: Option, +} + +impl From for Body { + fn from(a: String) -> Self { + Body { + _marker: PhantomData, + data: Some(a.into()), + } + } +} + +impl HttpBody for Body { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) + } +} + +fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + // Configure a runtime that runs everything on the current thread + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build runtime"); + + // Combine it with a `LocalSet, which means it can spawn !Send futures... + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, init()) +} + +async fn init() -> Result<(), Box> { + // Some simple CLI args requirements... + let url = match env::args().nth(1) { + Some(url) => url, + None => { + println!("Usage: client "); + return Ok(()); + } + }; + + // HTTPS requires picking a TLS implementation, so give a better + // warning if the user tries to request an 'https' URL. + let url = url.parse::().unwrap(); + if url.scheme_str() != Some("http") { + println!("This example only works with 'http' URLs."); + return Ok(()); + } + + fetch_url(url).await +} + +async fn fetch_url(url: hyper::Uri) -> Result<(), Box> { + let host = url.host().expect("uri has no host"); + let port = url.port_u16().unwrap_or(80); + let addr = format!("{}:{}", host, port); + let stream = TcpStream::connect(addr).await?; + + let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?; + tokio::task::spawn_local(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + let authority = url.authority().unwrap().clone(); + + let req = Request::builder() + .uri(url) + .header(hyper::header::HOST, authority.as_str()) + .body(Body::from("test".to_string()))?; + + let mut res = sender.send_request(req).await?; + + println!("Response: {}", res.status()); + println!("Headers: {:#?}\n", res.headers()); + + // Stream the body, writing each chunk to stdout as we get it + // (instead of buffering and printing at the end). + while let Some(next) = res.frame().await { + let frame = next?; + if let Some(chunk) = frame.data_ref() { + io::stdout().write_all(&chunk).await?; + } + } + + println!("\n\nDone!"); + + Ok(()) +} + +// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor. +// +// Since the Server needs to spawn some background tasks, we needed +// to configure an Executor that can spawn !Send futures... +#[derive(Clone, Copy, Debug)] +struct LocalExec; + +impl hyper::rt::Executor for LocalExec +where + F: std::future::Future + 'static, // not requiring `Send` +{ + fn execute(&self, fut: F) { + // This will spawn into the currently running `LocalSet`. + tokio::task::spawn_local(fut); + } +} From 44587d6f6e83ef86a8818058ab1674408c368780 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 21 May 2023 17:13:07 +0200 Subject: [PATCH 02/21] make anonymous futures structs --- Cargo.toml | 1 + examples/client_http2_single_threaded.rs | 2 +- src/client/conn/http2.rs | 69 ++-- src/client/dispatch.rs | 60 +++- src/common/exec.rs | 25 ++ src/lib.rs | 6 +- src/proto/h2/client.rs | 384 ++++++++++++++++++----- src/proto/h2/mod.rs | 2 +- tests/client.rs | 108 +++---- 9 files changed, 496 insertions(+), 161 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5259d1b6a5..2232bb3f82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ h2 = { version = "0.3.9", optional = true } itoa = "1" tracing = { version = "0.1", default-features = false, features = ["std"] } pin-project-lite = "0.2.4" +pin-project = "1.0.12" tokio = { version = "1", features = ["sync"] } want = "0.3" diff --git a/examples/client_http2_single_threaded.rs b/examples/client_http2_single_threaded.rs index de27171ea6..6a1e7aa83b 100644 --- a/examples/client_http2_single_threaded.rs +++ b/examples/client_http2_single_threaded.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use http_body_util::{BodyExt, Empty}; +use http_body_util::BodyExt; use hyper::body::{Body as HttpBody, Frame}; use hyper::Error; use hyper::Request; diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index c45b67dffd..dc227accbf 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -11,6 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use super::super::dispatch; use crate::body::{Body, Incoming as IncomingBody}; +use crate::common::exec::ExecutorClient; use crate::common::time::Time; use crate::common::{ exec::{BoxSendFuture, Exec}, @@ -26,7 +27,9 @@ pub struct SendRequest { impl Clone for SendRequest { fn clone(&self) -> SendRequest { - SendRequest { dispatch: self.dispatch.clone() } + SendRequest { + dispatch: self.dispatch.clone(), + } } } @@ -35,20 +38,25 @@ impl Clone for SendRequest { /// In most cases, this should just be spawned into an executor, so that it /// can process incoming and outgoing messages, notice hangups, and the like. #[must_use = "futures do nothing unless polled"] -pub struct Connection +pub struct Connection where - T: AsyncRead + AsyncWrite + Send + 'static, + T: AsyncRead + AsyncWrite + Send + 'static + Unpin, B: Body + 'static, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { - inner: (PhantomData, proto::h2::ClientTask), + inner: (PhantomData, proto::h2::ClientTask), } /// A builder to configure an HTTP connection. /// /// After setting options, the builder is used to create a handshake future. #[derive(Clone, Debug)] -pub struct Builder { - pub(super) exec: Exec, +pub struct Builder +where + Ex: Executor + Send + Sync + 'static + Clone, +{ + pub(super) exec: Ex, pub(super) timer: Time, h2_builder: proto::h2::client::Config, } @@ -60,13 +68,15 @@ pub struct Builder { pub async fn handshake( exec: E, io: T, -) -> crate::Result<(SendRequest, Connection)> +) -> crate::Result<(SendRequest, Connection)> where E: Executor + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, + E: ExecutorClient + Unpin + Clone, + ::Error: std::error::Error + Send + Sync + 'static, { Builder::new(exec).handshake(io).await } @@ -189,12 +199,14 @@ impl fmt::Debug for SendRequest { // ===== impl Connection -impl Connection +impl Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + Unpin + Send + 'static, B::Data: Send, B::Error: Into>, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { /// Returns whether the [extended CONNECT protocol][1] is enabled or not. /// @@ -210,22 +222,27 @@ where } } -impl fmt::Debug for Connection +impl fmt::Debug for Connection where - T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, + T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static+ Unpin, B: Body + 'static, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() } } -impl Future for Connection +impl Future for Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + Send + 'static, + B: Body + 'static + Unpin, B::Data: Send, + E: Unpin, B::Error: Into>, + E: ExecutorClient + 'static + Send + Sync + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { type Output = crate::Result<()>; @@ -240,22 +257,22 @@ where // ===== impl Builder -impl Builder { +impl Builder +where + Ex: Executor + Send + Sync + 'static + Clone, +{ /// Creates a new connection builder. #[inline] - pub fn new(exec: E) -> Builder - where - E: Executor + Send + Sync + 'static, - { + pub fn new(exec: Ex) -> Builder { Builder { - exec: Exec::new(exec), + exec, timer: Time::Empty, h2_builder: Default::default(), } } /// Provide a timer to execute background HTTP2 tasks. - pub fn timer(&mut self, timer: M) -> &mut Builder + pub fn timer(&mut self, timer: M) -> &mut Builder where M: Timer + Send + Sync + 'static, { @@ -284,10 +301,7 @@ impl Builder { /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. - pub fn initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { + pub fn initial_connection_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { self.h2_builder.adaptive_window = false; self.h2_builder.initial_conn_window_size = sz; @@ -329,10 +343,7 @@ impl Builder { /// Pass `None` to disable HTTP2 keep-alive. /// /// Default is currently disabled. - pub fn keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { + pub fn keep_alive_interval(&mut self, interval: impl Into>) -> &mut Self { self.h2_builder.keep_alive_interval = interval.into(); self } @@ -395,12 +406,14 @@ impl Builder { pub fn handshake( &self, io: T, - ) -> impl Future, Connection)>> + ) -> impl Future, Connection)>> + '_ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, + Ex: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { let opts = self.clone(); diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 3aef84012f..2f50dd7937 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,11 +1,19 @@ #[cfg(feature = "http2")] use std::future::Future; +use http::{Request, Response}; +use http_body::Body; +use pin_project::pin_project; use tokio::sync::{mpsc, oneshot}; +use tracing::trace; #[cfg(feature = "http2")] use crate::common::Pin; -use crate::common::{task, Poll}; +use crate::{ + body::Incoming, + common::{task, Poll}, + proto::h2::client::ResponseFutMap, +}; #[cfg(test)] pub(crate) type RetryPromise = oneshot::Receiver)>>; @@ -275,7 +283,7 @@ impl Callback { use futures_util::future; use tracing::trace; - let mut cb = Some(self); + let mut cb: Option> = Some(self); // "select" on this callback being canceled, and the future completing future::poll_fn(move |cx| { @@ -300,6 +308,54 @@ impl Callback { } } +#[pin_project] +pub struct SendWhen +where + B: Body + 'static, +{ + #[pin] + pub(crate) when: ResponseFutMap, + #[pin] + pub(crate) call_back: Option, Response>>, +} + +impl Future for SendWhen +where + B: Body + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + + let mut call_back = this.call_back.take().expect("polled after complete"); + + match Pin::new(&mut this.when).poll(cx) { + Poll::Ready(Ok(res)) => { + call_back.send(Ok(res)); + Poll::Ready(()) + } + Poll::Pending => { + // check if the callback is canceled + match (call_back.poll_canceled(cx)) { + Poll::Ready(v) => v, + Poll::Pending => { + // Move call_back back to struct before return + this.call_back.set(Some(call_back)); + return std::task::Poll::Pending; + } + }; + trace!("send_when canceled"); + Poll::Ready(()) + } + Poll::Ready(Err(err)) => { + call_back.send(Err(err)); + Poll::Ready(()) + } + } + } +} + #[cfg(test)] mod tests { #[cfg(feature = "nightly")] diff --git a/src/common/exec.rs b/src/common/exec.rs index ef006c9d84..c9c28ca5b3 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -3,6 +3,9 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::proto::h2::client::H2ClientFuture; use crate::rt::Executor; pub(crate) type BoxSendFuture = Pin + Send>>; @@ -35,6 +38,28 @@ impl fmt::Debug for Exec { } } +pub trait ExecutorClient +where + B: http_body::Body, + B::Error: std::error::Error + Send + Sync + 'static, + T: AsyncRead + AsyncWrite + Unpin, +{ + fn execute_h2_future(&mut self, future: H2ClientFuture); +} + +impl ExecutorClient for E +where + E: Executor>, + B: http_body::Body + 'static, + B::Error: std::error::Error + Send + Sync + 'static, + H2ClientFuture: Future, + T: AsyncRead + AsyncWrite + Unpin, +{ + fn execute_h2_future(&mut self, future: H2ClientFuture) { + self.execute(future) + } +} + // If http2 is not enable, we just have a stub here, so that the trait bounds // that *would* have been needed are still checked. Why? // diff --git a/src/lib.rs b/src/lib.rs index f5f442d71e..a3b56a101c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ #![deny(missing_docs)] -#![deny(missing_debug_implementations)] +//#![deny(missing_debug_implementations)] #![cfg_attr(test, deny(rust_2018_idioms))] -#![cfg_attr(all(test, feature = "full"), deny(unreachable_pub))] -#![cfg_attr(all(test, feature = "full"), deny(warnings))] +//#![cfg_attr(all(test, feature = "full"), deny(unreachable_pub))] +//#![cfg_attr(all(test, feature = "full"), deny(warnings))] #![cfg_attr(all(test, feature = "nightly"), feature(test))] #![cfg_attr(docsrs, feature(doc_cfg))] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 121e24dd84..1a1a20cbf5 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,20 +1,26 @@ use std::error::Error as StdError; +use std::marker::PhantomData; + use std::time::Duration; use bytes::Bytes; +use futures_channel::mpsc::{Receiver, Sender}; use futures_channel::{mpsc, oneshot}; -use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; -use futures_util::stream::StreamExt as _; -use h2::client::{Builder, SendRequest}; +use futures_util::future::{self, Either, FutureExt as _, Select, TryFutureExt as _}; +use futures_util::stream::{StreamExt as _, StreamFuture}; +use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; use http::{Method, StatusCode}; +use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace, warn}; +use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; +use crate::client::dispatch::{Callback, SendWhen}; +use crate::common::exec::ExecutorClient; use crate::common::time::Time; -use crate::client::dispatch::Callback; use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; use crate::ext::Protocol; use crate::headers; @@ -98,17 +104,19 @@ fn new_ping_config(config: &Config) -> ping::Config { } } -pub(crate) async fn handshake( +pub(crate) async fn handshake( io: T, req_rx: ClientRx, config: &Config, - exec: Exec, + mut exec: E, timer: Time, -) -> crate::Result> +) -> crate::Result> where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, - B: Body, + B: Body + 'static, B::Data: Send + 'static, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, { let (h2_tx, mut conn) = new_builder(config) .handshake::<_, SendBuf>(io) @@ -122,40 +130,24 @@ where let (conn_drop_ref, rx) = mpsc::channel(1); let (cancel_tx, conn_eof) = oneshot::channel(); - let conn_drop_rx = rx.into_future().map(|(item, _rx)| { - if let Some(never) = item { - match never {} - } - }); + let conn_drop_rx = rx.into_future(); let ping_config = new_ping_config(&config); let (conn, ping) = if ping_config.is_enabled() { let pp = conn.ping_pong().expect("conn.ping_pong"); - let (recorder, mut ponger) = ping::channel(pp, ping_config, timer); - - let conn = future::poll_fn(move |cx| { - match ponger.poll(cx) { - Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { - conn.set_target_window_size(wnd); - conn.set_initial_window_size(wnd)?; - } - Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { - debug!("connection keep-alive timed out"); - return Poll::Ready(Ok(())); - } - Poll::Pending => {} - } + let (recorder, ponger) = ping::channel(pp, ping_config, timer); - Pin::new(&mut conn).poll(cx) - }); + let conn: Conn<_, B> = Conn::new(ponger, conn); (Either::Left(conn), recorder) } else { (Either::Right(conn), ping::disabled()) }; - let conn = conn.map_err(|e| debug!("connection error: {}", e)); + let conn: ConnMapErr = ConnMapErr { conn }; - exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + exec.execute_h2_future(H2ClientFuture::Task { + task: ConnTask::new(conn, conn_drop_rx, cancel_tx), + }); Ok(ClientTask { ping, @@ -165,25 +157,183 @@ where h2_tx, req_rx, fut_ctx: None, + marker: PhantomData, }) } -async fn conn_task(conn: C, drop_rx: D, cancel_tx: oneshot::Sender) +#[pin_project] +struct Conn where - C: Future + Unpin, - D: Future + Unpin, + B: Body, { - match future::select(conn, drop_rx).await { - Either::Left(_) => { - // ok or err, the `conn` has finished + #[pin] + ponger: Ponger, + #[pin] + conn: Connection::Data>>, +} + +impl Conn +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + fn new(ponger: Ponger, conn: Connection::Data>>) -> Self { + Conn { ponger, conn } + } +} + +impl Future for Conn +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(), h2::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + match this.ponger.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + this.conn.set_target_window_size(wnd); + this.conn.set_initial_window_size(wnd)?; + } + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("connection keep-alive timed out"); + return Poll::Ready(Ok(())); + } + Poll::Pending => {} } - Either::Right(((), conn)) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - drop(cancel_tx); - let _ = conn.await; + + Pin::new(&mut this.conn).poll(cx) + } +} + +#[pin_project] +struct ConnMapErr +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + #[pin] + conn: Either, Connection::Data>>>, +} + +impl Future for ConnMapErr +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(), ()>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + self.project() + .conn + .poll(cx) + .map_err(|e| debug!("connection error: {}", e)) + } +} + +#[pin_project] +pub struct ConnTask +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + #[pin] + select: Select, StreamFuture>>, + #[pin] + cancel_tx: Option>, + conn: Option>, +} + +impl ConnTask +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + fn new( + conn: ConnMapErr, + drop_rx: StreamFuture>, + cancel_tx: oneshot::Sender, + ) -> Self { + Self { + select: future::select(conn, drop_rx), + cancel_tx: Some(cancel_tx), + conn: None, + } + } +} + +impl Future for ConnTask +where + B: Body, + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + + if let Some(conn) = this.conn { + conn.poll_unpin(cx).map(|d| ()) + } else { + match ready!(this.select.poll_unpin(cx)) { + Either::Left((a, _)) => { + // ok or err, the `conn` has finished + return Poll::Ready(()); + } + Either::Right((_, b)) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + drop(this.cancel_tx.take().expect("TODO: Error message")); + this.conn = &mut Some(b); + return Poll::Pending; + } + } + } + } +} + +#[pin_project(project = H2ClientFutureProject)] +pub enum H2ClientFuture +where + B: http_body::Body + 'static, + B::Error: Into>, + T: AsyncRead + AsyncWrite + Unpin, +{ + Pipe { + #[pin] + pipe: PipeMap, + }, + Send { + #[pin] + send_when: SendWhen, + }, + Task { + #[pin] + task: ConnTask, + }, +} + +impl Future for H2ClientFuture +where + B: http_body::Body + 'static, + B::Error: Into>, + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut this = self.project(); + + match this { + H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx), + H2ClientFutureProject::Send { send_when } => send_when.poll(cx), + H2ClientFutureProject::Task { task } => task.poll(cx), } } } @@ -202,43 +352,89 @@ where impl Unpin for FutCtx {} -pub(crate) struct ClientTask +pub(crate) struct ClientTask where B: Body, + E: Unpin, { ping: ping::Recorder, conn_drop_ref: ConnDropRef, conn_eof: ConnEof, - executor: Exec, + executor: E, h2_tx: SendRequest>, req_rx: ClientRx, fut_ctx: Option>, + marker: PhantomData, } -impl ClientTask +impl ClientTask where B: Body + 'static, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, + T: AsyncRead + AsyncWrite + Unpin, { pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { self.h2_tx.is_extended_connect_protocol_enabled() } } -impl ClientTask +#[pin_project] +pub struct PipeMap where - B: Body + Send + 'static, + S: Body, +{ + #[pin] + pipe: PipeToSendStream, + #[pin] + conn_drop_ref: Option>, + #[pin] + ping: Option, +} + +impl Future for PipeMap +where + B: http_body::Body, + B::Error: Into>, +{ + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut this = self.project(); + + match this.pipe.poll_unpin(cx) { + Poll::Ready(result) => { + if let Err(e) = result { + debug!("client request body error: {}", e); + } + drop(this.conn_drop_ref.take().expect("Call only one")); + drop(this.ping.take().expect("Call only one")); + return Poll::Ready(()); + } + Poll::Pending => (), + }; + Poll::Pending + } +} + +impl ClientTask +where + B: Body + 'static + Unpin, B::Data: Send, B::Error: Into>, + E: ExecutorClient + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, + T: AsyncRead + AsyncWrite + Unpin, { fn poll_pipe(&mut self, f: FutCtx, cx: &mut task::Context<'_>) { let ping = self.ping.clone(); + let send_stream = if !f.is_connect { if !f.eos { - let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| { - if let Err(e) = res { - debug!("client request body error: {}", e); - } - }); + let mut pipe = PipeToSendStream::new(f.body, f.body_tx); // eagerly see if the body pipe is ready and // can thus skip allocating in the executor @@ -250,13 +446,15 @@ where // "open stream" alive while this body is // still sending... let ping = ping.clone(); - let pipe = pipe.map(move |x| { - drop(conn_drop_ref); - drop(ping); - x - }); + + let pipe = PipeMap { + pipe, + conn_drop_ref: Some(conn_drop_ref), + ping: Some(ping), + }; // Clear send task - self.executor.execute(pipe); + self.executor + .execute_h2_future(H2ClientFuture::Pipe { pipe: pipe }); } } } @@ -266,7 +464,47 @@ where Some(f.body_tx) }; - let fut = f.fut.map(move |result| match result { + self.executor.execute_h2_future(H2ClientFuture::Send { + send_when: SendWhen { + when: ResponseFutMap { + fut: f.fut, + ping: Some(ping), + send_stream: Some(send_stream), + }, + call_back: Some(f.cb), + }, + }); + } +} + +#[pin_project] +pub(crate) struct ResponseFutMap +where + B: Body + 'static, +{ + #[pin] + fut: ResponseFuture, + #[pin] + ping: Option, + #[pin] + send_stream: Option::Data>>>>, +} + +impl Future for ResponseFutMap +where + B: Body + 'static, +{ + type Output = Result, (crate::Error, Option>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + + let result = ready!(this.fut.poll(cx)); + + let ping = this.ping.take().expect("Todo: Error message"); + let send_stream = this.send_stream.take().expect("Todo: Error message"); + + match result { Ok(res) => { // record that we got the response headers ping.record_non_data(); @@ -277,17 +515,17 @@ where warn!("h2 connect response with non-zero body not supported"); send_stream.send_reset(h2::Reason::INTERNAL_ERROR); - return Err(( + return Poll::Ready(Err(( crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), - None, - )); + None::>, + ))); } let (parts, recv_stream) = res.into_parts(); let mut res = Response::from_parts(parts, IncomingBody::empty()); let (pending, on_upgrade) = crate::upgrade::pending(); let io = H2Upgraded { - ping, + ping: ping, send_stream: unsafe { UpgradedSendStream::new(send_stream) }, recv_stream, buf: Bytes::new(), @@ -297,31 +535,33 @@ where pending.fulfill(upgraded); res.extensions_mut().insert(on_upgrade); - Ok(res) + Poll::Ready(Ok(res)) } else { let res = res.map(|stream| { let ping = ping.for_stream(&stream); IncomingBody::h2(stream, content_length.into(), ping) }); - Ok(res) + Poll::Ready(Ok(res)) } } Err(err) => { ping.ensure_not_timed_out().map_err(|e| (e, None))?; debug!("client response error: {}", err); - Err((crate::Error::new_h2(err), None)) + Poll::Ready(Err((crate::Error::new_h2(err), None::>))) } - }); - self.executor.execute(f.cb.send_when(fut)); + } } } -impl Future for ClientTask +impl Future for ClientTask where - B: Body + Send + 'static, + B: Body + 'static + Unpin, B::Data: Send, B::Error: Into>, + E: ExecutorClient + 'static + Send + Sync + Unpin, + ::Error: std::error::Error + Send + Sync + 'static, + T: AsyncRead + AsyncWrite + Unpin, { type Output = crate::Result; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index a1cbd25813..753e57f769 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -85,7 +85,7 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { // body adapters used by both Client and Server pin_project! { - struct PipeToSendStream + pub(crate) struct PipeToSendStream where S: Body, { diff --git a/tests/client.rs b/tests/client.rs index 739f223b16..3aa9339bbb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2067,60 +2067,60 @@ mod conn { ); } - #[tokio::test] - async fn http2_keep_alive_with_responsive_server() { - // Test that a responsive server works just when client keep - // alive is enabled - use hyper::service::service_fn; - - let (listener, addr) = setup_tk_test_server().await; - - // Spawn an HTTP2 server that reads the whole body and responds - tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; - hyper::server::conn::http2::Builder::new(TokioExecutor) - .timer(TokioTimer) - .serve_connection( - sock, - service_fn(|req| async move { - tokio::spawn(async move { - let _ = concat(req.into_body()) - .await - .expect("server req body aggregate"); - }); - Ok::<_, hyper::Error>(http::Response::new(Empty::::new())) - }), - ) - .await - .expect("serve_connection"); - }); - - let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) - .timer(TokioTimer) - .keep_alive_interval(Duration::from_secs(1)) - .keep_alive_timeout(Duration::from_secs(1)) - .handshake(io) - .await - .expect("http handshake"); - - tokio::spawn(async move { - conn.await.expect("client conn shouldn't error"); - }); - - // Use a channel to keep request stream open - let (_tx, recv) = mpsc::channel::, Box>>(0); - let req = http::Request::new(StreamBody::new(recv)); - - let _resp = client.send_request(req).await.expect("send_request"); - - // sleep longer than keepalive would trigger - TokioTimer.sleep(Duration::from_secs(4)).await; - - future::poll_fn(|ctx| client.poll_ready(ctx)) - .await - .expect("client should be open"); - } + // #[tokio::test] + // async fn http2_keep_alive_with_responsive_server() { + // // Test that a responsive server works just when client keep + // // alive is enabled + // use hyper::service::service_fn; + + // let (listener, addr) = setup_tk_test_server().await; + + // // Spawn an HTTP2 server that reads the whole body and responds + // tokio::spawn(async move { + // let sock = listener.accept().await.unwrap().0; + // hyper::server::conn::http2::Builder::new(TokioExecutor) + // .timer(TokioTimer) + // .serve_connection( + // sock, + // service_fn(|req| async move { + // tokio::spawn(async move { + // let _ = concat(req.into_body()) + // .await + // .expect("server req body aggregate"); + // }); + // Ok::<_, hyper::Error>(http::Response::new(Empty::::new())) + // }), + // ) + // .await + // .expect("serve_connection"); + // }); + + // let io = tcp_connect(&addr).await.expect("tcp connect"); + // let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + // .timer(TokioTimer) + // .keep_alive_interval(Duration::from_secs(1)) + // .keep_alive_timeout(Duration::from_secs(1)) + // .handshake(io) + // .await + // .expect("http handshake"); + + // tokio::spawn(async move { + // conn.await.expect("client conn shouldn't error"); + // }); + + // // Use a channel to keep request stream open + // let (_tx, recv) = mpsc::channel::, Box>>(0); + // let req = http::Request::new(StreamBody::new(recv)); + + // let _resp = client.send_request(req).await.expect("send_request"); + + // // sleep longer than keepalive would trigger + // TokioTimer.sleep(Duration::from_secs(4)).await; + + // future::poll_fn(|ctx| client.poll_ready(ctx)) + // .await + // .expect("client should be open"); + // } #[tokio::test] async fn h2_connect() { From 55753a91f92cf158504cbd0f40255f553ede6456 Mon Sep 17 00:00:00 2001 From: ruben Date: Tue, 6 Jun 2023 21:06:26 +0200 Subject: [PATCH 03/21] remove exec and fix warnings --- src/client/conn/http2.rs | 21 +++++--------- src/client/dispatch.rs | 34 +--------------------- src/common/exec.rs | 63 +--------------------------------------- src/common/mod.rs | 2 +- src/proto/h2/client.rs | 12 ++++---- src/rt/bounds.rs | 55 +++++++++++++++++++++++------------ 6 files changed, 54 insertions(+), 133 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index dc227accbf..147b70ce2b 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -11,14 +11,11 @@ use tokio::io::{AsyncRead, AsyncWrite}; use super::super::dispatch; use crate::body::{Body, Incoming as IncomingBody}; -use crate::common::exec::ExecutorClient; use crate::common::time::Time; -use crate::common::{ - exec::{BoxSendFuture, Exec}, - task, Future, Pin, Poll, -}; +use crate::common::{task, Future, Pin, Poll}; use crate::proto; -use crate::rt::{Executor, Timer}; +use crate::rt::bounds::ExecutorClient; +use crate::rt::Timer; /// The sender side of an established connection. pub struct SendRequest { @@ -54,7 +51,7 @@ where #[derive(Clone, Debug)] pub struct Builder where - Ex: Executor + Send + Sync + 'static + Clone, + Ex: Clone, { pub(super) exec: Ex, pub(super) timer: Time, @@ -70,13 +67,11 @@ pub async fn handshake( io: T, ) -> crate::Result<(SendRequest, Connection)> where - E: Executor + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, - B::Error: Into>, + B::Error: std::error::Error + Send + Sync + 'static, E: ExecutorClient + Unpin + Clone, - ::Error: std::error::Error + Send + Sync + 'static, { Builder::new(exec).handshake(io).await } @@ -224,7 +219,7 @@ where impl fmt::Debug for Connection where - T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static+ Unpin, + T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static + Unpin, B: Body + 'static, E: ExecutorClient + Unpin, ::Error: std::error::Error + Send + Sync + 'static, @@ -259,7 +254,7 @@ where impl Builder where - Ex: Executor + Send + Sync + 'static + Clone, + Ex: Clone, { /// Creates a new connection builder. #[inline] @@ -406,7 +401,7 @@ where pub fn handshake( &self, io: T, - ) -> impl Future, Connection)>> + '_ + ) -> impl Future, Connection)>> + where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 2f50dd7937..2353217d32 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -274,38 +274,6 @@ impl Callback { } } } - - #[cfg(feature = "http2")] - pub(crate) async fn send_when( - self, - mut when: impl Future)>> + Unpin, - ) { - use futures_util::future; - use tracing::trace; - - let mut cb: Option> = Some(self); - - // "select" on this callback being canceled, and the future completing - future::poll_fn(move |cx| { - match Pin::new(&mut when).poll(cx) { - Poll::Ready(Ok(res)) => { - cb.take().expect("polled after complete").send(Ok(res)); - Poll::Ready(()) - } - Poll::Pending => { - // check if the callback is canceled - ready!(cb.as_mut().unwrap().poll_canceled(cx)); - trace!("send_when canceled"); - Poll::Ready(()) - } - Poll::Ready(Err(err)) => { - cb.take().expect("polled after complete").send(Err(err)); - Poll::Ready(()) - } - } - }) - .await - } } #[pin_project] @@ -337,7 +305,7 @@ where } Poll::Pending => { // check if the callback is canceled - match (call_back.poll_canceled(cx)) { + match call_back.poll_canceled(cx) { Poll::Ready(v) => v, Poll::Pending => { // Move call_back back to struct before return diff --git a/src/common/exec.rs b/src/common/exec.rs index c9c28ca5b3..69d19e9bb7 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -1,75 +1,14 @@ -use std::fmt; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; - -use tokio::io::{AsyncRead, AsyncWrite}; - -use crate::proto::h2::client::H2ClientFuture; -use crate::rt::Executor; - -pub(crate) type BoxSendFuture = Pin + Send>>; - -// Executor must be provided by the user -#[derive(Clone)] -pub(crate) struct Exec(Arc + Send + Sync>); - -// ===== impl Exec ===== - -impl Exec { - pub(crate) fn new(exec: E) -> Self - where - E: Executor + Send + Sync + 'static, - { - Self(Arc::new(exec)) - } - - pub(crate) fn execute(&self, fut: F) - where - F: Future + Send + 'static, - { - self.0.execute(Box::pin(fut)) - } -} - -impl fmt::Debug for Exec { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Exec").finish() - } -} - -pub trait ExecutorClient -where - B: http_body::Body, - B::Error: std::error::Error + Send + Sync + 'static, - T: AsyncRead + AsyncWrite + Unpin, -{ - fn execute_h2_future(&mut self, future: H2ClientFuture); -} - -impl ExecutorClient for E -where - E: Executor>, - B: http_body::Body + 'static, - B::Error: std::error::Error + Send + Sync + 'static, - H2ClientFuture: Future, - T: AsyncRead + AsyncWrite + Unpin, -{ - fn execute_h2_future(&mut self, future: H2ClientFuture) { - self.execute(future) - } -} // If http2 is not enable, we just have a stub here, so that the trait bounds // that *would* have been needed are still checked. Why? // // Because enabling `http2` shouldn't suddenly add new trait bounds that cause // a compilation error. -#[cfg(not(feature = "http2"))] -#[allow(missing_debug_implementations)] + pub struct H2Stream(std::marker::PhantomData<(F, B)>); -#[cfg(not(feature = "http2"))] impl Future for H2Stream where F: Future, E>>, diff --git a/src/common/mod.rs b/src/common/mod.rs index 67b2bbde59..2392851951 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -10,7 +10,7 @@ macro_rules! ready { pub(crate) mod buf; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] pub(crate) mod date; -#[cfg(any(feature = "http1", feature = "http2", feature = "server"))] +#[cfg(not(feature = "http2"))] pub(crate) mod exec; pub(crate) mod io; mod never; diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 1a1a20cbf5..9d85821687 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -6,7 +6,7 @@ use std::time::Duration; use bytes::Bytes; use futures_channel::mpsc::{Receiver, Sender}; use futures_channel::{mpsc, oneshot}; -use futures_util::future::{self, Either, FutureExt as _, Select, TryFutureExt as _}; +use futures_util::future::{self, Either, FutureExt as _, Select}; use futures_util::stream::{StreamExt as _, StreamFuture}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; @@ -19,13 +19,13 @@ use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; use crate::client::dispatch::{Callback, SendWhen}; -use crate::common::exec::ExecutorClient; use crate::common::time::Time; -use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; +use crate::common::{task, Future, Never, Pin, Poll}; use crate::ext::Protocol; use crate::headers; use crate::proto::h2::UpgradedSendStream; use crate::proto::Dispatched; +use crate::rt::bounds::ExecutorClient; use crate::upgrade::Upgraded; use crate::{Request, Response}; use h2::client::ResponseFuture; @@ -274,10 +274,10 @@ where let mut this = self.project(); if let Some(conn) = this.conn { - conn.poll_unpin(cx).map(|d| ()) + conn.poll_unpin(cx).map(|_| ()) } else { match ready!(this.select.poll_unpin(cx)) { - Either::Left((a, _)) => { + Either::Left((_, _)) => { // ok or err, the `conn` has finished return Poll::Ready(()); } @@ -328,7 +328,7 @@ where self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - let mut this = self.project(); + let this = self.project(); match this { H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx), diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index 69115ef2ca..a8cc5232b3 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -6,10 +6,46 @@ #[cfg(all(feature = "server", feature = "http2"))] pub use self::h2::Http2ConnExec; +#[cfg(all(feature = "client", feature = "http2"))] +pub use self::h2_client::ExecutorClient; + +#[cfg(all(feature = "client", feature = "http2"))] +#[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))] +mod h2_client { + use futures_core::Future; + use tokio::io::{AsyncRead, AsyncWrite}; + + use crate::{rt::Executor, proto::h2::client::H2ClientFuture}; + + /// TODO + pub trait ExecutorClient + where + B: http_body::Body, + B::Error: std::error::Error + Send + Sync + 'static, + T: AsyncRead + AsyncWrite + Unpin, + { + #[doc(hidden)] + fn execute_h2_future(&mut self, future: H2ClientFuture); + } + + impl ExecutorClient for E + where + E: Executor>, + B: http_body::Body + 'static, + B::Error: std::error::Error + Send + Sync + 'static, + H2ClientFuture: Future, + T: AsyncRead + AsyncWrite + Unpin, + { + fn execute_h2_future(&mut self, future: H2ClientFuture) { + self.execute(future) + } + } +} + #[cfg(all(feature = "server", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))] mod h2 { - use crate::{common::exec::Exec, proto::h2::server::H2Stream, rt::Executor}; + use crate::{proto::h2::server::H2Stream, rt::Executor}; use http_body::Body; use std::future::Future; @@ -26,23 +62,6 @@ mod h2 { fn execute_h2stream(&mut self, fut: H2Stream); } - impl Http2ConnExec for Exec - where - H2Stream: Future + Send + 'static, - B: Body, - { - fn execute_h2stream(&mut self, fut: H2Stream) { - self.execute(fut) - } - } - - impl sealed::Sealed<(F, B)> for Exec - where - H2Stream: Future + Send + 'static, - B: Body, - { - } - #[doc(hidden)] impl Http2ConnExec for E where From ad57f1c5ece83db48db868a1b307c8d3c23e4cfe Mon Sep 17 00:00:00 2001 From: ruben Date: Tue, 6 Jun 2023 21:10:22 +0200 Subject: [PATCH 04/21] remove bound --- src/client/conn/http2.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 147b70ce2b..01753367d9 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -50,8 +50,7 @@ where /// After setting options, the builder is used to create a handshake future. #[derive(Clone, Debug)] pub struct Builder -where - Ex: Clone, + { pub(super) exec: Ex, pub(super) timer: Time, From 4132aa6ce6a727010b9d72ec3758bb6a6997179f Mon Sep 17 00:00:00 2001 From: ruben Date: Tue, 6 Jun 2023 21:10:36 +0200 Subject: [PATCH 05/21] fmt --- src/rt/bounds.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index a8cc5232b3..c6de59d6e1 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -15,7 +15,7 @@ mod h2_client { use futures_core::Future; use tokio::io::{AsyncRead, AsyncWrite}; - use crate::{rt::Executor, proto::h2::client::H2ClientFuture}; + use crate::{proto::h2::client::H2ClientFuture, rt::Executor}; /// TODO pub trait ExecutorClient From 4138669ef43ffc9fe0e41a4ab32eafa38e762994 Mon Sep 17 00:00:00 2001 From: ruben Date: Tue, 6 Jun 2023 21:18:46 +0200 Subject: [PATCH 06/21] use right future --- src/rt/bounds.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index c6de59d6e1..be072623ed 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -12,7 +12,7 @@ pub use self::h2_client::ExecutorClient; #[cfg(all(feature = "client", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))] mod h2_client { - use futures_core::Future; + use std::future::Future; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{proto::h2::client::H2ClientFuture, rt::Executor}; From ce4619e78a9edb2eb50fab6d5ef5cd8faf1ee273 Mon Sep 17 00:00:00 2001 From: ruben Date: Tue, 6 Jun 2023 21:47:18 +0200 Subject: [PATCH 07/21] fix features ci --- src/client/dispatch.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 2353217d32..33d7af66d3 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -7,13 +7,12 @@ use pin_project::pin_project; use tokio::sync::{mpsc, oneshot}; use tracing::trace; -#[cfg(feature = "http2")] -use crate::common::Pin; use crate::{ body::Incoming, common::{task, Poll}, - proto::h2::client::ResponseFutMap, }; +#[cfg(feature = "http2")] +use crate::{common::Pin, proto::h2::client::ResponseFutMap}; #[cfg(test)] pub(crate) type RetryPromise = oneshot::Receiver)>>; @@ -276,6 +275,7 @@ impl Callback { } } +#[cfg(feature = "http2")] #[pin_project] pub struct SendWhen where @@ -287,6 +287,7 @@ where pub(crate) call_back: Option, Response>>, } +#[cfg(feature = "http2")] impl Future for SendWhen where B: Body + 'static, From 4ac835ac04c21eb1caf07776fc36decc0513467b Mon Sep 17 00:00:00 2001 From: ruben Date: Thu, 8 Jun 2023 12:39:57 +0200 Subject: [PATCH 08/21] fix ffi --- src/ffi/task.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ffi/task.rs b/src/ffi/task.rs index ef54fe408f..a973a7bab3 100644 --- a/src/ffi/task.rs +++ b/src/ffi/task.rs @@ -177,8 +177,12 @@ impl WeakExec { } } -impl crate::rt::Executor> for WeakExec { - fn execute(&self, fut: BoxFuture<()>) { +impl crate::rt::Executor for WeakExec +where + F: Future + Send + 'static, + F::Output: Send + Sync + AsTaskType, +{ + fn execute(&self, fut: F) { if let Some(exec) = self.0.upgrade() { exec.spawn(hyper_task::boxed(fut)); } From ed95666dfdb2c7bf82f8e197e9ef99a697c0bed7 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 00:23:53 +0200 Subject: [PATCH 09/21] move client example to single_threaded example --- examples/client_http2_single_threaded.rs | 132 ----------------------- examples/single_threaded.rs | 88 ++++++++++++--- 2 files changed, 76 insertions(+), 144 deletions(-) delete mode 100644 examples/client_http2_single_threaded.rs diff --git a/examples/client_http2_single_threaded.rs b/examples/client_http2_single_threaded.rs deleted file mode 100644 index 6a1e7aa83b..0000000000 --- a/examples/client_http2_single_threaded.rs +++ /dev/null @@ -1,132 +0,0 @@ -#![deny(warnings)] -#![warn(rust_2018_idioms)] -use std::env; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use bytes::Bytes; -use http_body_util::BodyExt; -use hyper::body::{Body as HttpBody, Frame}; -use hyper::Error; -use hyper::Request; -use tokio::io::{self, AsyncWriteExt as _}; -use tokio::net::TcpStream; - -struct Body { - // Our Body type is !Send and !Sync: - _marker: PhantomData<*const ()>, - data: Option, -} - -impl From for Body { - fn from(a: String) -> Self { - Body { - _marker: PhantomData, - data: Some(a.into()), - } - } -} - -impl HttpBody for Body { - type Data = Bytes; - type Error = Error; - - fn poll_frame( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d)))) - } -} - -fn main() -> Result<(), Box> { - pretty_env_logger::init(); - - // Configure a runtime that runs everything on the current thread - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("build runtime"); - - // Combine it with a `LocalSet, which means it can spawn !Send futures... - let local = tokio::task::LocalSet::new(); - local.block_on(&rt, init()) -} - -async fn init() -> Result<(), Box> { - // Some simple CLI args requirements... - let url = match env::args().nth(1) { - Some(url) => url, - None => { - println!("Usage: client "); - return Ok(()); - } - }; - - // HTTPS requires picking a TLS implementation, so give a better - // warning if the user tries to request an 'https' URL. - let url = url.parse::().unwrap(); - if url.scheme_str() != Some("http") { - println!("This example only works with 'http' URLs."); - return Ok(()); - } - - fetch_url(url).await -} - -async fn fetch_url(url: hyper::Uri) -> Result<(), Box> { - let host = url.host().expect("uri has no host"); - let port = url.port_u16().unwrap_or(80); - let addr = format!("{}:{}", host, port); - let stream = TcpStream::connect(addr).await?; - - let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?; - tokio::task::spawn_local(async move { - if let Err(err) = conn.await { - println!("Connection failed: {:?}", err); - } - }); - - let authority = url.authority().unwrap().clone(); - - let req = Request::builder() - .uri(url) - .header(hyper::header::HOST, authority.as_str()) - .body(Body::from("test".to_string()))?; - - let mut res = sender.send_request(req).await?; - - println!("Response: {}", res.status()); - println!("Headers: {:#?}\n", res.headers()); - - // Stream the body, writing each chunk to stdout as we get it - // (instead of buffering and printing at the end). - while let Some(next) = res.frame().await { - let frame = next?; - if let Some(chunk) = frame.data_ref() { - io::stdout().write_all(&chunk).await?; - } - } - - println!("\n\nDone!"); - - Ok(()) -} - -// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor. -// -// Since the Server needs to spawn some background tasks, we needed -// to configure an Executor that can spawn !Send futures... -#[derive(Clone, Copy, Debug)] -struct LocalExec; - -impl hyper::rt::Executor for LocalExec -where - F: std::future::Future + 'static, // not requiring `Send` -{ - fn execute(&self, fut: F) { - // This will spawn into the currently running `LocalSet`. - tokio::task::spawn_local(fut); - } -} diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index ee109d54fa..12c007e363 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -1,5 +1,6 @@ #![deny(warnings)] +use http_body_util::BodyExt; use hyper::server::conn::http2; use std::cell::Cell; use std::net::SocketAddr; @@ -8,10 +9,13 @@ use tokio::net::TcpListener; use hyper::body::{Body as HttpBody, Bytes, Frame}; use hyper::service::service_fn; +use hyper::Request; use hyper::{Error, Response}; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use std::thread; +use tokio::net::TcpStream; struct Body { // Our Body type is !Send and !Sync: @@ -40,23 +44,44 @@ impl HttpBody for Body { } } -fn main() -> Result<(), Box> { +fn main() { pretty_env_logger::init(); - // Configure a runtime that runs everything on the current thread - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("build runtime"); - - // Combine it with a `LocalSet, which means it can spawn !Send futures... - let local = tokio::task::LocalSet::new(); - local.block_on(&rt, run()) + let server = thread::spawn(move || { + // Configure a runtime for the server that runs everything on the current thread + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build runtime"); + + // Combine it with a `LocalSet, which means it can spawn !Send futures... + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, server()).unwrap(); + }); + + let client = thread::spawn(move || { + // Configure a runtime for the client that runs everything on the current thread + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build runtime"); + + // Combine it with a `LocalSet, which means it can spawn !Send futures... + let local = tokio::task::LocalSet::new(); + local + .block_on( + &rt, + client("http://localhost:3000".parse::().unwrap()), + ) + .unwrap(); + }); + + server.join().unwrap(); + client.join().unwrap(); } -async fn run() -> Result<(), Box> { +async fn server() -> Result<(), Box> { let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); - // Using a !Send request counter is fine on 1 thread... let counter = Rc::new(Cell::new(0)); @@ -86,6 +111,45 @@ async fn run() -> Result<(), Box> { } } +async fn client(url: hyper::Uri) -> Result<(), Box> { + let host = url.host().expect("uri has no host"); + let port = url.port_u16().unwrap_or(80); + let addr = format!("{}:{}", host, port); + let stream = TcpStream::connect(addr).await?; + + let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?; + + tokio::task::spawn_local(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + let authority = url.authority().unwrap().clone(); + + // Make 4 requests + for _ in 0..4 { + let req = Request::builder() + .uri(url.clone()) + .header(hyper::header::HOST, authority.as_str()) + .body(Body::from("test".to_string()))?; + + let mut res = sender.send_request(req).await?; + + println!("Response: {}", res.status()); + println!("Headers: {:#?}\n", res.headers()); + + // Print the response body + while let Some(next) = res.frame().await { + let frame = next?; + if let Some(chunk) = frame.data_ref() { + println!("data {:#?}", chunk) + } + } + } + Ok(()) +} + // NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor. // // Since the Server needs to spawn some background tasks, we needed From c4a51b3abc8b95b0747d38a7a5885513923ee93f Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 00:27:45 +0200 Subject: [PATCH 10/21] remove from cargo toml --- Cargo.toml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 81163a3fc8..637107bee3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,11 +111,6 @@ name = "client" path = "examples/client.rs" required-features = ["full"] -[[example]] -name = "client_single_thread" -path = "examples/client_http2_single_threaded.rs" -required-features = ["full"] - [[example]] name = "client_json" path = "examples/client_json.rs" From 7cbfc72dde3992e447a46762dc056ee96d5bb8d0 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 11:37:18 +0200 Subject: [PATCH 11/21] use pin_project_lite --- Cargo.toml | 1 - src/client/dispatch.rs | 22 +++--- src/proto/h2/client.rs | 155 ++++++++++++++++++++++------------------- 3 files changed, 97 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 637107bee3..7665e97e88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ h2 = { version = "0.3.9", optional = true } itoa = "1" tracing = { version = "0.1", default-features = false, features = ["std"] } pin-project-lite = "0.2.4" -pin-project = "1.0.12" tokio = { version = "1", features = ["sync"] } want = "0.3" diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 33d7af66d3..40cb554917 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -3,7 +3,7 @@ use std::future::Future; use http::{Request, Response}; use http_body::Body; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::sync::{mpsc, oneshot}; use tracing::trace; @@ -276,15 +276,17 @@ impl Callback { } #[cfg(feature = "http2")] -#[pin_project] -pub struct SendWhen -where - B: Body + 'static, -{ - #[pin] - pub(crate) when: ResponseFutMap, - #[pin] - pub(crate) call_back: Option, Response>>, +pin_project! { + pub struct SendWhen + where + B: Body, + B: 'static, + { + #[pin] + pub(crate) when: ResponseFutMap, + #[pin] + pub(crate) call_back: Option, Response>>, + } } #[cfg(feature = "http2")] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 9d85821687..c93629bf9a 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -11,7 +11,7 @@ use futures_util::stream::{StreamExt as _, StreamFuture}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; use http::{Method, StatusCode}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace, warn}; @@ -161,15 +161,16 @@ where }) } -#[pin_project] -struct Conn -where - B: Body, -{ - #[pin] - ponger: Ponger, - #[pin] - conn: Connection::Data>>, +pin_project! { + struct Conn + where + B: Body, + { + #[pin] + ponger: Ponger, + #[pin] + conn: Connection::Data>>, + } } impl Conn @@ -207,14 +208,17 @@ where } } -#[pin_project] -struct ConnMapErr -where - B: Body, - T: AsyncRead + AsyncWrite + Unpin, -{ - #[pin] - conn: Either, Connection::Data>>>, +pin_project! { + struct ConnMapErr + where + B: Body, + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + { + #[pin] + conn: Either, Connection::Data>>>, + } } impl Future for ConnMapErr @@ -232,17 +236,20 @@ where } } -#[pin_project] -pub struct ConnTask -where - B: Body, - T: AsyncRead + AsyncWrite + Unpin, -{ - #[pin] - select: Select, StreamFuture>>, - #[pin] - cancel_tx: Option>, - conn: Option>, +pin_project! { + pub struct ConnTask + where + B: Body, + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + { + #[pin] + select: Select, StreamFuture>>, + #[pin] + cancel_tx: Option>, + conn: Option>, + } } impl ConnTask @@ -295,25 +302,30 @@ where } } -#[pin_project(project = H2ClientFutureProject)] -pub enum H2ClientFuture -where - B: http_body::Body + 'static, - B::Error: Into>, - T: AsyncRead + AsyncWrite + Unpin, -{ - Pipe { - #[pin] - pipe: PipeMap, - }, - Send { - #[pin] - send_when: SendWhen, - }, - Task { - #[pin] - task: ConnTask, - }, +pin_project! { + #[project = H2ClientFutureProject] + pub enum H2ClientFuture + where + B: http_body::Body, + B: 'static, + B::Error: Into>, + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + { + Pipe { + #[pin] + pipe: PipeMap, + }, + Send { + #[pin] + send_when: SendWhen, + }, + Task { + #[pin] + task: ConnTask, + }, + } } impl Future for H2ClientFuture @@ -379,17 +391,18 @@ where } } -#[pin_project] -pub struct PipeMap -where - S: Body, -{ - #[pin] - pipe: PipeToSendStream, - #[pin] - conn_drop_ref: Option>, - #[pin] - ping: Option, +pin_project! { + pub struct PipeMap + where + S: Body, + { + #[pin] + pipe: PipeToSendStream, + #[pin] + conn_drop_ref: Option>, + #[pin] + ping: Option, + } } impl Future for PipeMap @@ -477,17 +490,19 @@ where } } -#[pin_project] -pub(crate) struct ResponseFutMap -where - B: Body + 'static, -{ - #[pin] - fut: ResponseFuture, - #[pin] - ping: Option, - #[pin] - send_stream: Option::Data>>>>, +pin_project! { + pub(crate) struct ResponseFutMap + where + B: Body, + B: 'static, + { + #[pin] + fut: ResponseFuture, + #[pin] + ping: Option, + #[pin] + send_stream: Option::Data>>>>, + } } impl Future for ResponseFutMap From d3821a9900321bd5c45ef111e2ac6d11a9ca4c1d Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 11:37:45 +0200 Subject: [PATCH 12/21] deny warnings --- src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e900cf273c..054beb225a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ #![deny(missing_docs)] -//#![deny(missing_debug_implementations)] +#![deny(missing_debug_implementations)] #![cfg_attr(test, deny(rust_2018_idioms))] -//#![cfg_attr(all(test, feature = "full"), deny(unreachable_pub))] -//#![cfg_attr(all(test, feature = "full"), deny(warnings))] +#![cfg_attr(all(test, feature = "full"), deny(unreachable_pub))] +#![cfg_attr(all(test, feature = "full"), deny(warnings))] #![cfg_attr(all(test, feature = "nightly"), feature(test))] #![cfg_attr(docsrs, feature(doc_cfg))] From 23949491a2fc92314f39083aa81cd1f8d020338e Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 13:03:34 +0200 Subject: [PATCH 13/21] error bounds --- src/client/conn/http2.rs | 13 +++++-------- src/proto/h2/client.rs | 11 ++++------- src/rt/bounds.rs | 4 ++-- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 01753367d9..c75fbd4d5e 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -1,6 +1,6 @@ //! HTTP/2 client connections -use std::error::Error as StdError; +use std::error::Error; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; @@ -40,7 +40,7 @@ where T: AsyncRead + AsyncWrite + Send + 'static + Unpin, B: Body + 'static, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, { inner: (PhantomData, proto::h2::ClientTask), } @@ -198,9 +198,8 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + Unpin + Send + 'static, B::Data: Send, - B::Error: Into>, + B::Error: Into>, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, { /// Returns whether the [extended CONNECT protocol][1] is enabled or not. /// @@ -234,9 +233,8 @@ where B: Body + 'static + Unpin, B::Data: Send, E: Unpin, - B::Error: Into>, + B::Error: Into>, E: ExecutorClient + 'static + Send + Sync + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, { type Output = crate::Result<()>; @@ -405,9 +403,8 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, - B::Error: Into>, + B::Error: Into>, Ex: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, { let opts = self.clone(); diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index c93629bf9a..c146238e60 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,4 +1,3 @@ -use std::error::Error as StdError; use std::marker::PhantomData; use std::time::Duration; @@ -116,7 +115,7 @@ where B: Body + 'static, B::Data: Send + 'static, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, { let (h2_tx, mut conn) = new_builder(config) .handshake::<_, SendBuf>(io) @@ -383,7 +382,7 @@ impl ClientTask where B: Body + 'static, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, T: AsyncRead + AsyncWrite + Unpin, { pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { @@ -437,9 +436,8 @@ impl ClientTask where B: Body + 'static + Unpin, B::Data: Send, - B::Error: Into>, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, T: AsyncRead + AsyncWrite + Unpin, { fn poll_pipe(&mut self, f: FutCtx, cx: &mut task::Context<'_>) { @@ -573,9 +571,8 @@ impl Future for ClientTask where B: Body + 'static + Unpin, B::Data: Send, - B::Error: Into>, + B::Error: Into>, E: ExecutorClient + 'static + Send + Sync + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin, { type Output = crate::Result; diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index be072623ed..0597214efc 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -12,7 +12,7 @@ pub use self::h2_client::ExecutorClient; #[cfg(all(feature = "client", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))] mod h2_client { - use std::future::Future; + use std::{error::Error, future::Future}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{proto::h2::client::H2ClientFuture, rt::Executor}; @@ -21,7 +21,7 @@ mod h2_client { pub trait ExecutorClient where B: http_body::Body, - B::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, T: AsyncRead + AsyncWrite + Unpin, { #[doc(hidden)] From 8e0f907634ac288bec7add75a7193c1e6064c7ad Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 13:27:57 +0200 Subject: [PATCH 14/21] fix test --- tests/client.rs | 120 ++++++++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 54 deletions(-) diff --git a/tests/client.rs b/tests/client.rs index e565a773a4..4a02cd0669 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1327,6 +1327,7 @@ test! { mod conn { use std::error::Error; + use std::fmt::Display; use std::io::{self, Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; @@ -2067,60 +2068,71 @@ mod conn { ); } - // #[tokio::test] - // async fn http2_keep_alive_with_responsive_server() { - // // Test that a responsive server works just when client keep - // // alive is enabled - // use hyper::service::service_fn; - - // let (listener, addr) = setup_tk_test_server().await; - - // // Spawn an HTTP2 server that reads the whole body and responds - // tokio::spawn(async move { - // let sock = listener.accept().await.unwrap().0; - // hyper::server::conn::http2::Builder::new(TokioExecutor) - // .timer(TokioTimer) - // .serve_connection( - // sock, - // service_fn(|req| async move { - // tokio::spawn(async move { - // let _ = concat(req.into_body()) - // .await - // .expect("server req body aggregate"); - // }); - // Ok::<_, hyper::Error>(http::Response::new(Empty::::new())) - // }), - // ) - // .await - // .expect("serve_connection"); - // }); - - // let io = tcp_connect(&addr).await.expect("tcp connect"); - // let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) - // .timer(TokioTimer) - // .keep_alive_interval(Duration::from_secs(1)) - // .keep_alive_timeout(Duration::from_secs(1)) - // .handshake(io) - // .await - // .expect("http handshake"); - - // tokio::spawn(async move { - // conn.await.expect("client conn shouldn't error"); - // }); - - // // Use a channel to keep request stream open - // let (_tx, recv) = mpsc::channel::, Box>>(0); - // let req = http::Request::new(StreamBody::new(recv)); - - // let _resp = client.send_request(req).await.expect("send_request"); - - // // sleep longer than keepalive would trigger - // TokioTimer.sleep(Duration::from_secs(4)).await; - - // future::poll_fn(|ctx| client.poll_ready(ctx)) - // .await - // .expect("client should be open"); - // } + #[tokio::test] + async fn http2_keep_alive_with_responsive_server() { + // Test that a responsive server works just when client keep + // alive is enabled + use hyper::service::service_fn; + + let (listener, addr) = setup_tk_test_server().await; + + // Spawn an HTTP2 server that reads the whole body and responds + tokio::spawn(async move { + let sock = listener.accept().await.unwrap().0; + hyper::server::conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .serve_connection( + sock, + service_fn(|req| async move { + tokio::spawn(async move { + let _ = concat(req.into_body()) + .await + .expect("server req body aggregate"); + }); + Ok::<_, hyper::Error>(http::Response::new(Empty::::new())) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .keep_alive_interval(Duration::from_secs(1)) + .keep_alive_timeout(Duration::from_secs(1)) + .handshake(io) + .await + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (_tx, recv) = mpsc::channel::, TestError>>(0); + let req = http::Request::new(StreamBody::new(recv)); + + let _resp = client.send_request(req).await.expect("send_request"); + + // sleep longer than keepalive would trigger + TokioTimer.sleep(Duration::from_secs(4)).await; + + future::poll_fn(|ctx| client.poll_ready(ctx)) + .await + .expect("client should be open"); + } + + #[derive(Debug)] + struct TestError; + + impl Display for TestError { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } + } + + impl std::error::Error for TestError {} #[tokio::test] async fn h2_connect() { From 6d44c294164aaf1aea1921cc8f67f87cf5d852a7 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 13:50:15 +0200 Subject: [PATCH 15/21] sealed ExecutorClient --- src/rt/bounds.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index 0597214efc..7d29362960 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -17,8 +17,15 @@ mod h2_client { use crate::{proto::h2::client::H2ClientFuture, rt::Executor}; - /// TODO - pub trait ExecutorClient + /// An executor to spawn http2 futures for the client. + /// + /// This trait is implemented for any type that implements [`Executor`] + /// trait for any future. + /// + /// This trait is sealed and cannot be implemented for types outside this crate. + /// + /// [`Executor`]: crate::rt::Executor + pub trait ExecutorClient: sealed_client::Sealed<(B, T)> where B: http_body::Body, B::Error: Into>, @@ -40,6 +47,20 @@ mod h2_client { self.execute(future) } } + + impl sealed_client::Sealed<(B, T)> for E + where + E: Executor>, + B: http_body::Body + 'static, + B::Error: std::error::Error + Send + Sync + 'static, + H2ClientFuture: Future, + T: AsyncRead + AsyncWrite + Unpin, + { + } + + mod sealed_client { + pub trait Sealed {} + } } #[cfg(all(feature = "server", feature = "http2"))] From c7d59fe59a1c1f4cf6b6a81f4bc95aa4047aa231 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 14:07:56 +0200 Subject: [PATCH 16/21] better error message --- src/proto/h2/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index c146238e60..9f1f0e1eac 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -292,7 +292,7 @@ where // the connection some more should start shutdown // and then close trace!("send_request dropped, starting conn shutdown"); - drop(this.cancel_tx.take().expect("TODO: Error message")); + drop(this.cancel_tx.take().expect("Future polled twice")); this.conn = &mut Some(b); return Poll::Pending; } @@ -422,8 +422,8 @@ where if let Err(e) = result { debug!("client request body error: {}", e); } - drop(this.conn_drop_ref.take().expect("Call only one")); - drop(this.ping.take().expect("Call only one")); + drop(this.conn_drop_ref.take().expect("Future polled twice")); + drop(this.ping.take().expect("Future polled twice")); return Poll::Ready(()); } Poll::Pending => (), @@ -514,8 +514,8 @@ where let result = ready!(this.fut.poll(cx)); - let ping = this.ping.take().expect("Todo: Error message"); - let send_stream = this.send_stream.take().expect("Todo: Error message"); + let ping = this.ping.take().expect("Future polled twice"); + let send_stream = this.send_stream.take().expect("Future polled twice"); match result { Ok(res) => { From 0269396cd2d4dbf7459ff79a1fd44fc0b590e23e Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 9 Jun 2023 14:39:21 +0200 Subject: [PATCH 17/21] make it work also for io types --- examples/single_threaded.rs | 51 +++++++++++++++++++++++++++++++++++++ src/client/conn/http2.rs | 12 ++++----- src/proto/h2/client.rs | 2 +- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 12c007e363..5bec1c481b 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -5,6 +5,7 @@ use hyper::server::conn::http2; use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use hyper::body::{Body as HttpBody, Bytes, Frame}; @@ -111,12 +112,62 @@ async fn server() -> Result<(), Box> { } } +struct IOTypeNotSend { + _marker: PhantomData<*const ()>, + stream: TcpStream, +} + +impl IOTypeNotSend { + fn new(stream: TcpStream) -> Self { + Self { + _marker: PhantomData, + stream, + } + } +} + +impl AsyncWrite for IOTypeNotSend { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.stream).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_shutdown(cx) + } +} + +impl AsyncRead for IOTypeNotSend { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_read(cx, buf) + } +} + async fn client(url: hyper::Uri) -> Result<(), Box> { let host = url.host().expect("uri has no host"); let port = url.port_u16().unwrap_or(80); let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; + let stream = IOTypeNotSend::new(stream); + let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?; tokio::task::spawn_local(async move { diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index c75fbd4d5e..2b2085465b 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -37,7 +37,7 @@ impl Clone for SendRequest { #[must_use = "futures do nothing unless polled"] pub struct Connection where - T: AsyncRead + AsyncWrite + Send + 'static + Unpin, + T: AsyncRead + AsyncWrite + 'static + Unpin, B: Body + 'static, E: ExecutorClient + Unpin, B::Error: Into>, @@ -66,7 +66,7 @@ pub async fn handshake( io: T, ) -> crate::Result<(SendRequest, Connection)> where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, B::Data: Send, B::Error: std::error::Error + Send + Sync + 'static, @@ -195,7 +195,7 @@ impl fmt::Debug for SendRequest { impl Connection where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + Unpin + Send + 'static, B::Data: Send, B::Error: Into>, @@ -217,7 +217,7 @@ where impl fmt::Debug for Connection where - T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static + Unpin, + T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin, B: Body + 'static, E: ExecutorClient + Unpin, ::Error: std::error::Error + Send + Sync + 'static, @@ -229,7 +229,7 @@ where impl Future for Connection where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static + Unpin, B::Data: Send, E: Unpin, @@ -400,7 +400,7 @@ where io: T, ) -> impl Future, Connection)>> + where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 9f1f0e1eac..56aff85a9f 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -111,7 +111,7 @@ pub(crate) async fn handshake( timer: Time, ) -> crate::Result> where - T: AsyncRead + AsyncWrite + Send + Unpin + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, B::Data: Send + 'static, E: ExecutorClient + Unpin, From 3a78a57836417f19ca5b696ac7f3e42a2518a24d Mon Sep 17 00:00:00 2001 From: ruben Date: Mon, 12 Jun 2023 23:45:59 +0200 Subject: [PATCH 18/21] improve example --- examples/single_threaded.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 5bec1c481b..157ce79d01 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -5,7 +5,7 @@ use hyper::server::conn::http2; use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, self, AsyncWriteExt}; use tokio::net::TcpListener; use hyper::body::{Body as HttpBody, Bytes, Frame}; @@ -82,12 +82,19 @@ fn main() { } async fn server() -> Result<(), Box> { + let mut stdout = io::stdout(); + + + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); // Using a !Send request counter is fine on 1 thread... let counter = Rc::new(Cell::new(0)); let listener = TcpListener::bind(addr).await?; - println!("Listening on http://{}", addr); + + stdout.write_all(format!("Listening on http://{}",addr).as_bytes()).await.unwrap(); + stdout.flush().await.unwrap(); + loop { let (stream, _) = listener.accept().await?; @@ -106,7 +113,10 @@ async fn server() -> Result<(), Box> { .serve_connection(stream, service) .await { - println!("Error serving connection: {:?}", err); + let mut stdout = io::stdout(); + stdout.write_all(format!("Error serving connection: {:?}", err).as_bytes()).await.unwrap(); + stdout.flush().await.unwrap(); + } }); } @@ -172,7 +182,9 @@ async fn client(url: hyper::Uri) -> Result<(), Box> { tokio::task::spawn_local(async move { if let Err(err) = conn.await { - println!("Connection failed: {:?}", err); + let mut stdout = io::stdout(); + stdout.write_all(format!("Connection failed: {:?}", err).as_bytes()).await.unwrap(); + stdout.flush().await.unwrap(); } }); @@ -186,17 +198,21 @@ async fn client(url: hyper::Uri) -> Result<(), Box> { .body(Body::from("test".to_string()))?; let mut res = sender.send_request(req).await?; - - println!("Response: {}", res.status()); - println!("Headers: {:#?}\n", res.headers()); + + let mut stdout = io::stdout(); + stdout.write_all(format!("Response: {}\n", res.status()).as_bytes()).await.unwrap(); + stdout.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes()).await.unwrap(); + stdout.flush().await.unwrap(); // Print the response body while let Some(next) = res.frame().await { let frame = next?; if let Some(chunk) = frame.data_ref() { - println!("data {:#?}", chunk) + stdout.write_all(&chunk).await.unwrap(); } } + stdout.write_all(b"\n-----------------\n").await.unwrap(); + stdout.flush().await.unwrap(); } Ok(()) } From be5c6548c8521933eb457d4d388e121183584942 Mon Sep 17 00:00:00 2001 From: ruben Date: Mon, 12 Jun 2023 23:47:12 +0200 Subject: [PATCH 19/21] fmt --- examples/single_threaded.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 157ce79d01..de6256239c 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -5,7 +5,7 @@ use hyper::server::conn::http2; use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; -use tokio::io::{AsyncRead, AsyncWrite, self, AsyncWriteExt}; +use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpListener; use hyper::body::{Body as HttpBody, Bytes, Frame}; @@ -84,15 +84,16 @@ fn main() { async fn server() -> Result<(), Box> { let mut stdout = io::stdout(); - - let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); // Using a !Send request counter is fine on 1 thread... let counter = Rc::new(Cell::new(0)); let listener = TcpListener::bind(addr).await?; - stdout.write_all(format!("Listening on http://{}",addr).as_bytes()).await.unwrap(); + stdout + .write_all(format!("Listening on http://{}", addr).as_bytes()) + .await + .unwrap(); stdout.flush().await.unwrap(); loop { @@ -114,9 +115,11 @@ async fn server() -> Result<(), Box> { .await { let mut stdout = io::stdout(); - stdout.write_all(format!("Error serving connection: {:?}", err).as_bytes()).await.unwrap(); + stdout + .write_all(format!("Error serving connection: {:?}", err).as_bytes()) + .await + .unwrap(); stdout.flush().await.unwrap(); - } }); } @@ -183,7 +186,10 @@ async fn client(url: hyper::Uri) -> Result<(), Box> { tokio::task::spawn_local(async move { if let Err(err) = conn.await { let mut stdout = io::stdout(); - stdout.write_all(format!("Connection failed: {:?}", err).as_bytes()).await.unwrap(); + stdout + .write_all(format!("Connection failed: {:?}", err).as_bytes()) + .await + .unwrap(); stdout.flush().await.unwrap(); } }); @@ -198,10 +204,16 @@ async fn client(url: hyper::Uri) -> Result<(), Box> { .body(Body::from("test".to_string()))?; let mut res = sender.send_request(req).await?; - + let mut stdout = io::stdout(); - stdout.write_all(format!("Response: {}\n", res.status()).as_bytes()).await.unwrap(); - stdout.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes()).await.unwrap(); + stdout + .write_all(format!("Response: {}\n", res.status()).as_bytes()) + .await + .unwrap(); + stdout + .write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes()) + .await + .unwrap(); stdout.flush().await.unwrap(); // Print the response body From 15be46fedf8697815ae2f67570674b378e6ddb93 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 16 Jun 2023 22:54:40 +0200 Subject: [PATCH 20/21] fix merge fail --- src/client/conn/http2.rs | 13 ++++--------- src/proto/h2/client.rs | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 1473b28f23..f077cd1e04 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -49,9 +49,7 @@ where /// /// After setting options, the builder is used to create a handshake future. #[derive(Clone, Debug)] -pub struct Builder - -{ +pub struct Builder { pub(super) exec: Ex, pub(super) timer: Time, h2_builder: proto::h2::client::Config, @@ -64,7 +62,7 @@ pub struct Builder pub async fn handshake( exec: E, io: T, -) -> crate::Result<(SendRequest, Connection)> +) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, @@ -255,10 +253,7 @@ where { /// Creates a new connection builder. #[inline] - pub fn new(exec: E) -> Builder - where - E: Executor + Send + Sync + 'static, - { + pub fn new(exec: Ex) -> Builder { Builder { exec, timer: Time::Empty, @@ -401,7 +396,7 @@ where pub fn handshake( &self, io: T, - ) -> impl Future, Connection)>> + + ) -> impl Future, Connection)>> where T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index d9d7aded07..56aff85a9f 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -17,7 +17,7 @@ use tracing::{debug, trace, warn}; use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; -use crate::client::dispatch::Callback; +use crate::client::dispatch::{Callback, SendWhen}; use crate::common::time::Time; use crate::common::{task, Future, Never, Pin, Poll}; use crate::ext::Protocol; From 8391604e49bdba536f93d713ef0eff4f63682bbf Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 16 Jun 2023 23:43:14 +0200 Subject: [PATCH 21/21] fix error bounds --- src/client/conn/http2.rs | 4 ++-- src/rt/bounds.rs | 4 ++-- tests/client.rs | 14 +------------- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index f077cd1e04..16c7af0a3c 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -67,7 +67,7 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, B::Data: Send, - B::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, E: ExecutorClient + Unpin + Clone, { Builder::new(exec).handshake(io).await @@ -218,7 +218,7 @@ where T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin, B: Body + 'static, E: ExecutorClient + Unpin, - ::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() diff --git a/src/rt/bounds.rs b/src/rt/bounds.rs index 7d29362960..6368339796 100644 --- a/src/rt/bounds.rs +++ b/src/rt/bounds.rs @@ -39,7 +39,7 @@ mod h2_client { where E: Executor>, B: http_body::Body + 'static, - B::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, H2ClientFuture: Future, T: AsyncRead + AsyncWrite + Unpin, { @@ -52,7 +52,7 @@ mod h2_client { where E: Executor>, B: http_body::Body + 'static, - B::Error: std::error::Error + Send + Sync + 'static, + B::Error: Into>, H2ClientFuture: Future, T: AsyncRead + AsyncWrite + Unpin, { diff --git a/tests/client.rs b/tests/client.rs index 4a02cd0669..842282c5bb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1327,7 +1327,6 @@ test! { mod conn { use std::error::Error; - use std::fmt::Display; use std::io::{self, Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; @@ -2110,7 +2109,7 @@ mod conn { }); // Use a channel to keep request stream open - let (_tx, recv) = mpsc::channel::, TestError>>(0); + let (_tx, recv) = mpsc::channel::, Box>>(0); let req = http::Request::new(StreamBody::new(recv)); let _resp = client.send_request(req).await.expect("send_request"); @@ -2123,17 +2122,6 @@ mod conn { .expect("client should be open"); } - #[derive(Debug)] - struct TestError; - - impl Display for TestError { - fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } - } - - impl std::error::Error for TestError {} - #[tokio::test] async fn h2_connect() { let (listener, addr) = setup_tk_test_server().await;