From 99eb0319a2c7ee229f7c88d67de02ae696729eeb Mon Sep 17 00:00:00 2001 From: David Calavera Date: Tue, 16 Jan 2024 01:16:09 -0800 Subject: [PATCH] Test runtime with httpmock. (#780) * Test runtime with httpmock. - Remove generic runtime over the client. - Make tests use httpmock for more concise assertions. Signed-off-by: David Calavera * Bump MSRV to 1.65. Fixes compilation issues with the regex crate. Signed-off-by: David Calavera --------- Signed-off-by: David Calavera --- .github/workflows/build-events.yml | 2 +- .github/workflows/build-extension.yml | 2 +- .github/workflows/build-runtime.yml | 2 +- README.md | 2 +- lambda-runtime-api-client/src/lib.rs | 41 +-- lambda-runtime/Cargo.toml | 3 +- lambda-runtime/src/lib.rs | 345 +++++++++----------------- lambda-runtime/src/simulated.rs | 133 ---------- 8 files changed, 134 insertions(+), 396 deletions(-) delete mode 100644 lambda-runtime/src/simulated.rs diff --git a/.github/workflows/build-events.yml b/.github/workflows/build-events.yml index 4e5fb34d..caee1131 100644 --- a/.github/workflows/build-events.yml +++ b/.github/workflows/build-events.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: toolchain: - - "1.64.0" # Current MSRV + - "1.65.0" # Current MSRV - stable env: RUST_BACKTRACE: 1 diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml index 7365bc64..7165a281 100644 --- a/.github/workflows/build-extension.yml +++ b/.github/workflows/build-extension.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: toolchain: - - "1.64.0" # Current MSRV + - "1.65.0" # Current MSRV - stable env: RUST_BACKTRACE: 1 diff --git a/.github/workflows/build-runtime.yml b/.github/workflows/build-runtime.yml index 9657a840..026c20e0 100644 --- a/.github/workflows/build-runtime.yml +++ b/.github/workflows/build-runtime.yml @@ -19,7 +19,7 @@ jobs: strategy: matrix: toolchain: - - "1.64.0" # Current MSRV + - "1.65.0" # Current MSRV - stable env: RUST_BACKTRACE: 1 diff --git a/README.md b/README.md index ef43ba74..eeeaf2fa 100644 --- a/README.md +++ b/README.md @@ -440,7 +440,7 @@ This will make your function compile much faster. ## Supported Rust Versions (MSRV) -The AWS Lambda Rust Runtime requires a minimum of Rust 1.64, and is not guaranteed to build on compiler versions earlier than that. +The AWS Lambda Rust Runtime requires a minimum of Rust 1.65, and is not guaranteed to build on compiler versions earlier than that. ## Security diff --git a/lambda-runtime-api-client/src/lib.rs b/lambda-runtime-api-client/src/lib.rs index 15185f81..ec7418ba 100644 --- a/lambda-runtime-api-client/src/lib.rs +++ b/lambda-runtime-api-client/src/lib.rs @@ -6,9 +6,8 @@ //! the AWS Lambda Runtime API. use http::{uri::PathAndQuery, uri::Scheme, Request, Response, Uri}; use hyper::body::Incoming; -use hyper_util::client::legacy::connect::{Connect, Connection, HttpConnector}; +use hyper_util::client::legacy::connect::HttpConnector; use std::{convert::TryInto, fmt::Debug}; -use tower_service::Service; const USER_AGENT_HEADER: &str = "User-Agent"; const DEFAULT_USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION")); @@ -20,16 +19,16 @@ pub mod body; /// API client to interact with the AWS Lambda Runtime API. #[derive(Debug)] -pub struct Client { +pub struct Client { /// The runtime API URI pub base: Uri, /// The client that manages the API connections - pub client: hyper_util::client::legacy::Client, + pub client: hyper_util::client::legacy::Client, } impl Client { /// Create a builder struct to configure the client. - pub fn builder() -> ClientBuilder { + pub fn builder() -> ClientBuilder { ClientBuilder { connector: HttpConnector::new(), uri: None, @@ -37,10 +36,7 @@ impl Client { } } -impl Client -where - C: Connect + Sync + Send + Clone + 'static, -{ +impl Client { /// Send a given request to the Runtime API. /// Use the client's base URI to ensure the API endpoint is correct. pub async fn call(&self, req: Request) -> Result, BoxError> { @@ -49,7 +45,7 @@ where } /// Create a new client with a given base URI and HTTP connector. - pub fn with(base: Uri, connector: C) -> Self { + fn with(base: Uri, connector: HttpConnector) -> Self { let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) .http1_max_buf_size(1024 * 1024) .build(connector); @@ -80,26 +76,14 @@ where } /// Builder implementation to construct any Runtime API clients. -pub struct ClientBuilder = HttpConnector> { - connector: C, +pub struct ClientBuilder { + connector: HttpConnector, uri: Option, } -impl ClientBuilder -where - C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: Connection + Unpin + Send + 'static, -{ +impl ClientBuilder { /// Create a new builder with a given HTTP connector. - pub fn with_connector(self, connector: C2) -> ClientBuilder - where - C2: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: Connection + Unpin + Send + 'static, - { + pub fn with_connector(self, connector: HttpConnector) -> ClientBuilder { ClientBuilder { connector, uri: self.uri, @@ -113,10 +97,7 @@ where } /// Create the new client to interact with the Runtime API. - pub fn build(self) -> Result, Error> - where - C: Connect + Sync + Send + Clone + 'static, - { + pub fn build(self) -> Result { let uri = match self.uri { Some(uri) => uri, None => { diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index ccbb3b2e..221aa6f0 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -51,6 +51,7 @@ tower = { workspace = true, features = ["util"] } tracing = { version = "0.1", features = ["log"] } [dev-dependencies] +httpmock = "0.7.0" hyper-util = { workspace = true, features = [ "client", "client-legacy", @@ -59,4 +60,4 @@ hyper-util = { workspace = true, features = [ "server-auto", "tokio", ] } -pin-project-lite = { workspace = true } \ No newline at end of file +pin-project-lite = { workspace = true } diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 10dfce92..3a91d943 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -11,7 +11,6 @@ use bytes::Bytes; use futures::FutureExt; use http_body_util::BodyExt; use hyper::{body::Incoming, http::Request}; -use hyper_util::client::legacy::connect::{Connect, Connection, HttpConnector}; use lambda_runtime_api_client::{body::Body, BoxError, Client}; use serde::{Deserialize, Serialize}; use std::{ @@ -28,8 +27,6 @@ use tracing::{error, trace, Instrument}; mod deserializer; mod requests; -#[cfg(test)] -mod simulated; /// Utilities for Lambda Streaming functions. pub mod streaming; /// Types available to a Lambda function. @@ -85,18 +82,12 @@ where service_fn(move |req: LambdaEvent| f(req.payload, req.context)) } -struct Runtime = HttpConnector> { - client: Client, +struct Runtime { + client: Client, config: RefConfig, } -impl Runtime -where - C: Service + Connect + Clone + Send + Sync + Unpin + 'static, - C::Future: Unpin + Send, - C::Error: Into>, - C::Response: Connection + Unpin + Send + 'static, -{ +impl Runtime { async fn run( &self, incoming: impl Stream, Error>> + Send, @@ -196,13 +187,7 @@ where } } -fn incoming(client: &Client) -> impl Stream, Error>> + Send + '_ -where - C: Service + Connect + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: Connection + Unpin + Send + 'static, -{ +fn incoming(client: &Client) -> impl Stream, Error>> + Send + '_ { async_stream::stream! { loop { trace!("Waiting for next event (incoming loop)"); @@ -276,231 +261,135 @@ where mod endpoint_tests { use crate::{ incoming, - requests::{ - EventCompletionRequest, EventErrorRequest, IntoRequest, IntoResponse, NextEventRequest, NextEventResponse, - }, - simulated, + requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}, types::Diagnostic, Config, Error, Runtime, }; use futures::future::BoxFuture; - use http::{uri::PathAndQuery, HeaderValue, Method, Request, Response, StatusCode, Uri}; - use hyper::body::Incoming; - use hyper::rt::{Read, Write}; - use hyper::service::service_fn; - - use hyper_util::server::conn::auto::Builder; - use lambda_runtime_api_client::{body::Body, Client}; - use serde_json::json; - use simulated::DuplexStreamWrapper; - use std::{convert::TryFrom, env, sync::Arc}; - use tokio::{ - io, select, - sync::{self, oneshot}, - }; - use tokio_stream::StreamExt; + use http::{HeaderValue, StatusCode}; + use http_body_util::BodyExt; + use httpmock::prelude::*; - #[cfg(test)] - async fn next_event(req: &Request) -> Result, Error> { - let path = "/2018-06-01/runtime/invocation/next"; - assert_eq!(req.method(), Method::GET); - assert_eq!(req.uri().path_and_query().unwrap(), &PathAndQuery::from_static(path)); - let body = json!({"message": "hello"}); - - let rsp = NextEventResponse { - request_id: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", - deadline: 1_542_409_706_888, - arn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", - trace_id: "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419", - body: serde_json::to_vec(&body)?, - }; - rsp.into_rsp() - } - - #[cfg(test)] - async fn complete_event(req: &Request, id: &str) -> Result, Error> { - assert_eq!(Method::POST, req.method()); - let rsp = Response::builder() - .status(StatusCode::ACCEPTED) - .body(Body::empty()) - .expect("Unable to construct response"); - - let expected = format!("/2018-06-01/runtime/invocation/{id}/response"); - assert_eq!(expected, req.uri().path()); - - Ok(rsp) - } - - #[cfg(test)] - async fn event_err(req: &Request, id: &str) -> Result, Error> { - let expected = format!("/2018-06-01/runtime/invocation/{id}/error"); - assert_eq!(expected, req.uri().path()); - - assert_eq!(req.method(), Method::POST); - let header = "lambda-runtime-function-error-type"; - let expected = "unhandled"; - assert_eq!(req.headers()[header], HeaderValue::try_from(expected)?); - - let rsp = Response::builder().status(StatusCode::ACCEPTED).body(Body::empty())?; - Ok(rsp) - } - - #[cfg(test)] - async fn handle_incoming(req: Request) -> Result, Error> { - let path: Vec<&str> = req - .uri() - .path_and_query() - .expect("PathAndQuery not found") - .as_str() - .split('/') - .collect::>(); - match path[1..] { - ["2018-06-01", "runtime", "invocation", "next"] => next_event(&req).await, - ["2018-06-01", "runtime", "invocation", id, "response"] => complete_event(&req, id).await, - ["2018-06-01", "runtime", "invocation", id, "error"] => event_err(&req, id).await, - ["2018-06-01", "runtime", "init", "error"] => unimplemented!(), - _ => unimplemented!(), - } - } - - #[cfg(test)] - async fn handle(io: I, rx: oneshot::Receiver<()>) -> Result<(), Error> - where - I: Read + Write + Unpin + 'static, - { - use hyper_util::rt::TokioExecutor; - - let builder = Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection(io, service_fn(handle_incoming)); - select! { - _ = rx => { - Ok(()) - } - res = conn => { - match res { - Ok(()) => Ok(()), - Err(e) => { - Err(e) - } - } - } - } - } + use lambda_runtime_api_client::Client; + use std::{env, sync::Arc}; + use tokio_stream::StreamExt; #[tokio::test] async fn test_next_event() -> Result<(), Error> { - let base = Uri::from_static("http://localhost:9001"); - let (client, server) = io::duplex(64); - - let (tx, rx) = sync::oneshot::channel(); - let server = tokio::spawn(async { - handle(DuplexStreamWrapper::new(server), rx) - .await - .expect("Unable to handle request"); + let server = MockServer::start(); + let request_id = "156cb537-e2d4-11e8-9b34-d36013741fb9"; + let deadline = "1542409706888"; + + let mock = server.mock(|when, then| { + when.method(GET).path("/2018-06-01/runtime/invocation/next"); + then.status(200) + .header("content-type", "application/json") + .header("lambda-runtime-aws-request-id", request_id) + .header("lambda-runtime-deadline-ms", deadline) + .body("{}"); }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); + let base = server.base_url().parse().expect("Invalid mock server Uri"); + let client = Client::builder().with_endpoint(base).build()?; let req = NextEventRequest.into_req()?; let rsp = client.call(req).await.expect("Unable to send request"); + mock.assert_async().await; assert_eq!(rsp.status(), StatusCode::OK); - let header = "lambda-runtime-deadline-ms"; - assert_eq!(rsp.headers()[header], &HeaderValue::try_from("1542409706888")?); - - // shutdown server... - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } + assert_eq!( + rsp.headers()["lambda-runtime-aws-request-id"], + &HeaderValue::from_static(request_id) + ); + assert_eq!( + rsp.headers()["lambda-runtime-deadline-ms"], + &HeaderValue::from_static(deadline) + ); + + let body = rsp.into_body().collect().await?.to_bytes(); + assert_eq!("{}", std::str::from_utf8(&body)?); + Ok(()) } #[tokio::test] async fn test_ok_response() -> Result<(), Error> { - let (client, server) = io::duplex(64); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(DuplexStreamWrapper::new(server), rx) - .await - .expect("Unable to handle request"); + let server = MockServer::start(); + + let mock = server.mock(|when, then| { + when.method(POST) + .path("/2018-06-01/runtime/invocation/156cb537-e2d4-11e8-9b34-d36013741fb9/response") + .body("\"{}\""); + then.status(200).body(""); }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); + let base = server.base_url().parse().expect("Invalid mock server Uri"); + let client = Client::builder().with_endpoint(base).build()?; - let req = EventCompletionRequest::new("156cb537-e2d4-11e8-9b34-d36013741fb9", "done"); + let req = EventCompletionRequest::new("156cb537-e2d4-11e8-9b34-d36013741fb9", "{}"); let req = req.into_req()?; let rsp = client.call(req).await?; - assert_eq!(rsp.status(), StatusCode::ACCEPTED); - - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } + + mock.assert_async().await; + assert_eq!(rsp.status(), StatusCode::OK); + Ok(()) } #[tokio::test] async fn test_error_response() -> Result<(), Error> { - let (client, server) = io::duplex(200); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(DuplexStreamWrapper::new(server), rx) - .await - .expect("Unable to handle request"); + let diagnostic = Diagnostic { + error_type: "InvalidEventDataError", + error_message: "Error parsing event data", + }; + let body = serde_json::to_string(&diagnostic)?; + + let server = MockServer::start(); + let mock = server.mock(|when, then| { + when.method(POST) + .path("/2018-06-01/runtime/invocation/156cb537-e2d4-11e8-9b34-d36013741fb9/error") + .header("lambda-runtime-function-error-type", "unhandled") + .body(body); + then.status(200).body(""); }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); + let base = server.base_url().parse().expect("Invalid mock server Uri"); + let client = Client::builder().with_endpoint(base).build()?; let req = EventErrorRequest { request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", - diagnostic: Diagnostic { - error_type: "InvalidEventDataError", - error_message: "Error parsing event data", - }, + diagnostic, }; let req = req.into_req()?; let rsp = client.call(req).await?; - assert_eq!(rsp.status(), StatusCode::ACCEPTED); - - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } + + mock.assert_async().await; + assert_eq!(rsp.status(), StatusCode::OK); + Ok(()) } #[tokio::test] async fn successful_end_to_end_run() -> Result<(), Error> { - let (client, server) = io::duplex(64); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(DuplexStreamWrapper::new(server), rx) - .await - .expect("Unable to handle request"); + let server = MockServer::start(); + let request_id = "156cb537-e2d4-11e8-9b34-d36013741fb9"; + let deadline = "1542409706888"; + + let next_request = server.mock(|when, then| { + when.method(GET).path("/2018-06-01/runtime/invocation/next"); + then.status(200) + .header("content-type", "application/json") + .header("lambda-runtime-aws-request-id", request_id) + .header("lambda-runtime-deadline-ms", deadline) + .body("{}"); + }); + let next_response = server.mock(|when, then| { + when.method(POST) + .path(format!("/2018-06-01/runtime/invocation/{}/response", request_id)) + .body("{}"); + then.status(200).body(""); }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::builder() - .with_endpoint(base) - .with_connector(conn) - .build() - .expect("Unable to build client"); + let base = server.base_url().parse().expect("Invalid mock server Uri"); + let client = Client::builder().with_endpoint(base).build()?; async fn func(event: crate::LambdaEvent) -> Result { let (event, _) = event.into_parts(); @@ -510,7 +399,7 @@ mod endpoint_tests { // set env vars needed to init Config if they are not already set in the environment if env::var("AWS_LAMBDA_RUNTIME_API").is_err() { - env::set_var("AWS_LAMBDA_RUNTIME_API", "http://localhost:9001"); + env::set_var("AWS_LAMBDA_RUNTIME_API", server.base_url()); } if env::var("AWS_LAMBDA_FUNCTION_NAME").is_err() { env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test_fn"); @@ -537,35 +426,37 @@ mod endpoint_tests { let incoming = incoming(client).take(1); runtime.run(incoming, f).await?; - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } + next_request.assert_async().await; + next_response.assert_async().await; + Ok(()) } async fn run_panicking_handler(func: F) -> Result<(), Error> where F: FnMut(crate::LambdaEvent) -> BoxFuture<'static, Result>, { - let (client, server) = io::duplex(64); - let (_tx, rx) = oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(DuplexStreamWrapper::new(server), rx) - .await - .expect("Unable to handle request"); + let server = MockServer::start(); + let request_id = "156cb537-e2d4-11e8-9b34-d36013741fb9"; + let deadline = "1542409706888"; + + let next_request = server.mock(|when, then| { + when.method(GET).path("/2018-06-01/runtime/invocation/next"); + then.status(200) + .header("content-type", "application/json") + .header("lambda-runtime-aws-request-id", request_id) + .header("lambda-runtime-deadline-ms", deadline) + .body("{}"); + }); + + let next_response = server.mock(|when, then| { + when.method(POST) + .path(format!("/2018-06-01/runtime/invocation/{}/error", request_id)) + .header("lambda-runtime-function-error-type", "unhandled"); + then.status(200).body(""); }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::builder() - .with_endpoint(base) - .with_connector(conn) - .build() - .expect("Unable to build client"); + let base = server.base_url().parse().expect("Invalid mock server Uri"); + let client = Client::builder().with_endpoint(base).build()?; let f = crate::service_fn(func); @@ -582,11 +473,9 @@ mod endpoint_tests { let incoming = incoming(client).take(1); runtime.run(incoming, f).await?; - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } + next_request.assert_async().await; + next_response.assert_async().await; + Ok(()) } #[tokio::test] diff --git a/lambda-runtime/src/simulated.rs b/lambda-runtime/src/simulated.rs deleted file mode 100644 index 018664fe..00000000 --- a/lambda-runtime/src/simulated.rs +++ /dev/null @@ -1,133 +0,0 @@ -use http::Uri; -use hyper::rt::{Read, Write}; -use hyper_util::client::legacy::connect::{Connected, Connection}; -use pin_project_lite::pin_project; -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; -use tokio::io::DuplexStream; - -use crate::Error; - -#[derive(Clone)] -pub struct Connector { - inner: Arc>>, -} - -pin_project! { -pub struct DuplexStreamWrapper { - #[pin] - inner: DuplexStream, -} -} - -impl DuplexStreamWrapper { - pub(crate) fn new(inner: DuplexStream) -> DuplexStreamWrapper { - DuplexStreamWrapper { inner } - } -} - -impl Connector { - pub fn new() -> Self { - #[allow(clippy::mutable_key_type)] - let map = HashMap::new(); - Connector { - inner: Arc::new(Mutex::new(map)), - } - } - - pub fn insert(&self, uri: Uri, stream: DuplexStreamWrapper) -> Result<(), Error> { - match self.inner.lock() { - Ok(mut map) => { - map.insert(uri, stream); - Ok(()) - } - Err(_) => Err("mutex was poisoned".into()), - } - } - - pub fn with(uri: Uri, stream: DuplexStreamWrapper) -> Result { - let connector = Connector::new(); - match connector.insert(uri, stream) { - Ok(_) => Ok(connector), - Err(e) => Err(e), - } - } -} - -impl tower::Service for Connector { - type Response = DuplexStreamWrapper; - type Error = crate::Error; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn call(&mut self, uri: Uri) -> Self::Future { - let res = match self.inner.lock() { - Ok(mut map) if map.contains_key(&uri) => Ok(map.remove(&uri).unwrap()), - Ok(_) => Err(format!("Uri {uri} is not in map").into()), - Err(_) => Err("mutex was poisoned".into()), - }; - Box::pin(async move { res }) - } - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -impl Connection for DuplexStreamWrapper { - fn connected(&self) -> Connected { - Connected::new() - } -} - -impl Read for DuplexStreamWrapper { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: hyper::rt::ReadBufCursor<'_>, - ) -> Poll> { - let n = unsafe { - let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); - match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { - Poll::Ready(Ok(())) => tbuf.filled().len(), - other => return other, - } - }; - - unsafe { - buf.advance(n); - } - Poll::Ready(Ok(())) - } -} - -impl Write for DuplexStreamWrapper { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - tokio::io::AsyncWrite::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) - } -}