Skip to content

Commit

Permalink
Update: swap pin-project for pin-project-lite (#1503)
Browse files Browse the repository at this point in the history
* update: swap pin-project for pin-project-lite
* remove: pin attribute from `aws_smithy_http::body::Inner::Once` enum variant
* remove: unnecessary usage of pin-project
* Update CHANGELOG.next.toml

Co-authored-by: John DiSanti <[email protected]>
  • Loading branch information
Velfi and jdisanti authored Jun 28, 2022
1 parent 23ca395 commit 3f10816
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 220 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ references = ["aws-sdk-rust#560", "smithy-rs#1487"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "rcoh"

[[smithy-rs]]
message = """
Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this
change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that
were updated had only private fields/variants and so have the same public API. However, this change does affect the
public API of `aws_smithy_http_tower::map_request::MapRequestFuture<F, E>`. The `Inner` and `Ready` variants contained a
single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field
instead of the `0` field.
"""
references = ["smithy-rs#932"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "Velfi"

[[aws-sdk-rust]]
message = """
Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this
change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that
were updated had only private fields/variants and so have the same public API. However, this change does affect the
public API of `aws_smithy_http_tower::map_request::MapRequestFuture<F, E>`. The `Inner` and `Ready` variants contained a
single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field
instead of the `0` field.
"""
references = ["smithy-rs#932"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "Velfi"

[[aws-sdk-rust]]
message = "Add comments for docker settings needed when using this sdk"
references = ["aws-sdk-rust#540"]
Expand Down
13 changes: 4 additions & 9 deletions rust-runtime/aws-smithy-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ client-hyper = ["hyper"]
aws-smithy-async = { path = "../aws-smithy-async" }
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http-tower = { path = "../aws-smithy-http-tower" }
aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
aws-smithy-types = { path = "../aws-smithy-types" }
bytes = "1"
fastrand = "1.4.0"
Expand All @@ -28,23 +29,17 @@ hyper-rustls = { version = "0.22.1", optional = true, features = ["rustls-native
hyper-tls = { version = "0.5.0", optional = true }
lazy_static = { version = "1", optional = true }
pin-project-lite = "0.2.7"
# tokio but with no features enabled (traits only)
serde = { version = "1", features = ["derive"], optional = true }
tokio = { version = "1"}
tower = { version = "0.4.6", features = ["util", "retry"] }

pin-project = "1"
tracing = "0.1"

aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
serde = { version = "1", features = ["derive"], optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio"] }

tower-test = "0.4.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full", "test-util"] }
tower-test = "0.4.0"
tracing-test = "0.2.1"

[package.metadata.docs.rs]
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http-tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https://github.com/awslabs/smithy-rs"
[dependencies]
aws-smithy-http = { path = "../aws-smithy-http" }
tower = { version = "0.4.4" }
pin-project = "1"
pin-project-lite = "0.2.9"
http = "0.2.3"
bytes = "1"
http-body = "0.4.4"
Expand Down
25 changes: 16 additions & 9 deletions rust-runtime/aws-smithy-http-tower/src/map_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::SendOperationError;
use aws_smithy_http::middleware::{AsyncMapRequest, MapRequest};
use aws_smithy_http::operation;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -97,10 +97,15 @@ where
}
}

#[pin_project(project = EnumProj)]
pub enum MapRequestFuture<F, E> {
Inner(#[pin] F),
Ready(Option<E>),
pin_project! {
#[project = EnumProj]
pub enum MapRequestFuture<F, E> {
Inner {
#[pin]
inner: F
},
Ready { inner: Option<E> },
}
}

impl<O, F, E> Future for MapRequestFuture<F, E>
Expand All @@ -111,8 +116,8 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EnumProj::Ready(e) => Poll::Ready(Err(e.take().unwrap())),
EnumProj::Inner(f) => f.poll(cx),
EnumProj::Inner { inner: f } => f.poll(cx),
EnumProj::Ready { inner: e } => Poll::Ready(Err(e.take().unwrap())),
}
}
}
Expand Down Expand Up @@ -143,8 +148,10 @@ where
.apply(req)
.map_err(|e| SendOperationError::RequestConstructionError(e.into()))
{
Err(e) => MapRequestFuture::Ready(Some(e)),
Ok(req) => MapRequestFuture::Inner(self.inner.call(req)),
Err(e) => MapRequestFuture::Ready { inner: Some(e) },
Ok(req) => MapRequestFuture::Inner {
inner: self.inner.call(req),
},
}
}
}
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ http = "0.2.3"
http-body = "0.4.4"
once_cell = "1.10"
percent-encoding = "2.1.0"
pin-project = "1"
pin-project-lite = "0.2.9"
tracing = "0.1"

