diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 780f18a25e..3eb05932a7 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -22,6 +22,32 @@ references = ["aws-sdk-rust#560", "smithy-rs#1487"] meta = { "breaking" = false, "tada" = false, "bug" = true } author = "rcoh" +[[smithy-rs]] +message = """ +Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this +change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that +were updated had only private fields/variants and so have the same public API. However, this change does affect the +public API of `aws_smithy_http_tower::map_request::MapRequestFuture`. The `Inner` and `Ready` variants contained a +single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field +instead of the `0` field. +""" +references = ["smithy-rs#932"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "Velfi" + +[[aws-sdk-rust]] +message = """ +Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this +change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that +were updated had only private fields/variants and so have the same public API. However, this change does affect the +public API of `aws_smithy_http_tower::map_request::MapRequestFuture`. The `Inner` and `Ready` variants contained a +single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field +instead of the `0` field. +""" +references = ["smithy-rs#932"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "Velfi" + [[aws-sdk-rust]] message = "Add comments for docker settings needed when using this sdk" references = ["aws-sdk-rust#540"] diff --git a/rust-runtime/aws-smithy-client/Cargo.toml b/rust-runtime/aws-smithy-client/Cargo.toml index 9dda44e9b0..7b035ce78c 100644 --- a/rust-runtime/aws-smithy-client/Cargo.toml +++ b/rust-runtime/aws-smithy-client/Cargo.toml @@ -18,6 +18,7 @@ client-hyper = ["hyper"] aws-smithy-async = { path = "../aws-smithy-async" } aws-smithy-http = { path = "../aws-smithy-http" } aws-smithy-http-tower = { path = "../aws-smithy-http-tower" } +aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true } aws-smithy-types = { path = "../aws-smithy-types" } bytes = "1" fastrand = "1.4.0" @@ -28,23 +29,17 @@ hyper-rustls = { version = "0.22.1", optional = true, features = ["rustls-native hyper-tls = { version = "0.5.0", optional = true } lazy_static = { version = "1", optional = true } pin-project-lite = "0.2.7" -# tokio but with no features enabled (traits only) +serde = { version = "1", features = ["derive"], optional = true } tokio = { version = "1"} tower = { version = "0.4.6", features = ["util", "retry"] } - -pin-project = "1" tracing = "0.1" -aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true } -serde = { version = "1", features = ["derive"], optional = true } - [dev-dependencies] -tokio = { version = "1", features = ["full", "test-util"] } aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio"] } - -tower-test = "0.4.0" serde = { version = "1", features = ["derive"] } serde_json = "1" +tokio = { version = "1", features = ["full", "test-util"] } +tower-test = "0.4.0" tracing-test = "0.2.1" [package.metadata.docs.rs] diff --git a/rust-runtime/aws-smithy-http-tower/Cargo.toml b/rust-runtime/aws-smithy-http-tower/Cargo.toml index 370b92b07e..4c7dcf357f 100644 --- a/rust-runtime/aws-smithy-http-tower/Cargo.toml +++ b/rust-runtime/aws-smithy-http-tower/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/awslabs/smithy-rs" [dependencies] aws-smithy-http = { path = "../aws-smithy-http" } tower = { version = "0.4.4" } -pin-project = "1" +pin-project-lite = "0.2.9" http = "0.2.3" bytes = "1" http-body = "0.4.4" diff --git a/rust-runtime/aws-smithy-http-tower/src/map_request.rs b/rust-runtime/aws-smithy-http-tower/src/map_request.rs index 0ab2f1a2cb..87ad0e6a09 100644 --- a/rust-runtime/aws-smithy-http-tower/src/map_request.rs +++ b/rust-runtime/aws-smithy-http-tower/src/map_request.rs @@ -6,7 +6,7 @@ use crate::SendOperationError; use aws_smithy_http::middleware::{AsyncMapRequest, MapRequest}; use aws_smithy_http::operation; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -97,10 +97,15 @@ where } } -#[pin_project(project = EnumProj)] -pub enum MapRequestFuture { - Inner(#[pin] F), - Ready(Option), +pin_project! { + #[project = EnumProj] + pub enum MapRequestFuture { + Inner { + #[pin] + inner: F + }, + Ready { inner: Option }, + } } impl Future for MapRequestFuture @@ -111,8 +116,8 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { - EnumProj::Ready(e) => Poll::Ready(Err(e.take().unwrap())), - EnumProj::Inner(f) => f.poll(cx), + EnumProj::Inner { inner: f } => f.poll(cx), + EnumProj::Ready { inner: e } => Poll::Ready(Err(e.take().unwrap())), } } } @@ -143,8 +148,10 @@ where .apply(req) .map_err(|e| SendOperationError::RequestConstructionError(e.into())) { - Err(e) => MapRequestFuture::Ready(Some(e)), - Ok(req) => MapRequestFuture::Inner(self.inner.call(req)), + Err(e) => MapRequestFuture::Ready { inner: Some(e) }, + Ok(req) => MapRequestFuture::Inner { + inner: self.inner.call(req), + }, } } } diff --git a/rust-runtime/aws-smithy-http/Cargo.toml b/rust-runtime/aws-smithy-http/Cargo.toml index fe5334c0ab..d5b01cc5a4 100644 --- a/rust-runtime/aws-smithy-http/Cargo.toml +++ b/rust-runtime/aws-smithy-http/Cargo.toml @@ -23,7 +23,7 @@ http = "0.2.3" http-body = "0.4.4" once_cell = "1.10" percent-encoding = "2.1.0" -pin-project = "1" +pin-project-lite = "0.2.9" tracing = "0.1" # We are using hyper for our streaming body implementation, but this is an internal detail. diff --git a/rust-runtime/aws-smithy-http/src/body.rs b/rust-runtime/aws-smithy-http/src/body.rs index dda1af5228..e4e1b8aae0 100644 --- a/rust-runtime/aws-smithy-http/src/body.rs +++ b/rust-runtime/aws-smithy-http/src/body.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use http::{HeaderMap, HeaderValue}; use http_body::{Body, SizeHint}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::error::Error as StdError; use std::fmt::{self, Debug, Formatter}; use std::pin::Pin; @@ -18,26 +18,27 @@ use crate::header::append_merge_header_maps; pub type Error = Box; -/// SdkBody type -/// -/// This is the Body used for dispatching all HTTP Requests. -/// For handling responses, the type of the body will be controlled -/// by the HTTP stack. -/// -/// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches -/// between hyper::Body and our Body -#[pin_project] -pub struct SdkBody { - #[pin] - inner: Inner, - /// An optional function to recreate the inner body +pin_project! { + /// SdkBody type /// - /// In the event of retry, this function will be called to generate a new body. See - /// [`try_clone()`](SdkBody::try_clone) - rebuild: Option Inner) + Send + Sync>>, - /// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle - #[pin] - callbacks: Vec>, + /// This is the Body used for dispatching all HTTP Requests. + /// For handling responses, the type of the body will be controlled + /// by the HTTP stack. + /// + /// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches + /// between hyper::Body and our Body + pub struct SdkBody { + #[pin] + inner: Inner, + // An optional function to recreate the inner body + // + // In the event of retry, this function will be called to generate a new body. See + // [`try_clone()`](SdkBody::try_clone) + rebuild: Option Inner) + Send + Sync>>, + // A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle + #[pin] + callbacks: Vec>, + } } impl Debug for SdkBody { @@ -49,27 +50,39 @@ impl Debug for SdkBody { } } -type BoxBody = http_body::combinators::BoxBody; - -#[pin_project(project = InnerProj)] -enum Inner { - Once(#[pin] Option), - Streaming(#[pin] hyper::Body), - Dyn(#[pin] BoxBody), - - /// When a streaming body is transferred out to a stream parser, the body is replaced with - /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken` - /// Body is a bug. - Taken, +pub type BoxBody = http_body::combinators::BoxBody; + +pin_project! { + #[project = InnerProj] + enum Inner { + Once { + inner: Option + }, + Streaming { + #[pin] + inner: hyper::Body + }, + Dyn { + #[pin] + inner: BoxBody + }, + + /// When a streaming body is transferred out to a stream parser, the body is replaced with + /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken` + /// Body is a bug. + Taken, + } } impl Debug for Inner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match &self { - Inner::Once(once) => f.debug_tuple("Once").field(once).finish(), - Inner::Streaming(streaming) => f.debug_tuple("Streaming").field(streaming).finish(), + Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(), + Inner::Streaming { inner: streaming } => { + f.debug_tuple("Streaming").field(streaming).finish() + } Inner::Taken => f.debug_tuple("Taken").finish(), - Inner::Dyn(_) => write!(f, "BoxBody"), + Inner::Dyn { .. } => write!(f, "BoxBody"), } } } @@ -78,7 +91,7 @@ impl SdkBody { /// Construct an SdkBody from a Boxed implementation of http::Body pub fn from_dyn(body: BoxBody) -> Self { Self { - inner: Inner::Dyn(body), + inner: Inner::Dyn { inner: body }, rebuild: None, callbacks: Vec::new(), } @@ -111,8 +124,8 @@ impl SdkBody { pub fn empty() -> Self { Self { - inner: Inner::Once(None), - rebuild: Some(Arc::new(|| Inner::Once(None))), + inner: Inner::Once { inner: None }, + rebuild: Some(Arc::new(|| Inner::Once { inner: None })), callbacks: Vec::new(), } } @@ -123,16 +136,16 @@ impl SdkBody { ) -> Poll>> { let mut this = self.project(); let polling_result = match this.inner.project() { - InnerProj::Once(ref mut opt) => { - let data = opt.take(); + InnerProj::Once { ref mut inner } => { + let data = inner.take(); match data { Some(bytes) if bytes.is_empty() => Poll::Ready(None), Some(bytes) => Poll::Ready(Some(Ok(bytes))), None => Poll::Ready(None), } } - InnerProj::Streaming(body) => body.poll_data(cx).map_err(|e| e.into()), - InnerProj::Dyn(box_body) => box_body.poll_data(cx), + InnerProj::Streaming { inner: body } => body.poll_data(cx).map_err(|e| e.into()), + InnerProj::Dyn { inner: box_body } => box_body.poll_data(cx), InnerProj::Taken => { Poll::Ready(Some(Err("A `Taken` body should never be polled".into()))) } @@ -167,8 +180,8 @@ impl SdkBody { /// If this SdkBody is streaming, this will return `None` pub fn bytes(&self) -> Option<&[u8]> { match &self.inner { - Inner::Once(Some(b)) => Some(b), - Inner::Once(None) => Some(&[]), + Inner::Once { inner: Some(b) } => Some(b), + Inner::Once { inner: None } => Some(&[]), _ => None, } } @@ -187,13 +200,21 @@ impl SdkBody { } pub fn content_length(&self) -> Option { - self.size_hint().exact() + http_body::Body::size_hint(self).exact() } pub fn with_callback(&mut self, callback: Box) -> &mut Self { self.callbacks.push(callback); self } + + pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody { + if self.rebuild.is_some() { + SdkBody::retryable(move || f(self.try_clone().unwrap())) + } else { + f(self) + } + } } impl From<&str> for SdkBody { @@ -205,8 +226,12 @@ impl From<&str> for SdkBody { impl From for SdkBody { fn from(bytes: Bytes) -> Self { SdkBody { - inner: Inner::Once(Some(bytes.clone())), - rebuild: Some(Arc::new(move || Inner::Once(Some(bytes.clone())))), + inner: Inner::Once { + inner: Some(bytes.clone()), + }, + rebuild: Some(Arc::new(move || Inner::Once { + inner: Some(bytes.clone()), + })), callbacks: Vec::new(), } } @@ -215,7 +240,7 @@ impl From for SdkBody { impl From for SdkBody { fn from(body: hyper::Body) -> Self { SdkBody { - inner: Inner::Streaming(body), + inner: Inner::Streaming { inner: body }, rebuild: None, callbacks: Vec::new(), } @@ -283,20 +308,20 @@ impl http_body::Body for SdkBody { fn is_end_stream(&self) -> bool { match &self.inner { - Inner::Once(None) => true, - Inner::Once(Some(bytes)) => bytes.is_empty(), - Inner::Streaming(hyper_body) => hyper_body.is_end_stream(), - Inner::Dyn(box_body) => box_body.is_end_stream(), + Inner::Once { inner: None } => true, + Inner::Once { inner: Some(bytes) } => bytes.is_empty(), + Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(), + Inner::Dyn { inner: box_body } => box_body.is_end_stream(), Inner::Taken => true, } } fn size_hint(&self) -> SizeHint { match &self.inner { - Inner::Once(None) => SizeHint::with_exact(0), - Inner::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64), - Inner::Streaming(hyper_body) => hyper_body.size_hint(), - Inner::Dyn(box_body) => box_body.size_hint(), + Inner::Once { inner: None } => SizeHint::with_exact(0), + Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64), + Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(), + Inner::Dyn { inner: box_body } => box_body.size_hint(), Inner::Taken => SizeHint::new(), } } diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs index e9b3c0ccc9..23a7e66c28 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs @@ -126,7 +126,7 @@ use bytes::Buf; use bytes::Bytes; use bytes_utils::SegmentedBuf; use http_body::Body; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::error::Error as StdError; use std::fmt::{Debug, Formatter}; use std::io::IoSlice; @@ -141,128 +141,136 @@ pub use bytestream_util::Length; #[cfg(feature = "rt-tokio")] pub use self::bytestream_util::FsBuilder; -/// Stream of binary data -/// -/// `ByteStream` wraps a stream of binary data for ease of use. -/// -/// ## Getting data out of a `ByteStream` -/// -/// `ByteStream` provides two primary mechanisms for accessing the data: -/// 1. With `.collect()`: -/// -/// [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`, -/// a non-contiguous ByteBuffer. -/// ```no_run -/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes}; -/// use aws_smithy_http::body::SdkBody; -/// use bytes::Buf; -/// async fn example() { -/// let stream = ByteStream::new(SdkBody::from("hello! This is some data")); -/// // Load data from the stream into memory: -/// let data = stream.collect().await.expect("error reading data"); -/// // collect returns a `bytes::Buf`: -/// println!("first chunk: {:?}", data.chunk()); -/// } -/// ``` -/// 2. Via [`impl Stream`](futures_core::Stream): -/// -/// _Note: An import of `StreamExt` is required to use `.try_next()`._ -/// -/// For use-cases where holding the entire ByteStream in memory is unnecessary, use the -/// `Stream` implementation: -/// ```no_run -/// # mod crc32 { -/// # pub struct Digest { } -/// # impl Digest { -/// # pub fn new() -> Self { Digest {} } -/// # pub fn write(&mut self, b: &[u8]) { } -/// # pub fn finish(&self) -> u64 { 6 } -/// # } -/// # } -/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, Error}; -/// use aws_smithy_http::body::SdkBody; -/// use tokio_stream::StreamExt; -/// -/// async fn example() -> Result<(), Error> { -/// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]); -/// let mut digest = crc32::Digest::new(); -/// while let Some(bytes) = stream.try_next().await? { -/// digest.write(&bytes); -/// } -/// println!("digest: {}", digest.finish()); -/// Ok(()) -/// } -/// ``` -/// -/// 3. Via [`.into_async_read()`](crate::byte_stream::ByteStream::into_async_read): -/// -/// _Note: The `rt-tokio` feature must be active to use `.into_async_read()`._ -/// -/// It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead). -/// Then, you can use pre-existing tools like [`tokio::io::BufReader`](tokio::io::BufReader): -/// ```no_run -/// use aws_smithy_http::byte_stream::ByteStream; -/// use aws_smithy_http::body::SdkBody; -/// use tokio::io::{AsyncBufReadExt, BufReader}; -/// #[cfg(feature = "rt-tokio")] -/// async fn example() -> std::io::Result<()> { -/// let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data")); -/// // Wrap the stream in a BufReader -/// let buf_reader = BufReader::new(stream.into_async_read()); -/// let mut lines = buf_reader.lines(); -/// assert_eq!(lines.next_line().await?, Some("hello!".to_owned())); -/// assert_eq!(lines.next_line().await?, Some("This is some data".to_owned())); -/// assert_eq!(lines.next_line().await?, None); -/// Ok(()) -/// } -/// ``` -/// -/// ## Getting data into a ByteStream -/// ByteStreams can be created in one of three ways: -/// 1. **From in-memory binary data**: ByteStreams created from in-memory data are always retryable. Data -/// will be converted into `Bytes` enabling a cheap clone during retries. -/// ```no_run -/// use bytes::Bytes; -/// use aws_smithy_http::byte_stream::ByteStream; -/// let stream = ByteStream::from(vec![1,2,3]); -/// let stream = ByteStream::from(Bytes::from_static(b"hello!")); -/// ``` -/// -/// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs. -/// ```no_run -/// #[cfg(feature = "tokio-rt")] -/// # { -/// use aws_smithy_http::byte_stream::ByteStream; -/// let stream = ByteStream::from_path("big_file.csv"); -/// # } -/// ``` -/// -/// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly -/// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable -/// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable). -/// ```no_run -/// use aws_smithy_http::byte_stream::ByteStream; -/// use aws_smithy_http::body::SdkBody; -/// use bytes::Bytes; -/// let (mut tx, channel_body) = hyper::Body::channel(); -/// // this will not be retryable because the SDK has no way to replay this stream -/// let stream = ByteStream::new(SdkBody::from(channel_body)); -/// tx.send_data(Bytes::from_static(b"hello world!")); -/// tx.send_data(Bytes::from_static(b"hello again!")); -/// // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent -/// ``` -/// -#[pin_project] -#[derive(Debug)] -pub struct ByteStream(#[pin] Inner); +pin_project! { + /// Stream of binary data + /// + /// `ByteStream` wraps a stream of binary data for ease of use. + /// + /// ## Getting data out of a `ByteStream` + /// + /// `ByteStream` provides two primary mechanisms for accessing the data: + /// 1. With `.collect()`: + /// + /// [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`, + /// a non-contiguous ByteBuffer. + /// ```no_run + /// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes}; + /// use aws_smithy_http::body::SdkBody; + /// use bytes::Buf; + /// async fn example() { + /// let stream = ByteStream::new(SdkBody::from("hello! This is some data")); + /// // Load data from the stream into memory: + /// let data = stream.collect().await.expect("error reading data"); + /// // collect returns a `bytes::Buf`: + /// println!("first chunk: {:?}", data.chunk()); + /// } + /// ``` + /// 2. Via [`impl Stream`](futures_core::Stream): + /// + /// _Note: An import of `StreamExt` is required to use `.try_next()`._ + /// + /// For use-cases where holding the entire ByteStream in memory is unnecessary, use the + /// `Stream` implementation: + /// ```no_run + /// # mod crc32 { + /// # pub struct Digest { } + /// # impl Digest { + /// # pub fn new() -> Self { Digest {} } + /// # pub fn write(&mut self, b: &[u8]) { } + /// # pub fn finish(&self) -> u64 { 6 } + /// # } + /// # } + /// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, Error}; + /// use aws_smithy_http::body::SdkBody; + /// use tokio_stream::StreamExt; + /// + /// async fn example() -> Result<(), Error> { + /// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]); + /// let mut digest = crc32::Digest::new(); + /// while let Some(bytes) = stream.try_next().await? { + /// digest.write(&bytes); + /// } + /// println!("digest: {}", digest.finish()); + /// Ok(()) + /// } + /// ``` + /// + /// 3. Via [`.into_async_read()`](crate::byte_stream::ByteStream::into_async_read): + /// + /// _Note: The `rt-tokio` feature must be active to use `.into_async_read()`._ + /// + /// It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead). + /// Then, you can use pre-existing tools like [`tokio::io::BufReader`](tokio::io::BufReader): + /// ```no_run + /// use aws_smithy_http::byte_stream::ByteStream; + /// use aws_smithy_http::body::SdkBody; + /// use tokio::io::{AsyncBufReadExt, BufReader}; + /// #[cfg(feature = "rt-tokio")] + /// async fn example() -> std::io::Result<()> { + /// let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data")); + /// // Wrap the stream in a BufReader + /// let buf_reader = BufReader::new(stream.into_async_read()); + /// let mut lines = buf_reader.lines(); + /// assert_eq!(lines.next_line().await?, Some("hello!".to_owned())); + /// assert_eq!(lines.next_line().await?, Some("This is some data".to_owned())); + /// assert_eq!(lines.next_line().await?, None); + /// Ok(()) + /// } + /// ``` + /// + /// ## Getting data into a ByteStream + /// ByteStreams can be created in one of three ways: + /// 1. **From in-memory binary data**: ByteStreams created from in-memory data are always retryable. Data + /// will be converted into `Bytes` enabling a cheap clone during retries. + /// ```no_run + /// use bytes::Bytes; + /// use aws_smithy_http::byte_stream::ByteStream; + /// let stream = ByteStream::from(vec![1,2,3]); + /// let stream = ByteStream::from(Bytes::from_static(b"hello!")); + /// ``` + /// + /// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs. + /// ```no_run + /// #[cfg(feature = "tokio-rt")] + /// # { + /// use aws_smithy_http::byte_stream::ByteStream; + /// let stream = ByteStream::from_path("big_file.csv"); + /// # } + /// ``` + /// + /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly + /// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable + /// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable). + /// ```no_run + /// use aws_smithy_http::byte_stream::ByteStream; + /// use aws_smithy_http::body::SdkBody; + /// use bytes::Bytes; + /// let (mut tx, channel_body) = hyper::Body::channel(); + /// // this will not be retryable because the SDK has no way to replay this stream + /// let stream = ByteStream::new(SdkBody::from(channel_body)); + /// tx.send_data(Bytes::from_static(b"hello world!")); + /// tx.send_data(Bytes::from_static(b"hello again!")); + /// // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent + /// ``` + /// + #[derive(Debug)] + pub struct ByteStream { + #[pin] + inner: Inner + } +} impl ByteStream { pub fn new(body: SdkBody) -> Self { - Self(Inner::new(body)) + Self { + inner: Inner::new(body), + } } pub fn from_static(bytes: &'static [u8]) -> Self { - Self(Inner::new(SdkBody::from(Bytes::from_static(bytes)))) + Self { + inner: Inner::new(SdkBody::from(Bytes::from_static(bytes))), + } } /// Consumes the ByteStream, returning the wrapped SdkBody @@ -270,7 +278,7 @@ impl ByteStream { // we will always be able to implement this method, even if we stop using // SdkBody as the internal representation pub fn into_inner(self) -> SdkBody { - self.0.body + self.inner.body } /// Read all the data from this `ByteStream` into memory @@ -290,7 +298,7 @@ impl ByteStream { /// } /// ``` pub async fn collect(self) -> Result { - self.0.collect().await.map_err(|err| Error(err)) + self.inner.collect().await.map_err(|err| Error(err)) } /// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with @@ -368,7 +376,7 @@ impl ByteStream { /// throughout this `ByteStream`'s life cycle. See the [`BodyCallback`](BodyCallback) trait for /// more information. pub fn with_body_callback(&mut self, body_callback: Box) -> &mut Self { - self.0.with_body_callback(body_callback); + self.inner.with_body_callback(body_callback); self } @@ -392,13 +400,19 @@ impl ByteStream { pub fn into_async_read(self) -> impl tokio::io::AsyncRead { tokio_util::io::StreamReader::new(self) } + + pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Send + Sync + 'static) -> ByteStream { + ByteStream::new(self.into_inner().map(f)) + } } impl Default for ByteStream { fn default() -> Self { - Self(Inner { - body: SdkBody::from(""), - }) + Self { + inner: Inner { + body: SdkBody::from(""), + }, + } } } @@ -458,11 +472,11 @@ impl futures_core::stream::Stream for ByteStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_next(cx).map_err(|e| Error(e)) + self.project().inner.poll_next(cx).map_err(|e| Error(e)) } fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + self.inner.size_hint() } } @@ -513,11 +527,12 @@ impl Buf for AggregatedBytes { } } -#[pin_project] -#[derive(Debug, Clone, PartialEq, Eq)] -struct Inner { - #[pin] - body: B, +pin_project! { + #[derive(Debug, Clone, PartialEq, Eq)] + struct Inner { + #[pin] + body: B, + } } impl Inner { diff --git a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs index 23036030be..00b445c571 100644 --- a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs @@ -358,8 +358,8 @@ mod test { assert_eq!(some_data.len(), 16384); assert_eq!( - ByteStream::new(body1).collect().await.unwrap().remaining(), - file_length as usize - some_data.len() + ByteStream::new(body1).collect().await.unwrap().remaining() as u64, + file_length - some_data.len() as u64 ); } @@ -577,7 +577,7 @@ mod test { let file_size = file.as_file().metadata().unwrap().len(); // Check that our in-memory copy has the same size as the file - assert_eq!(file_size as usize, in_memory_copy_of_file_contents.len()); + assert_eq!(file_size, in_memory_copy_of_file_contents.len() as u64); let file_path = file.path().to_path_buf(); let chunks = 7; let chunk_size = file_size / chunks; diff --git a/rust-runtime/aws-smithy-http/src/event_stream/input.rs b/rust-runtime/aws-smithy-http/src/event_stream/input.rs index 7107146c8d..afbc320cdd 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/input.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/input.rs @@ -8,7 +8,6 @@ use crate::result::SdkError; use aws_smithy_eventstream::frame::{MarshallMessage, SignMessage}; use bytes::Bytes; use futures_core::Stream; -use pin_project::pin_project; use std::error::Error as StdError; use std::fmt; use std::marker::PhantomData; @@ -53,16 +52,16 @@ where /// /// This will yield an `Err(SdkError::ConstructionFailure)` if a message can't be /// marshalled into an Event Stream frame, (e.g., if the message payload was too large). -#[pin_project] pub struct MessageStreamAdapter { marshaller: Box + Send + Sync>, signer: Box, - #[pin] stream: Pin> + Send>>, end_signal_sent: bool, _phantom: PhantomData, } +impl Unpin for MessageStreamAdapter {} + impl MessageStreamAdapter where E: StdError + Send + Sync + 'static, @@ -88,18 +87,17 @@ where { type Item = Result>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - match this.stream.poll_next(cx) { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.stream.as_mut().poll_next(cx) { Poll::Ready(message_option) => { if let Some(message_result) = message_option { let message_result = message_result.map_err(|err| SdkError::ConstructionFailure(err)); - let message = this + let message = self .marshaller .marshall(message_result?) .map_err(|err| SdkError::ConstructionFailure(Box::new(err)))?; - let message = this + let message = self .signer .sign(message) .map_err(|err| SdkError::ConstructionFailure(err))?; @@ -108,10 +106,10 @@ where .write_to(&mut buffer) .map_err(|err| SdkError::ConstructionFailure(Box::new(err)))?; Poll::Ready(Some(Ok(Bytes::from(buffer)))) - } else if !*this.end_signal_sent { - *this.end_signal_sent = true; + } else if !self.end_signal_sent { + self.end_signal_sent = true; let mut buffer = Vec::new(); - this.signer + self.signer .sign_empty() .map_err(|err| SdkError::ConstructionFailure(err))? .write_to(&mut buffer)