diff --git a/examples/listen-multiple-addrs/Cargo.toml b/examples/listen-multiple-addrs/Cargo.toml index f77fc9d97b..8940b94332 100644 --- a/examples/listen-multiple-addrs/Cargo.toml +++ b/examples/listen-multiple-addrs/Cargo.toml @@ -7,4 +7,6 @@ publish = false [dependencies] axum = { path = "../../axum" } hyper = { version = "1.0.0", features = ["full"] } +hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] } tokio = { version = "1", features = ["full"] } +tower = { version = "0.4", features = ["util"] } diff --git a/examples/listen-multiple-addrs/src/main.rs b/examples/listen-multiple-addrs/src/main.rs index 9fe6a3bc7b..dafd4d64fc 100644 --- a/examples/listen-multiple-addrs/src/main.rs +++ b/examples/listen-multiple-addrs/src/main.rs @@ -1,72 +1,57 @@ -//! Showcases how listening on multiple addrs is possible by -//! implementing Accept for a custom struct. +//! Showcases how listening on multiple addrs is possible. //! //! This may be useful in cases where the platform does not //! listen on both IPv4 and IPv6 when the IPv6 catch-all listener is used (`::`), //! [like older versions of Windows.](https://docs.microsoft.com/en-us/windows/win32/winsock/dual-stack-sockets) -//! Showcases how listening on multiple addrs is possible by -//! implementing Accept for a custom struct. -//! -//! This may be useful in cases where the platform does not -//! listen on both IPv4 and IPv6 when the IPv6 catch-all listener is used (`::`), -//! [like older versions of Windows.](https://docs.microsoft.com/en-us/windows/win32/winsock/dual-stack-sockets) - -// TODO -fn main() { - eprint!("this example has not yet been updated to hyper 1.0"); +use axum::{extract::Request, routing::get, Router}; +use hyper::body::Incoming; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server, +}; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use tokio::net::TcpListener; +use tower::Service; + +#[tokio::main] +async fn main() { + let app: Router = Router::new().route("/", get(|| async { "Hello, World!" })); + + let localhost_v4 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080); + let listener_v4 = TcpListener::bind(&localhost_v4).await.unwrap(); + + let localhost_v6 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 8080); + let listener_v6 = TcpListener::bind(&localhost_v6).await.unwrap(); + + // See https://github.com/tokio-rs/axum/blob/main/examples/serve-with-hyper/src/main.rs for + // more details about this setup + loop { + // Accept connections from `listener_v4` and `listener_v6` at the same time + let (socket, _remote_addr) = tokio::select! { + result = listener_v4.accept() => { + result.unwrap() + } + result = listener_v6.accept() => { + result.unwrap() + } + }; + + let tower_service = app.clone(); + + tokio::spawn(async move { + let socket = TokioIo::new(socket); + + let hyper_service = hyper::service::service_fn(move |request: Request| { + tower_service.clone().call(request) + }); + + if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(socket, hyper_service) + .await + { + eprintln!("failed to serve connection: {err:#}"); + } + }); + } } - -// use axum::{routing::get, Router}; -// use hyper::server::{accept::Accept, conn::AddrIncoming}; -// use std::{ -// net::{Ipv4Addr, Ipv6Addr, SocketAddr}, -// pin::Pin, -// task::{Context, Poll}, -// }; - -// #[tokio::main] -// async fn main() { -// let app = Router::new().route("/", get(|| async { "Hello, World!" })); - -// let localhost_v4 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080); -// let incoming_v4 = AddrIncoming::bind(&localhost_v4).unwrap(); - -// let localhost_v6 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 8080); -// let incoming_v6 = AddrIncoming::bind(&localhost_v6).unwrap(); - -// let combined = CombinedIncoming { -// a: incoming_v4, -// b: incoming_v6, -// }; - -// hyper::Server::builder(combined) -// .serve(app.into_make_service()) -// .await -// .unwrap(); -// } - -// struct CombinedIncoming { -// a: AddrIncoming, -// b: AddrIncoming, -// } - -// impl Accept for CombinedIncoming { -// type Conn = ::Conn; -// type Error = ::Error; - -// fn poll_accept( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll>> { -// if let Poll::Ready(Some(value)) = Pin::new(&mut self.a).poll_accept(cx) { -// return Poll::Ready(Some(value)); -// } - -// if let Poll::Ready(Some(value)) = Pin::new(&mut self.b).poll_accept(cx) { -// return Poll::Ready(Some(value)); -// } - -// Poll::Pending -// } -// } diff --git a/examples/testing/Cargo.toml b/examples/testing/Cargo.toml index 0bdb6ed34a..00e8132f73 100644 --- a/examples/testing/Cargo.toml +++ b/examples/testing/Cargo.toml @@ -8,6 +8,7 @@ publish = false axum = { path = "../../axum" } http-body-util = "0.1.0" hyper = { version = "1.0.0", features = ["full"] } +hyper-util = { version = "0.1", features = ["client", "http1", "client-legacy"] } mime = "0.3" serde_json = "1.0" tokio = { version = "1.0", features = ["full"] } diff --git a/examples/testing/src/main.rs b/examples/testing/src/main.rs index 59cd8f6d08..efca8d4ef8 100644 --- a/examples/testing/src/main.rs +++ b/examples/testing/src/main.rs @@ -4,208 +4,198 @@ //! cargo test -p example-testing //! ``` -fn main() { - // This example has not yet been updated to Hyper 1.0 +use std::net::SocketAddr; + +use axum::{ + extract::ConnectInfo, + routing::{get, post}, + Json, Router, +}; +use tower_http::trace::TraceLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "example_testing=debug,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") + .await + .unwrap(); + tracing::debug!("listening on {}", listener.local_addr().unwrap()); + axum::serve(listener, app()).await.unwrap(); } -//use std::net::SocketAddr; - -//use axum::{ -// extract::ConnectInfo, -// routing::{get, post}, -// Json, Router, -//}; -//use tower_http::trace::TraceLayer; -//use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -//#[tokio::main] -//async fn main() { -// tracing_subscriber::registry() -// .with( -// tracing_subscriber::EnvFilter::try_from_default_env() -// .unwrap_or_else(|_| "example_testing=debug,tower_http=debug".into()), -// ) -// .with(tracing_subscriber::fmt::layer()) -// .init(); - -// let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") -// .await -// .unwrap(); -// tracing::debug!("listening on {}", listener.local_addr().unwrap()); -// axum::serve(listener, app()).await.unwrap(); -//} - -///// Having a function that produces our app makes it easy to call it from tests -///// without having to create an HTTP server. -//fn app() -> Router { -// Router::new() -// .route("/", get(|| async { "Hello, World!" })) -// .route( -// "/json", -// post(|payload: Json| async move { -// Json(serde_json::json!({ "data": payload.0 })) -// }), -// ) -// .route( -// "/requires-connect-into", -// get(|ConnectInfo(addr): ConnectInfo| async move { format!("Hi {addr}") }), -// ) -// // We can still add middleware -// .layer(TraceLayer::new_for_http()) -//} - -//#[cfg(test)] -//mod tests { -// use super::*; -// use axum::{ -// body::Body, -// extract::connect_info::MockConnectInfo, -// http::{self, Request, StatusCode}, -// }; -// use http_body_util::BodyExt; -// use serde_json::{json, Value}; -// use std::net::SocketAddr; -// use tokio::net::{TcpListener, TcpStream}; -// use tower::Service; // for `call` -// use tower::ServiceExt; // for `oneshot` and `ready` // for `collect` - -// #[tokio::test] -// async fn hello_world() { -// let app = app(); - -// // `Router` implements `tower::Service>` so we can -// // call it like any tower service, no need to run an HTTP server. -// let response = app -// .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) -// .await -// .unwrap(); - -// assert_eq!(response.status(), StatusCode::OK); - -// let body = response.into_body().collect().await.unwrap().to_bytes(); -// assert_eq!(&body[..], b"Hello, World!"); -// } - -// #[tokio::test] -// async fn json() { -// let app = app(); - -// let response = app -// .oneshot( -// Request::builder() -// .method(http::Method::POST) -// .uri("/json") -// .header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref()) -// .body(Body::from( -// serde_json::to_vec(&json!([1, 2, 3, 4])).unwrap(), -// )) -// .unwrap(), -// ) -// .await -// .unwrap(); - -// assert_eq!(response.status(), StatusCode::OK); - -// let body = response.into_body().collect().await.unwrap().to_bytes(); -// let body: Value = serde_json::from_slice(&body).unwrap(); -// assert_eq!(body, json!({ "data": [1, 2, 3, 4] })); -// } - -// #[tokio::test] -// async fn not_found() { -// let app = app(); - -// let response = app -// .oneshot( -// Request::builder() -// .uri("/does-not-exist") -// .body(Body::empty()) -// .unwrap(), -// ) -// .await -// .unwrap(); - -// assert_eq!(response.status(), StatusCode::NOT_FOUND); -// let body = response.into_body().collect().await.unwrap().to_bytes(); -// assert!(body.is_empty()); -// } - -// // You can also spawn a server and talk to it like any other HTTP server: -// #[tokio::test] -// async fn the_real_deal() { -// // TODO(david): convert this to hyper-util when thats published - -// use hyper::client::conn; - -// let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); -// let addr = listener.local_addr().unwrap(); - -// tokio::spawn(async move { -// axum::serve(listener, app()).await.unwrap(); -// }); - -// let target_stream = TcpStream::connect(addr).await.unwrap(); - -// let (mut request_sender, connection) = conn::http1::handshake(target_stream).await.unwrap(); - -// tokio::spawn(async move { connection.await.unwrap() }); - -// let response = request_sender -// .send_request( -// Request::builder() -// .uri(format!("http://{addr}")) -// .header("Host", "localhost") -// .body(Body::empty()) -// .unwrap(), -// ) -// .await -// .unwrap(); - -// let body = response.into_body().collect().await.unwrap().to_bytes(); -// assert_eq!(&body[..], b"Hello, World!"); -// } - -// // You can use `ready()` and `call()` to avoid using `clone()` -// // in multiple request -// #[tokio::test] -// async fn multiple_request() { -// let mut app = app().into_service(); - -// let request = Request::builder().uri("/").body(Body::empty()).unwrap(); -// let response = ServiceExt::>::ready(&mut app) -// .await -// .unwrap() -// .call(request) -// .await -// .unwrap(); -// assert_eq!(response.status(), StatusCode::OK); - -// let request = Request::builder().uri("/").body(Body::empty()).unwrap(); -// let response = ServiceExt::>::ready(&mut app) -// .await -// .unwrap() -// .call(request) -// .await -// .unwrap(); -// assert_eq!(response.status(), StatusCode::OK); -// } - -// // Here we're calling `/requires-connect-into` which requires `ConnectInfo` -// // -// // That is normally set with `Router::into_make_service_with_connect_info` but we can't easily -// // use that during tests. The solution is instead to set the `MockConnectInfo` layer during -// // tests. -// #[tokio::test] -// async fn with_into_make_service_with_connect_info() { -// let mut app = app() -// .layer(MockConnectInfo(SocketAddr::from(([0, 0, 0, 0], 3000)))) -// .into_service(); - -// let request = Request::builder() -// .uri("/requires-connect-into") -// .body(Body::empty()) -// .unwrap(); -// let response = app.ready().await.unwrap().call(request).await.unwrap(); -// assert_eq!(response.status(), StatusCode::OK); -// } -//} +/// Having a function that produces our app makes it easy to call it from tests +/// without having to create an HTTP server. +fn app() -> Router { + Router::new() + .route("/", get(|| async { "Hello, World!" })) + .route( + "/json", + post(|payload: Json| async move { + Json(serde_json::json!({ "data": payload.0 })) + }), + ) + .route( + "/requires-connect-into", + get(|ConnectInfo(addr): ConnectInfo| async move { format!("Hi {addr}") }), + ) + // We can still add middleware + .layer(TraceLayer::new_for_http()) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{ + body::Body, + extract::connect_info::MockConnectInfo, + http::{self, Request, StatusCode}, + }; + use http_body_util::BodyExt; + use serde_json::{json, Value}; + use std::net::SocketAddr; + use tokio::net::TcpListener; + use tower::Service; // for `call` + use tower::ServiceExt; // for `oneshot` and `ready` // for `collect` + + #[tokio::test] + async fn hello_world() { + let app = app(); + + // `Router` implements `tower::Service>` so we can + // call it like any tower service, no need to run an HTTP server. + let response = app + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"Hello, World!"); + } + + #[tokio::test] + async fn json() { + let app = app(); + + let response = app + .oneshot( + Request::builder() + .method(http::Method::POST) + .uri("/json") + .header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref()) + .body(Body::from( + serde_json::to_vec(&json!([1, 2, 3, 4])).unwrap(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let body: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(body, json!({ "data": [1, 2, 3, 4] })); + } + + #[tokio::test] + async fn not_found() { + let app = app(); + + let response = app + .oneshot( + Request::builder() + .uri("/does-not-exist") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = response.into_body().collect().await.unwrap().to_bytes(); + assert!(body.is_empty()); + } + + // You can also spawn a server and talk to it like any other HTTP server: + #[tokio::test] + async fn the_real_deal() { + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + axum::serve(listener, app()).await.unwrap(); + }); + + let client = + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build_http(); + + let response = client + .request( + Request::builder() + .uri(format!("http://{addr}")) + .header("Host", "localhost") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"Hello, World!"); + } + + // You can use `ready()` and `call()` to avoid using `clone()` + // in multiple request + #[tokio::test] + async fn multiple_request() { + let mut app = app().into_service(); + + let request = Request::builder().uri("/").body(Body::empty()).unwrap(); + let response = ServiceExt::>::ready(&mut app) + .await + .unwrap() + .call(request) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let request = Request::builder().uri("/").body(Body::empty()).unwrap(); + let response = ServiceExt::>::ready(&mut app) + .await + .unwrap() + .call(request) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + // Here we're calling `/requires-connect-into` which requires `ConnectInfo` + // + // That is normally set with `Router::into_make_service_with_connect_info` but we can't easily + // use that during tests. The solution is instead to set the `MockConnectInfo` layer during + // tests. + #[tokio::test] + async fn with_into_make_service_with_connect_info() { + let mut app = app() + .layer(MockConnectInfo(SocketAddr::from(([0, 0, 0, 0], 3000)))) + .into_service(); + + let request = Request::builder() + .uri("/requires-connect-into") + .body(Body::empty()) + .unwrap(); + let response = app.ready().await.unwrap().call(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/examples/unix-domain-socket/Cargo.toml b/examples/unix-domain-socket/Cargo.toml index 41bd4a1e6d..7f157c7dcb 100644 --- a/examples/unix-domain-socket/Cargo.toml +++ b/examples/unix-domain-socket/Cargo.toml @@ -6,8 +6,9 @@ publish = false [dependencies] axum = { path = "../../axum" } -futures = "0.3" +http-body-util = "0.1" hyper = { version = "1.0.0", features = ["full"] } +hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] } tokio = { version = "1.0", features = ["full"] } tower = { version = "0.4", features = ["util"] } tracing = "0.1" diff --git a/examples/unix-domain-socket/src/main.rs b/examples/unix-domain-socket/src/main.rs index 188b4b8234..d11792dd70 100644 --- a/examples/unix-domain-socket/src/main.rs +++ b/examples/unix-domain-socket/src/main.rs @@ -4,183 +4,136 @@ //! cargo run -p example-unix-domain-socket //! ``` -// TODO +#[cfg(unix)] +#[tokio::main] +async fn main() { + unix::server().await; +} + +#[cfg(not(unix))] fn main() { - eprint!("this example has not yet been updated to hyper 1.0"); + println!("This example requires unix") } -// #[cfg(unix)] -// #[tokio::main] -// async fn main() { -// unix::server().await; -// } - -// #[cfg(not(unix))] -// fn main() { -// println!("This example requires unix") -// } - -// #[cfg(unix)] -// mod unix { -// use axum::{ -// body::Body, -// extract::connect_info::{self, ConnectInfo}, -// http::{Method, Request, StatusCode, Uri}, -// routing::get, -// Router, -// }; -// use futures::ready; -// use hyper::{ -// client::connect::{Connected, Connection}, -// server::accept::Accept, -// }; -// use std::{ -// io, -// path::PathBuf, -// pin::Pin, -// sync::Arc, -// task::{Context, Poll}, -// }; -// use tokio::{ -// io::{AsyncRead, AsyncWrite}, -// net::{unix::UCred, UnixListener, UnixStream}, -// }; -// use tower::BoxError; -// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -// pub async fn server() { -// tracing_subscriber::registry() -// .with( -// tracing_subscriber::EnvFilter::try_from_default_env() -// .unwrap_or_else(|_| "debug".into()), -// ) -// .with(tracing_subscriber::fmt::layer()) -// .init(); - -// let path = PathBuf::from("/tmp/axum/helloworld"); - -// let _ = tokio::fs::remove_file(&path).await; -// tokio::fs::create_dir_all(path.parent().unwrap()) -// .await -// .unwrap(); - -// let uds = UnixListener::bind(path.clone()).unwrap(); -// tokio::spawn(async { -// let app = Router::new().route("/", get(handler)); - -// hyper::Server::builder(ServerAccept { uds }) -// .serve(app.into_make_service_with_connect_info::()) -// .await -// .unwrap(); -// }); - -// let connector = tower::service_fn(move |_: Uri| { -// let path = path.clone(); -// Box::pin(async move { -// let stream = UnixStream::connect(path).await?; -// Ok::<_, io::Error>(ClientConnection { stream }) -// }) -// }); -// let client = hyper::Client::builder().build(connector); - -// let request = Request::builder() -// .method(Method::GET) -// .uri("http://uri-doesnt-matter.com") -// .body(Body::empty()) -// .unwrap(); - -// let response = client.request(request).await.unwrap(); - -// assert_eq!(response.status(), StatusCode::OK); - -// let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); -// let body = String::from_utf8(body.to_vec()).unwrap(); -// assert_eq!(body, "Hello, World!"); -// } - -// async fn handler(ConnectInfo(info): ConnectInfo) -> &'static str { -// println!("new connection from `{:?}`", info); - -// "Hello, World!" -// } - -// struct ServerAccept { -// uds: UnixListener, -// } - -// impl Accept for ServerAccept { -// type Conn = UnixStream; -// type Error = BoxError; - -// fn poll_accept( -// self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll>> { -// let (stream, _addr) = ready!(self.uds.poll_accept(cx))?; -// Poll::Ready(Some(Ok(stream))) -// } -// } - -// struct ClientConnection { -// stream: UnixStream, -// } - -// impl AsyncWrite for ClientConnection { -// fn poll_write( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// buf: &[u8], -// ) -> Poll> { -// Pin::new(&mut self.stream).poll_write(cx, buf) -// } - -// fn poll_flush( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll> { -// Pin::new(&mut self.stream).poll_flush(cx) -// } - -// fn poll_shutdown( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// ) -> Poll> { -// Pin::new(&mut self.stream).poll_shutdown(cx) -// } -// } - -// impl AsyncRead for ClientConnection { -// fn poll_read( -// mut self: Pin<&mut Self>, -// cx: &mut Context<'_>, -// buf: &mut tokio::io::ReadBuf<'_>, -// ) -> Poll> { -// Pin::new(&mut self.stream).poll_read(cx, buf) -// } -// } - -// impl Connection for ClientConnection { -// fn connected(&self) -> Connected { -// Connected::new() -// } -// } - -// #[derive(Clone, Debug)] -// #[allow(dead_code)] -// struct UdsConnectInfo { -// peer_addr: Arc, -// peer_cred: UCred, -// } - -// impl connect_info::Connected<&UnixStream> for UdsConnectInfo { -// fn connect_info(target: &UnixStream) -> Self { -// let peer_addr = target.peer_addr().unwrap(); -// let peer_cred = target.peer_cred().unwrap(); - -// Self { -// peer_addr: Arc::new(peer_addr), -// peer_cred, -// } -// } -// } -// } +#[cfg(unix)] +mod unix { + use axum::{ + body::Body, + extract::connect_info::{self, ConnectInfo}, + http::{Method, Request, StatusCode}, + routing::get, + Router, + }; + use http_body_util::BodyExt; + use hyper::body::Incoming; + use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server, + }; + use std::{convert::Infallible, path::PathBuf, sync::Arc}; + use tokio::net::{unix::UCred, UnixListener, UnixStream}; + use tower::Service; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + + pub async fn server() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let path = PathBuf::from("/tmp/axum/helloworld"); + + let _ = tokio::fs::remove_file(&path).await; + tokio::fs::create_dir_all(path.parent().unwrap()) + .await + .unwrap(); + + let uds = UnixListener::bind(path.clone()).unwrap(); + tokio::spawn(async move { + let app = Router::new().route("/", get(handler)); + + let mut make_service = app.into_make_service_with_connect_info::(); + + // See https://github.com/tokio-rs/axum/blob/main/examples/serve-with-hyper/src/main.rs for + // more details about this setup + loop { + let (socket, _remote_addr) = uds.accept().await.unwrap(); + + let tower_service = unwrap_infallible(make_service.call(&socket).await); + + tokio::spawn(async move { + let socket = TokioIo::new(socket); + + let hyper_service = + hyper::service::service_fn(move |request: Request| { + tower_service.clone().call(request) + }); + + if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(socket, hyper_service) + .await + { + eprintln!("failed to serve connection: {err:#}"); + } + }); + } + }); + + let stream = TokioIo::new(UnixStream::connect(path).await.unwrap()); + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await.unwrap(); + tokio::task::spawn(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + let request = Request::builder() + .method(Method::GET) + .uri("http://uri-doesnt-matter.com") + .body(Body::empty()) + .unwrap(); + + let response = sender.send_request(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.collect().await.unwrap().to_bytes(); + let body = String::from_utf8(body.to_vec()).unwrap(); + assert_eq!(body, "Hello, World!"); + } + + async fn handler(ConnectInfo(info): ConnectInfo) -> &'static str { + println!("new connection from `{:?}`", info); + + "Hello, World!" + } + + #[derive(Clone, Debug)] + #[allow(dead_code)] + struct UdsConnectInfo { + peer_addr: Arc, + peer_cred: UCred, + } + + impl connect_info::Connected<&UnixStream> for UdsConnectInfo { + fn connect_info(target: &UnixStream) -> Self { + let peer_addr = target.peer_addr().unwrap(); + let peer_cred = target.peer_cred().unwrap(); + + Self { + peer_addr: Arc::new(peer_addr), + peer_cred, + } + } + } + + fn unwrap_infallible(result: Result) -> T { + match result { + Ok(value) => value, + Err(err) => match err {}, + } + } +}