# We are using hyper for our streaming body implementation, but this is an internal detail.
Expand Down
137 changes: 81 additions & 56 deletions rust-runtime/aws-smithy-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::{Body, SizeHint};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::error::Error as StdError;
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
Expand All @@ -18,26 +18,27 @@ use crate::header::append_merge_header_maps;

pub type Error = Box<dyn StdError + Send + Sync>;

/// SdkBody type
///
/// This is the Body used for dispatching all HTTP Requests.
/// For handling responses, the type of the body will be controlled
/// by the HTTP stack.
///
/// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
/// between hyper::Body and our Body
#[pin_project]
pub struct SdkBody {
#[pin]
inner: Inner,
/// An optional function to recreate the inner body
pin_project! {
/// SdkBody type
///
/// In the event of retry, this function will be called to generate a new body. See
/// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
/// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
/// This is the Body used for dispatching all HTTP Requests.
/// For handling responses, the type of the body will be controlled
/// by the HTTP stack.
///
/// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
/// between hyper::Body and our Body
pub struct SdkBody {
#[pin]
inner: Inner,
// An optional function to recreate the inner body
//
// In the event of retry, this function will be called to generate a new body. See
// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
}
}

impl Debug for SdkBody {
Expand All @@ -49,27 +50,39 @@ impl Debug for SdkBody {
}
}

type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;

#[pin_project(project = InnerProj)]
enum Inner {
Once(#[pin] Option<Bytes>),
Streaming(#[pin] hyper::Body),
Dyn(#[pin] BoxBody),

/// When a streaming body is transferred out to a stream parser, the body is replaced with
/// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
/// Body is a bug.
Taken,
pub type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;

pin_project! {
#[project = InnerProj]
enum Inner {
Once {
inner: Option<Bytes>
},
Streaming {
#[pin]
inner: hyper::Body
},
Dyn {
#[pin]
inner: BoxBody
},

/// When a streaming body is transferred out to a stream parser, the body is replaced with
/// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
/// Body is a bug.
Taken,
}
}

impl Debug for Inner {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self {
Inner::Once(once) => f.debug_tuple("Once").field(once).finish(),
Inner::Streaming(streaming) => f.debug_tuple("Streaming").field(streaming).finish(),
Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
Inner::Streaming { inner: streaming } => {
f.debug_tuple("Streaming").field(streaming).finish()
}
Inner::Taken => f.debug_tuple("Taken").finish(),
Inner::Dyn(_) => write!(f, "BoxBody"),
Inner::Dyn { .. } => write!(f, "BoxBody"),
}
}
}
Expand All @@ -78,7 +91,7 @@ impl SdkBody {
/// Construct an SdkBody from a Boxed implementation of http::Body
pub fn from_dyn(body: BoxBody) -> Self {
Self {
inner: Inner::Dyn(body),
inner: Inner::Dyn { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
Expand Down Expand Up @@ -111,8 +124,8 @@ impl SdkBody {

pub fn empty() -> Self {
Self {
inner: Inner::Once(None),
rebuild: Some(Arc::new(|| Inner::Once(None))),
inner: Inner::Once { inner: None },
rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
callbacks: Vec::new(),
}
}
Expand All @@ -123,16 +136,16 @@ impl SdkBody {
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
let polling_result = match this.inner.project() {
InnerProj::Once(ref mut opt) => {
let data = opt.take();
InnerProj::Once { ref mut inner } => {
let data = inner.take();
match data {
Some(bytes) if bytes.is_empty() => Poll::Ready(None),
Some(bytes) => Poll::Ready(Some(Ok(bytes))),
None => Poll::Ready(None),
}
}
InnerProj::Streaming(body) => body.poll_data(cx).map_err(|e| e.into()),
InnerProj::Dyn(box_body) => box_body.poll_data(cx),
InnerProj::Streaming { inner: body } => body.poll_data(cx).map_err(|e| e.into()),
InnerProj::Dyn { inner: box_body } => box_body.poll_data(cx),
InnerProj::Taken => {
Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
}
Expand Down Expand Up @@ -167,8 +180,8 @@ impl SdkBody {
/// If this SdkBody is streaming, this will return `None`
pub fn bytes(&self) -> Option<&[u8]> {
match &self.inner {
Inner::Once(Some(b)) => Some(b),
Inner::Once(None) => Some(&[]),
Inner::Once { inner: Some(b) } => Some(b),
Inner::Once { inner: None } => Some(&[]),
_ => None,
}
}
Expand All @@ -187,13 +200,21 @@ impl SdkBody {
}

pub fn content_length(&self) -> Option<u64> {
self.size_hint().exact()
http_body::Body::size_hint(self).exact()
}

pub fn with_callback(&mut self, callback: Box<dyn BodyCallback>) -> &mut Self {
self.callbacks.push(callback);
self
}

pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
if self.rebuild.is_some() {
SdkBody::retryable(move || f(self.try_clone().unwrap()))
} else {
f(self)
}
}
}

impl From<&str> for SdkBody {
Expand All @@ -205,8 +226,12 @@ impl From<&str> for SdkBody {
impl From<Bytes> for SdkBody {
fn from(bytes: Bytes) -> Self {
SdkBody {
inner: Inner::Once(Some(bytes.clone())),
rebuild: Some(Arc::new(move || Inner::Once(Some(bytes.clone())))),
inner: Inner::Once {
inner: Some(bytes.clone()),
},
rebuild: Some(Arc::new(move || Inner::Once {
inner: Some(bytes.clone()),
})),
callbacks: Vec::new(),
}
}
Expand All @@ -215,7 +240,7 @@ impl From<Bytes> for SdkBody {
impl From<hyper::Body> for SdkBody {
fn from(body: hyper::Body) -> Self {
SdkBody {
inner: Inner::Streaming(body),
inner: Inner::Streaming { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
Expand Down Expand Up @@ -283,20 +308,20 @@ impl http_body::Body for SdkBody {

fn is_end_stream(&self) -> bool {
match &self.inner {
Inner::Once(None) => true,
Inner::Once(Some(bytes)) => bytes.is_empty(),
Inner::Streaming(hyper_body) => hyper_body.is_end_stream(),
Inner::Dyn(box_body) => box_body.is_end_stream(),
Inner::Once { inner: None } => true,
Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(),
Inner::Dyn { inner: box_body } => box_body.is_end_stream(),
Inner::Taken => true,
}
}

fn size_hint(&self) -> SizeHint {
match &self.inner {
Inner::Once(None) => SizeHint::with_exact(0),
Inner::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming(hyper_body) => hyper_body.size_hint(),
Inner::Dyn(box_body) => box_body.size_hint(),
Inner::Once { inner: None } => SizeHint::with_exact(0),
Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(),
Inner::Dyn { inner: box_body } => box_body.size_hint(),
Inner::Taken => SizeHint::new(),
}
}
Expand Down
Loading

0 comments on commit 3f10816

Please sign in to comment.