Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update: swap pin-project for pin-project-lite #1503

Merged
merged 8 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ references = ["aws-sdk-rust#560", "smithy-rs#1487"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "rcoh"

[[smithy-rs]]
message = """
Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this
change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that
were updated had only private fields/variants and so have the same public API. However, this change does affect the
public API of `aws_smithy_http_tower::map_request::MapRequestFuture<F, E>`. The `Inner` and `Ready` variants contained a
single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field
instead of the `0` field.
"""
references = ["smithy-rs#932"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "Velfi"

[[aws-sdk-rust]]
message = """
Replaced use of `pin-project` with equivalent `pin-project-lite`. For pinned enum tuple variants and tuple structs, this
change requires that we switch to using enum struct variants and regular structs. Most of the structs and enums that
were updated had only private fields/variants and so have the same public API. However, this change does affect the
public API of `aws_smithy_http_tower::map_request::MapRequestFuture<F, E>`. The `Inner` and `Ready` variants contained a
single value. Each have been converted to struct variants and the inner value is now accessible by the `inner` field
instead of the `0` field.
"""
references = ["smithy-rs#932"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "Velfi"

[[aws-sdk-rust]]
message = "Add comments for docker settings needed when using this sdk"
references = ["aws-sdk-rust#540"]
Expand Down
13 changes: 4 additions & 9 deletions rust-runtime/aws-smithy-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ client-hyper = ["hyper"]
aws-smithy-async = { path = "../aws-smithy-async" }
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http-tower = { path = "../aws-smithy-http-tower" }
aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
aws-smithy-types = { path = "../aws-smithy-types" }
bytes = "1"
fastrand = "1.4.0"
Expand All @@ -28,23 +29,17 @@ hyper-rustls = { version = "0.22.1", optional = true, features = ["rustls-native
hyper-tls = { version = "0.5.0", optional = true }
lazy_static = { version = "1", optional = true }
pin-project-lite = "0.2.7"
# tokio but with no features enabled (traits only)
serde = { version = "1", features = ["derive"], optional = true }
tokio = { version = "1"}
tower = { version = "0.4.6", features = ["util", "retry"] }

pin-project = "1"
tracing = "0.1"

aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
serde = { version = "1", features = ["derive"], optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio"] }

tower-test = "0.4.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full", "test-util"] }
tower-test = "0.4.0"
tracing-test = "0.2.1"

[package.metadata.docs.rs]
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http-tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https://github.com/awslabs/smithy-rs"
[dependencies]
aws-smithy-http = { path = "../aws-smithy-http" }
tower = { version = "0.4.4" }
pin-project = "1"
pin-project-lite = "0.2.9"
http = "0.2.3"
bytes = "1"
http-body = "0.4.4"
Expand Down
25 changes: 16 additions & 9 deletions rust-runtime/aws-smithy-http-tower/src/map_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::SendOperationError;
use aws_smithy_http::middleware::{AsyncMapRequest, MapRequest};
use aws_smithy_http::operation;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -97,10 +97,15 @@ where
}
}

#[pin_project(project = EnumProj)]
pub enum MapRequestFuture<F, E> {
Inner(#[pin] F),
Ready(Option<E>),
pin_project! {
#[project = EnumProj]
pub enum MapRequestFuture<F, E> {
Inner {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related to your changes, but the name Inner is kind of confusing to me here. Maybe we should have called it Pending or something? Is there a broader idiomatic pattern for this with futures?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it, I would name the variants Pending and Complete. @rcoh what do you think?

#[pin]
inner: F
},
Ready { inner: Option<E> },
}
}

impl<O, F, E> Future for MapRequestFuture<F, E>
Expand All @@ -111,8 +116,8 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EnumProj::Ready(e) => Poll::Ready(Err(e.take().unwrap())),
EnumProj::Inner(f) => f.poll(cx),
EnumProj::Inner { inner: f } => f.poll(cx),
EnumProj::Ready { inner: e } => Poll::Ready(Err(e.take().unwrap())),
}
}
}
Expand Down Expand Up @@ -143,8 +148,10 @@ where
.apply(req)
.map_err(|e| SendOperationError::RequestConstructionError(e.into()))
{
Err(e) => MapRequestFuture::Ready(Some(e)),
Ok(req) => MapRequestFuture::Inner(self.inner.call(req)),
Err(e) => MapRequestFuture::Ready { inner: Some(e) },
Ok(req) => MapRequestFuture::Inner {
inner: self.inner.call(req),
},
}
}
}
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ http = "0.2.3"
http-body = "0.4.4"
once_cell = "1.10"
percent-encoding = "2.1.0"
pin-project = "1"
pin-project-lite = "0.2.9"
tracing = "0.1"

