From 7dfd6097c3f0391cf51394d8d897d06ddc1eea70 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Wed, 13 Dec 2023 15:57:00 -0500 Subject: [PATCH] Add support for constructing sdk body types from http-body 1.0 (#3300) ## Motivation and Context - aws-sdk-rust#977 ## Description The first of several PRs to make add support for Hyper 1.0. This minimal change allows the use of Hyper 1.0 bodies although it does not actually leverage Hyper 1.0. ## Testing - Unit test suite - Ran cargo hack check --feature-powerset because we had a lot of features. I found a couple of issues. ## Checklist - [x] I have updated `CHANGELOG.next.toml` if I made changes to the smithy-rs codegen or runtime crates - [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS SDK, generated SDK code, or SDK runtime crates ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: John DiSanti --- CHANGELOG.next.toml | 12 + rust-runtime/aws-smithy-types/Cargo.toml | 4 + rust-runtime/aws-smithy-types/additional-ci | 3 + rust-runtime/aws-smithy-types/src/body.rs | 31 +- .../src/body/http_body_0_4_x.rs | 16 +- .../src/body/http_body_1_x.rs | 267 ++++++++++++++++++ .../aws-smithy-types/src/byte_stream.rs | 2 + .../src/byte_stream/bytestream_util.rs | 5 +- .../src/byte_stream/http_body_1_x.rs | 21 ++ 9 files changed, 347 insertions(+), 14 deletions(-) create mode 100644 rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs create mode 100644 rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 77a969b194..42e5b69a62 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -97,3 +97,15 @@ message = "[`Number`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/e references = ["smithy-rs#3294"] meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "all" } author = "rcoh" + +[[smithy-rs]] +message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" } +author = "rcoh" + +[[aws-sdk-rust]] +message = "Add support for constructing [`SdkBody`] and [`ByteStream`] from `http-body` 1.0 bodies. Note that this is initial support and works via a backwards compatibility shim to http-body 0.4. Hyper 1.0 is not supported." +references = ["smithy-rs#3300", "aws-sdk-rust#977"] +meta = { "breaking" = false, "tada" = true, "bug" = false } +author = "rcoh" diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index a46f9924fa..7118fb5dba 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -13,6 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs" [features] byte-stream-poll-next = [] http-body-0-4-x = ["dep:http-body-0-4"] +http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"] hyper-0-14-x = ["dep:hyper-0-14"] rt-tokio = [ "dep:http-body-0-4", @@ -32,7 +33,10 @@ base64-simd = "0.8" bytes = "1" bytes-utils = "0.1" http = "0.2.3" +http-1x = { package = "http", version = "1", optional = true } http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true } +http-body-1-0 = { package = "http-body", version = "1", optional = true } +http-body-util = { version = "0.1.0", optional = true } hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true } itoa = "1.0.0" num-integer = "0.1.44" diff --git a/rust-runtime/aws-smithy-types/additional-ci b/rust-runtime/aws-smithy-types/additional-ci index 2d9305d510..7ce1a378ac 100755 --- a/rust-runtime/aws-smithy-types/additional-ci +++ b/rust-runtime/aws-smithy-types/additional-ci @@ -12,3 +12,6 @@ cargo tree -d --edges normal --all-features echo "### Checking whether the features are properly feature-gated" ! cargo tree -e no-dev | grep serde + +echo "### Checking feature powerset" +cargo hack check --feature-powerset --exclude-all-features diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index 33e5912a93..9798524387 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -19,6 +19,8 @@ use std::task::{Context, Poll}; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; /// A generic, boxed error that's `Send` and `Sync` pub type Error = Box; @@ -55,7 +57,13 @@ impl Debug for SdkBody { /// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`]. enum BoxBody { - #[cfg(feature = "http-body-0-4-x")] + // This is enabled by the **dependency**, not the feature. This allows us to construct it + // whenever we have the dependency and keep the APIs private + #[cfg(any( + feature = "http-body-0-4-x", + feature = "http-body-1-x", + feature = "rt-tokio" + ))] HttpBody04(http_body_0_4::combinators::BoxBody), } @@ -162,6 +170,27 @@ impl SdkBody { } } + #[cfg(any( + feature = "http-body-0-4-x", + feature = "http-body-1-x", + feature = "rt-tokio" + ))] + pub(crate) fn from_body_0_4_internal(body: T) -> Self + where + T: http_body_0_4::Body + Send + Sync + 'static, + E: Into + 'static, + { + Self { + inner: Inner::Dyn { + inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( + body.map_err(Into::into), + )), + }, + rebuild: None, + bytes_contents: None, + } + } + #[cfg(feature = "http-body-0-4-x")] pub(crate) fn poll_next_trailers( self: Pin<&mut Self>, diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs index a6719d1fa2..d4eae4cf4c 100644 --- a/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs +++ b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -use crate::body::{BoxBody, Error, Inner, SdkBody}; -use bytes::Bytes; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::Bytes; + +use crate::body::{Error, SdkBody}; + impl SdkBody { /// Construct an `SdkBody` from a type that implements [`http_body_0_4::Body`](http_body_0_4::Body). /// @@ -17,15 +19,7 @@ impl SdkBody { T: http_body_0_4::Body + Send + Sync + 'static, E: Into + 'static, { - Self { - inner: Inner::Dyn { - inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( - body.map_err(Into::into), - )), - }, - rebuild: None, - bytes_contents: None, - } + SdkBody::from_body_0_4_internal(body) } } diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs new file mode 100644 index 0000000000..6b3278d987 --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs @@ -0,0 +1,267 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use std::pin::Pin; +use std::task::{ready, Context, Poll}; + +use bytes::Bytes; +use http_body_util::BodyExt; +use pin_project_lite::pin_project; + +use crate::body::{Error, SdkBody}; + +impl SdkBody { + /// Construct an `SdkBody` from a type that implements [`http_body_1_0::Body`](http_body_1_0::Body). + pub fn from_body_1_x(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into))) + } +} + +pin_project! { + struct Http1toHttp04 { + #[pin] + inner: B, + trailers: Option, + } +} + +impl Http1toHttp04 { + fn new(inner: B) -> Self { + Self { + inner, + trailers: None, + } + } +} + +impl http_body_0_4::Body for Http1toHttp04 +where + B: http_body_1_0::Body, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + loop { + let this = self.as_mut().project(); + match ready!(this.inner.poll_frame(cx)) { + Some(Ok(frame)) => { + let frame = match frame.into_data() { + Ok(data) => return Poll::Ready(Some(Ok(data))), + Err(frame) => frame, + }; + // when we get a trailers frame, store the trailers for the next poll + if let Ok(trailers) = frame.into_trailers() { + this.trailers.replace(trailers); + return Poll::Ready(None); + }; + // if the frame type was unknown, discard it. the next one might be something + // useful + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => return Poll::Ready(None), + } + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + // all of the polling happens in poll_data, once we get to the trailers we've actually + // already read everything + let this = self.project(); + match this.trailers.take() { + Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))), + None => Poll::Ready(Ok(None)), + } + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> http_body_0_4::SizeHint { + let mut size_hint = http_body_0_4::SizeHint::new(); + let inner_hint = self.inner.size_hint(); + if let Some(exact) = inner_hint.exact() { + size_hint.set_exact(exact); + } else { + size_hint.set_lower(inner_hint.lower()); + if let Some(upper) = inner_hint.upper() { + size_hint.set_upper(upper); + } + } + size_hint + } +} + +fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap { + let mut map = http::HeaderMap::with_capacity(input.capacity()); + let mut mem: Option = None; + for (k, v) in input.into_iter() { + let name = k.or_else(|| mem.clone()).unwrap(); + map.append( + http::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"), + http::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"), + ); + mem = Some(name); + } + map +} + +#[cfg(test)] +mod test { + use std::collections::VecDeque; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use bytes::Bytes; + use http::header::{CONTENT_LENGTH as CL0, CONTENT_TYPE as CT0}; + use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1}; + use http_1x::{HeaderMap, HeaderName, HeaderValue}; + use http_body_1_0::Frame; + + use crate::body::http_body_1_x::convert_header_map; + use crate::body::{Error, SdkBody}; + use crate::byte_stream::ByteStream; + + struct TestBody { + chunks: VecDeque, + } + + enum Chunk { + Data(&'static str), + Error(&'static str), + Trailers(HeaderMap), + } + + impl http_body_1_0::Body for TestBody { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let next = self.chunks.pop_front(); + let mk = |v: Frame| Poll::Ready(Some(Ok(v))); + + match next { + Some(Chunk::Data(s)) => mk(Frame::data(Bytes::from_static(s.as_bytes()))), + Some(Chunk::Trailers(headers)) => mk(Frame::trailers(headers)), + Some(Chunk::Error(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), + } + } + } + + fn trailers() -> HeaderMap { + let mut map = HeaderMap::new(); + map.insert( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value"), + ); + map.append( + HeaderName::from_static("x-test"), + HeaderValue::from_static("x-test-value-2"), + ); + map.append( + HeaderName::from_static("y-test"), + HeaderValue::from_static("y-test-value-2"), + ); + map + } + + #[tokio::test] + async fn test_body_with_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_1_x(body); + let data = ByteStream::new(body); + assert_eq!(data.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[tokio::test] + async fn test_read_trailers() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let mut body = SdkBody::from_body_1_x(body); + while let Some(_data) = http_body_0_4::Body::data(&mut body).await {} + assert_eq!( + http_body_0_4::Body::trailers(&mut body).await.unwrap(), + Some(convert_header_map(trailers())) + ); + } + + #[tokio::test] + async fn test_errors() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Error("errors!"), + ] + .into(), + }; + + let body = SdkBody::from_body_1_x(body); + let body = ByteStream::new(body); + body.collect().await.expect_err("body returned an error"); + } + #[tokio::test] + async fn test_no_trailers() { + let body = TestBody { + chunks: vec![Chunk::Data("123"), Chunk::Data("456"), Chunk::Data("789")].into(), + }; + + let body = SdkBody::from_body_1_x(body); + let body = ByteStream::new(body); + assert_eq!(body.collect().await.unwrap().to_vec(), b"123456789"); + } + + #[test] + fn test_convert_headers() { + let mut http1_headermap = http_1x::HeaderMap::new(); + http1_headermap.append(CT1, HeaderValue::from_static("a")); + http1_headermap.append(CT1, HeaderValue::from_static("b")); + http1_headermap.append(CT1, HeaderValue::from_static("c")); + + http1_headermap.insert(CL1, HeaderValue::from_static("1234")); + + let mut expect = http::HeaderMap::new(); + expect.append(CT0, http::HeaderValue::from_static("a")); + expect.append(CT0, http::HeaderValue::from_static("b")); + expect.append(CT0, http::HeaderValue::from_static("c")); + + expect.insert(CL0, http::HeaderValue::from_static("1234")); + + assert_eq!(convert_header_map(http1_headermap), expect); + } +} diff --git a/rust-runtime/aws-smithy-types/src/byte_stream.rs b/rust-runtime/aws-smithy-types/src/byte_stream.rs index 2721b1b6b2..3acb18b562 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream.rs @@ -148,6 +148,8 @@ pub use self::bytestream_util::FsBuilder; /// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. #[cfg(feature = "http-body-0-4-x")] pub mod http_body_0_4_x; +#[cfg(feature = "http-body-1-x")] +pub mod http_body_1_x; pin_project! { /// Stream of binary data diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs index 39965c90c5..2ed7341eeb 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs @@ -193,7 +193,7 @@ impl FsBuilder { let body_loader = move || { // If an offset was provided, seeking will be handled in `PathBody::poll_data` each // time the file is loaded. - SdkBody::from_body_0_4(PathBody::from_path( + SdkBody::from_body_0_4_internal(PathBody::from_path( path.clone(), length, buffer_size, @@ -208,7 +208,8 @@ impl FsBuilder { let _s = file.seek(io::SeekFrom::Start(offset)).await?; } - let body = SdkBody::from_body_0_4(PathBody::from_file(file, length, buffer_size)); + let body = + SdkBody::from_body_0_4_internal(PathBody::from_file(file, length, buffer_size)); Ok(ByteStream::new(body)) } else { diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs new file mode 100644 index 0000000000..bff8b201eb --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_1_x.rs @@ -0,0 +1,21 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Adapters to use http-body 1.0 bodies with SdkBody & ByteStream + +use crate::body::SdkBody; +use crate::byte_stream::ByteStream; +use bytes::Bytes; + +impl ByteStream { + /// Construct a `ByteStream` from a type that implements [`http_body_1_0::Body`](http_body_1_0::Body). + pub fn from_body_1_x(body: T) -> Self + where + T: http_body_1_0::Body + Send + Sync + 'static, + E: Into + 'static, + { + ByteStream::new(SdkBody::from_body_1_x(body)) + } +}