Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add support for aws-chunked content encoding #1501

Merged
merged 15 commits into from
Jul 5, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 130 additions & 48 deletions aws/rust-runtime/aws-http/src/content_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,35 @@ pub struct AwsChunkedBodyOptions {
/// The total size of the stream. Because we only support unsigned encoding
/// this implies that there will only be a single chunk containing the
/// underlying payload.
pub stream_length: u64,
stream_length: u64,
/// The length of each trailer sent within an `AwsChunkedBody`. Necessary in
/// order to accurately calculate the total size of the body.
pub trailer_lens: Vec<u64>,
trailer_lengths: Vec<u64>,
}

impl AwsChunkedBodyOptions {
/// Create a new [`AwsChunkedBodyOptions`][AwsChunkedBodyOptions]
pub fn new() -> Self {
Self::default()
pub fn new(stream_length: u64, trailer_lengths: Vec<u64>) -> Self {
Self {
stream_length,
trailer_lengths,
}
}

/// Set stream length
pub fn with_stream_length(mut self, stream_length: u64) -> Self {
self.stream_length = stream_length;
self
/// The total size of the stream. Because we only support unsigned encoding
/// this implies that there will only be a single chunk containing the
/// underlying payload.
pub fn stream_length(&self) -> u64 {
Velfi marked this conversation as resolved.
Show resolved Hide resolved
self.stream_length
}

fn total_trailer_length(&self) -> u64 {
self.trailer_lengths.iter().sum()
}

/// Set a trailer len
pub fn with_trailer_len(mut self, trailer_len: u64) -> Self {
self.trailer_lens.push(trailer_len);
self.trailer_lengths.push(trailer_len);
self
}
}
Expand All @@ -69,7 +77,8 @@ enum AwsChunkedBodyState {
}

pin_project! {
/// A request body compatible with `Content-Encoding: aws-chunked`
/// A request body compatible with `Content-Encoding: aws-chunked`. This implementation is only
/// capable of writing a single chunk and does not support signed chunks.
///
/// Chunked-Body grammar is defined in [ABNF] as:
///
Expand Down Expand Up @@ -108,7 +117,7 @@ impl<Inner> AwsChunkedBody<Inner> {
}
}

fn encoded_length(&self) -> Option<u64> {
fn encoded_length(&self) -> u64 {
let mut length = 0;
if self.options.stream_length != 0 {
length += get_unsigned_chunk_bytes_length(self.options.stream_length);
Expand All @@ -118,14 +127,14 @@ impl<Inner> AwsChunkedBody<Inner> {
length += CHUNK_TERMINATOR.len() as u64;

// Trailers
for len in self.options.trailer_lens.iter() {
for len in self.options.trailer_lengths.iter() {
length += len + CRLF.len() as u64;
}

// Encoding terminator
length += CRLF.len() as u64;

Some(length)
length
}
}

Expand All @@ -134,20 +143,10 @@ fn get_unsigned_chunk_bytes_length(payload_length: u64) -> u64 {
hex_repr_len + CRLF.len() as u64 + payload_length + CRLF.len() as u64
}

fn trailers_as_aws_chunked_bytes(
total_length_of_trailers_in_bytes: u64,
trailer_map: Option<HeaderMap>,
) -> bytes::Bytes {
fn trailers_as_aws_chunked_bytes(trailer_map: Option<HeaderMap>) -> Bytes {
use std::fmt::Write;

// On 32-bit operating systems, we might not be able to convert the u64 to a usize, so we just
// use `String::new` in that case.
let mut trailers = match usize::try_from(total_length_of_trailers_in_bytes) {
Ok(total_length_of_trailers_in_bytes) => {
String::with_capacity(total_length_of_trailers_in_bytes)
}
Err(_) => String::new(),
};
let mut trailers = String::new();
let mut already_wrote_first_trailer = false;

if let Some(trailer_map) = trailer_map {
Expand Down Expand Up @@ -177,15 +176,16 @@ fn trailers_as_aws_chunked_bytes(
// Write CRLF to end the body
trailers.write_str(CRLF).unwrap();
// If we wrote at least one trailer, we need to write an extra CRLF
if total_length_of_trailers_in_bytes != 0 {
if already_wrote_first_trailer {
trailers.write_str(CRLF).unwrap();
}

trailers.into()
}

impl<Inner: Body<Data = Bytes, Error = aws_smithy_http::body::Error>> Body
for AwsChunkedBody<Inner>
impl<Inner> Body for AwsChunkedBody<Inner>
where
Inner: Body<Data = Bytes, Error = aws_smithy_http::body::Error>,
{
type Data = Bytes;
type Error = aws_smithy_http::body::Error;
Expand All @@ -194,25 +194,35 @@ impl<Inner: Body<Data = Bytes, Error = aws_smithy_http::body::Error>> Body
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
tracing::trace!("polling AwsChunkedBody");
tracing::trace!("polling AwsChunkedBody (state = {:?})", self.state);
Velfi marked this conversation as resolved.
Show resolved Hide resolved
let mut this = self.project();

match *this.state {
AwsChunkedBodyState::WritingChunkSize => {
tracing::trace!("writing chunk size");
*this.state = AwsChunkedBodyState::WritingChunk;
// A chunk must be prefixed by chunk size in hexadecimal
let chunk_size = Bytes::from(format!("{:X?}\r\n", this.options.stream_length));
Poll::Ready(Some(Ok(chunk_size)))
if this.options.stream_length == 0 {
// If the stream is empty, we skip to writing trailers after writing the CHUNK_TERMINATOR.
*this.state = AwsChunkedBodyState::WritingTrailers;
tracing::trace!("stream is empty, writing chunk terminator");
Poll::Ready(Some(Ok(Bytes::from([CHUNK_TERMINATOR].concat()))))
} else {
*this.state = AwsChunkedBodyState::WritingChunk;
// A chunk must be prefixed by chunk size in hexadecimal
let chunk_size = format!("{:X?}{CRLF}", this.options.stream_length);
tracing::trace!("writing chunk size (size = 0x{})", &chunk_size);
Velfi marked this conversation as resolved.
Show resolved Hide resolved
let chunk_size = Bytes::from(chunk_size);
Poll::Ready(Some(Ok(chunk_size)))
}
}
AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
Poll::Ready(Some(Ok(data))) => {
tracing::trace!("writing chunk data");
tracing::trace!("writing chunk data (len = {})", data.len());
Poll::Ready(Some(Ok(data)))
}
Poll::Ready(None) => {
tracing::trace!("no more chunk data, writing CRLF and terminator chunk");
tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
*this.state = AwsChunkedBodyState::WritingTrailers;
// Since we wrote chunk data, we end it with a CRLF and since we only write
// a single chunk, we write the CHUNK_TERMINATOR immediately after
Poll::Ready(Some(Ok(Bytes::from([CRLF, CHUNK_TERMINATOR].concat()))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Expand All @@ -222,13 +232,13 @@ impl<Inner: Body<Data = Bytes, Error = aws_smithy_http::body::Error>> Body
return match this.inner.poll_trailers(cx) {
Poll::Ready(Ok(trailers)) => {
*this.state = AwsChunkedBodyState::Closed;
let total_length_of_trailers_in_bytes =
this.options.trailer_lens.iter().sum();
let expected_total_trailer_length =
total_rendered_length_of_trailers(trailers.as_ref());
let actual_total_trailer_length = this.options.total_trailer_length();
assert_eq!(expected_total_trailer_length, actual_total_trailer_length,
"while writing trailers, the expected length of trailers ({expected_total_trailer_length}) differed from the actual length ({actual_total_trailer_length})");
Velfi marked this conversation as resolved.
Show resolved Hide resolved

Poll::Ready(Some(Ok(trailers_as_aws_chunked_bytes(
total_length_of_trailers_in_bytes,
trailers,
))))
Poll::Ready(Some(Ok(trailers_as_aws_chunked_bytes(trailers))))
}
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Expand All @@ -251,11 +261,7 @@ impl<Inner: Body<Data = Bytes, Error = aws_smithy_http::body::Error>> Body
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(
self.encoded_length()
.expect("Requests made with aws-chunked encoding must have known size")
as u64,
)
SizeHint::with_exact(self.encoded_length())
}
}

Expand All @@ -276,6 +282,30 @@ where
len
}

fn total_rendered_length_of_trailers(header_map: Option<&HeaderMap>) -> u64 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should have a unit test that serializes the header map with the http crate and compares its length against this function. Especially with multiple headers of the same name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test that ensures our trailer serializer and trailer length calculator stay in sync

match header_map {
Some(header_map) => {
let mut total = 0;
for key in header_map.keys() {
total += key.as_str().len() + ": ".len();

let mut values = header_map.get_all(key).into_iter();

if let Some(value) = values.next() {
total += value.len() + CRLF.len();
}

for value in values {
total += value.len() + ",".len();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this match the actual serialization behavior for multiple values? I thought that hyper would add multiple of the same header name with a different value, but I'm not 100% sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it matches what Hyper would do in this situation, but I did test it with S3 and it was accepted. I can test an upload with the hyper-style headers if you like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched this to work more similarly to how hyper serializes headers. The serialization is done manually.

}

total as u64
}
None => 0,
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -289,7 +319,7 @@ mod tests {
async fn test_aws_chunked_encoding() {
let test_fut = async {
let input_str = "Hello world";
let opts = AwsChunkedBodyOptions::new().with_stream_length(input_str.len() as u64);
let opts = AwsChunkedBodyOptions::new(input_str.len() as u64, Vec::new());
let mut body = AwsChunkedBody::new(SdkBody::from(input_str), opts);

let mut output = SegmentedBuf::new();
Expand Down Expand Up @@ -322,4 +352,56 @@ mod tests {
panic!("test_aws_chunked_encoding timed out after {timeout_duration:?}");
}
}

#[tokio::test]
#[should_panic = "assertion failed: `(left == right)`\n left: `0`,\n right: `42`: while writing trailers, the expected length of trailers (0) differed from the actual length (42)"]
async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {
    let payload = "Hello world";
    // The body under test carries no trailers, so declaring a nonzero trailer
    // length is deliberately wrong: polling trailers must trip the assert.
    let bogus_trailer_length = 42;
    let options = AwsChunkedBodyOptions::new(payload.len() as u64, vec![bogus_trailer_length]);
    let mut chunked_body = AwsChunkedBody::new(SdkBody::from(payload), options);

    // The whole body has to be consumed before trailers may be polled;
    // the chunk contents themselves are irrelevant to this test.
    while let Some(chunk) = chunked_body.data().await {
        chunk.unwrap();
    }

    assert!(
        chunked_body
            .trailers()
            .await
            .expect("no errors occurred during trailer polling")
            .is_none(),
        "aws-chunked encoded bodies don't have normal HTTP trailers"
    );
}

#[tokio::test]
async fn test_aws_chunked_encoding_empty_body() {
    let payload = "";
    let options = AwsChunkedBodyOptions::new(payload.len() as u64, Vec::new());
    let mut chunked_body = AwsChunkedBody::new(SdkBody::from(payload), options);

    // Drain every data frame the encoder produces into a single buffer.
    let mut collected = SegmentedBuf::new();
    while let Some(chunk) = chunked_body.data().await {
        collected.push(chunk.unwrap());
    }

    let mut actual_output = String::new();
    collected
        .reader()
        .read_to_string(&mut actual_output)
        .expect("Doesn't cause IO errors");

    // Even an empty stream emits the zero-length chunk terminator followed
    // by the final CRLF that closes the encoding.
    let expected_output = [CHUNK_TERMINATOR, CRLF].concat();
    assert_eq!(expected_output, actual_output);

    assert!(
        chunked_body
            .trailers()
            .await
            .expect("no errors occurred during trailer polling")
            .is_none(),
        "aws-chunked encoded bodies don't have normal HTTP trailers"
    );
}
}