# We are using hyper for our streaming body implementation, but this is an internal detail.
Expand Down
137 changes: 81 additions & 56 deletions rust-runtime/aws-smithy-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::{Body, SizeHint};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::error::Error as StdError;
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
Expand All @@ -18,26 +18,27 @@ use crate::header::append_merge_header_maps;

pub type Error = Box<dyn StdError + Send + Sync>;

/// SdkBody type
///
/// This is the Body used for dispatching all HTTP Requests.
/// For handling responses, the type of the body will be controlled
/// by the HTTP stack.
///
/// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
/// between hyper::Body and our Body
#[pin_project]
pub struct SdkBody {
#[pin]
inner: Inner,
/// An optional function to recreate the inner body
pin_project! {
/// SdkBody type
///
/// In the event of retry, this function will be called to generate a new body. See
/// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
/// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
/// This is the Body used for dispatching all HTTP Requests.
/// For handling responses, the type of the body will be controlled
/// by the HTTP stack.
///
/// TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches
/// between hyper::Body and our Body
pub struct SdkBody {
#[pin]
inner: Inner,
// An optional function to recreate the inner body
//
// In the event of retry, this function will be called to generate a new body. See
// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
}
}

impl Debug for SdkBody {
Expand All @@ -49,27 +50,39 @@ impl Debug for SdkBody {
}
}

type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;

#[pin_project(project = InnerProj)]
enum Inner {
Once(#[pin] Option<Bytes>),
Streaming(#[pin] hyper::Body),
Dyn(#[pin] BoxBody),

/// When a streaming body is transferred out to a stream parser, the body is replaced with
/// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
/// Body is a bug.
Taken,
pub type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;

pin_project! {
#[project = InnerProj]
enum Inner {
Once {
inner: Option<Bytes>
},
Streaming {
#[pin]
inner: hyper::Body
},
Dyn {
#[pin]
inner: BoxBody
},

/// When a streaming body is transferred out to a stream parser, the body is replaced with
/// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
/// Body is a bug.
Taken,
}
}

impl Debug for Inner {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self {
Inner::Once(once) => f.debug_tuple("Once").field(once).finish(),
Inner::Streaming(streaming) => f.debug_tuple("Streaming").field(streaming).finish(),
Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
Inner::Streaming { inner: streaming } => {
f.debug_tuple("Streaming").field(streaming).finish()
}
Inner::Taken => f.debug_tuple("Taken").finish(),
Inner::Dyn(_) => write!(f, "BoxBody"),
Inner::Dyn { .. } => write!(f, "BoxBody"),
}
}
}
Expand All @@ -78,7 +91,7 @@ impl SdkBody {
/// Construct an SdkBody from a Boxed implementation of http::Body
pub fn from_dyn(body: BoxBody) -> Self {
Self {
inner: Inner::Dyn(body),
inner: Inner::Dyn { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
Expand Down Expand Up @@ -111,8 +124,8 @@ impl SdkBody {

pub fn empty() -> Self {
Self {
inner: Inner::Once(None),
rebuild: Some(Arc::new(|| Inner::Once(None))),
inner: Inner::Once { inner: None },
rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
callbacks: Vec::new(),
}
}
Expand All @@ -123,16 +136,16 @@ impl SdkBody {
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
let polling_result = match this.inner.project() {
InnerProj::Once(ref mut opt) => {
let data = opt.take();
InnerProj::Once { ref mut inner } => {
let data = inner.take();
match data {
Some(bytes) if bytes.is_empty() => Poll::Ready(None),
Some(bytes) => Poll::Ready(Some(Ok(bytes))),
None => Poll::Ready(None),
}
}
InnerProj::Streaming(body) => body.poll_data(cx).map_err(|e| e.into()),
InnerProj::Dyn(box_body) => box_body.poll_data(cx),
InnerProj::Streaming { inner: body } => body.poll_data(cx).map_err(|e| e.into()),
InnerProj::Dyn { inner: box_body } => box_body.poll_data(cx),
InnerProj::Taken => {
Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
}
Expand Down Expand Up @@ -167,8 +180,8 @@ impl SdkBody {
/// If this SdkBody is streaming, this will return `None`
pub fn bytes(&self) -> Option<&[u8]> {
match &self.inner {
Inner::Once(Some(b)) => Some(b),
Inner::Once(None) => Some(&[]),
Inner::Once { inner: Some(b) } => Some(b),
Inner::Once { inner: None } => Some(&[]),
_ => None,
}
}
Expand All @@ -187,13 +200,21 @@ impl SdkBody {
}

pub fn content_length(&self) -> Option<u64> {
self.size_hint().exact()
http_body::Body::size_hint(self).exact()
}

pub fn with_callback(&mut self, callback: Box<dyn BodyCallback>) -> &mut Self {
self.callbacks.push(callback);
self
}

pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
if self.rebuild.is_some() {
SdkBody::retryable(move || f(self.try_clone().unwrap()))
} else {
f(self)
}
}
}

impl From<&str> for SdkBody {
Expand All @@ -205,8 +226,12 @@ impl From<&str> for SdkBody {
impl From<Bytes> for SdkBody {
fn from(bytes: Bytes) -> Self {
SdkBody {
inner: Inner::Once(Some(bytes.clone())),
rebuild: Some(Arc::new(move || Inner::Once(Some(bytes.clone())))),
inner: Inner::Once {
inner: Some(bytes.clone()),
},
rebuild: Some(Arc::new(move || Inner::Once {
inner: Some(bytes.clone()),
})),
callbacks: Vec::new(),
}
}
Expand All @@ -215,7 +240,7 @@ impl From<Bytes> for SdkBody {
impl From<hyper::Body> for SdkBody {
fn from(body: hyper::Body) -> Self {
SdkBody {
inner: Inner::Streaming(body),
inner: Inner::Streaming { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
Expand Down Expand Up @@ -283,20 +308,20 @@ impl http_body::Body for SdkBody {

fn is_end_stream(&self) -> bool {
match &self.inner {
Inner::Once(None) => true,
Inner::Once(Some(bytes)) => bytes.is_empty(),
Inner::Streaming(hyper_body) => hyper_body.is_end_stream(),
Inner::Dyn(box_body) => box_body.is_end_stream(),
Inner::Once { inner: None } => true,
Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(),
Inner::Dyn { inner: box_body } => box_body.is_end_stream(),
Inner::Taken => true,
}
}

fn size_hint(&self) -> SizeHint {
match &self.inner {
Inner::Once(None) => SizeHint::with_exact(0),
Inner::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming(hyper_body) => hyper_body.size_hint(),
Inner::Dyn(box_body) => box_body.size_hint(),
Inner::Once { inner: None } => SizeHint::with_exact(0),
Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(),
Inner::Dyn { inner: box_body } => box_body.size_hint(),
Inner::Taken => SizeHint::new(),
}
}
Expand Down
Loading