Feature: add support for aws-chunked content encoding #1501

Merged
merged 15 commits into from
Jul 5, 2022
152 changes: 149 additions & 3 deletions aws/rust-runtime/aws-http/src/content_encoding.rs
@@ -98,6 +98,7 @@ pin_project! {
        #[pin]
        state: AwsChunkedBodyState,
        options: AwsChunkedBodyOptions,
        inner_body_bytes_read_so_far: usize,
    }
}

@@ -108,6 +109,7 @@ impl<Inner> AwsChunkedBody<Inner> {
            inner: body,
            state: AwsChunkedBodyState::WritingChunkSize,
            options,
            inner_body_bytes_read_so_far: 0,
        }
    }

@@ -226,9 +228,20 @@ where
            AwsChunkedBodyState::WritingChunk => match this.inner.poll_data(cx) {
                Poll::Ready(Some(Ok(data))) => {
                    tracing::trace!(len = data.len(), "writing chunk data");
                    *this.inner_body_bytes_read_so_far += data.len();
                    Poll::Ready(Some(Ok(data)))
                }
                Poll::Ready(None) => {
                    let actual_stream_length = *this.inner_body_bytes_read_so_far as u64;
                    let expected_stream_length = this.options.stream_length;
                    if actual_stream_length != expected_stream_length {
                        let err = Box::new(AwsChunkedBodyError::StreamLengthMismatch {
                            actual: actual_stream_length,
                            expected: expected_stream_length,
                        });
                        return Poll::Ready(Some(Err(err)));
                    };

                    tracing::trace!("no more chunk data, writing CRLF and chunk terminator");
                    *this.state = AwsChunkedBodyState::WritingTrailers;
                    // Since we wrote chunk data, we end it with a CRLF and since we only write
@@ -290,10 +303,14 @@ where
#[derive(Debug)]
enum AwsChunkedBodyError {
    /// Error that occurs when the sum of `trailer_lengths` set when creating an `AwsChunkedBody` is
-   /// not equal to the actual length of the trailers returned by the inner `http::Body`
+   /// not equal to the actual length of the trailers returned by the inner `http_body::Body`
    /// implementor. These trailer lengths are necessary in order to correctly calculate the total
    /// size of the body for setting the content length header.
    ReportedTrailerLengthMismatch { actual: u64, expected: u64 },
Collaborator

I see @rcoh suggested making an error for this, but I think this is a case where we have a bug in our code, and there's little to no value for a customer in matching on this error. It's a case that simply should not happen if the code is correct, so an assertion was warranted.

On the flip side, with an assertion, it would silently allow the lengths to mismatch when compiled in release mode, which would likely result in a server-side error response. I think this is OK though, since a bug with this should not have been released in the first place if it was adequately tested.

We might want to proptest header names/values for the size prediction vs. the rendering to be absolutely certain they cannot mismatch. I think they can right now in the case where a header value has Unicode characters in it since the length predictor function is using string length instead of byte length.
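
To make the string-length versus byte-length concern concrete, here is a minimal standard-library sketch. `predicted_len`, `render`, and `char_count_len` are hypothetical helpers, not functions from this PR, and the `name:value\r\n` layout is an assumption about how a trailer line is rendered. Note that Rust's `str::len` already returns the UTF-8 byte length, so a mismatch of this kind would only appear if the predictor counted characters instead.

```rust
/// Hypothetical predictor: size in bytes of one rendered `name:value\r\n` line.
fn predicted_len(name: &str, value: &str) -> usize {
    // `str::len` counts UTF-8 bytes, not characters.
    name.len() + ":".len() + value.len() + "\r\n".len()
}

/// Hypothetical renderer that produces the bytes the predictor is estimating.
fn render(name: &str, value: &str) -> Vec<u8> {
    format!("{name}:{value}\r\n").into_bytes()
}

/// A predictor that counts characters; it undercounts any non-ASCII value.
fn char_count_len(name: &str, value: &str) -> usize {
    name.chars().count() + 1 + value.chars().count() + 2
}
```

A property test along the lines suggested above would assert `predicted_len(n, v) == render(n, v).len()` for arbitrary header names and values.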

Collaborator

@jdisanti jdisanti Jun 29, 2022

Another thing we could consider is just getting rid of the length prediction entirely. Just render the trailers to a BytesMut when the length is initially needed, and store that BytesMut in the AwsChunkedBody for later.
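
A rough sketch of that alternative, assuming trailers render as `name:value\r\n` lines. `PreRenderedTrailers` is a hypothetical type, and a plain `Vec<u8>` stands in for `BytesMut` so the example needs no external crates. Because the length is read from the bytes that will actually be written, prediction and rendering cannot disagree.

```rust
/// Hypothetical holder for trailers that are rendered once, up front.
struct PreRenderedTrailers {
    // Stands in for `BytesMut` in this std-only sketch.
    rendered: Vec<u8>,
}

impl PreRenderedTrailers {
    /// Renders every trailer immediately and keeps the bytes for later.
    fn new(trailers: &[(&str, &str)]) -> Self {
        let mut rendered = Vec::new();
        for (name, value) in trailers {
            rendered.extend_from_slice(name.as_bytes());
            rendered.push(b':');
            rendered.extend_from_slice(value.as_bytes());
            rendered.extend_from_slice(b"\r\n");
        }
        Self { rendered }
    }

    /// Length to use when computing the content length header.
    fn len(&self) -> u64 {
        self.rendered.len() as u64
    }

    /// Hands over the stored bytes when the trailers are written.
    fn into_bytes(self) -> Vec<u8> {
        self.rendered
    }
}
```

The trade-off is that the rendered trailers are held in memory from construction until they are written, in exchange for removing the mismatch case entirely.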

Collaborator

@jdisanti Only `debug_assert!` is skipped in release mode; a plain `assert!` still fails there.
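
The difference can be shown with the standard library alone. In this small sketch, `panics`, `check_always`, and `check_in_debug_only` are hypothetical names used only for illustration.

```rust
use std::panic;

/// Returns true if running the closure panicked.
fn panics(f: impl FnOnce() + panic::UnwindSafe) -> bool {
    panic::catch_unwind(f).is_err()
}

/// `assert_eq!` is checked in every build profile, including release.
fn check_always(actual: u64, expected: u64) {
    assert_eq!(actual, expected, "length mismatch");
}

/// `debug_assert_eq!` is only checked when debug assertions are enabled,
/// which is the default for dev builds and off by default for release builds.
fn check_in_debug_only(actual: u64, expected: u64) {
    debug_assert_eq!(actual, expected, "length mismatch");
}
```

With these helpers, `panics(|| check_always(42, 0))` is true in any profile, while `panics(|| check_in_debug_only(42, 0))` equals `cfg!(debug_assertions)`.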

Contributor Author

I added a new error for when `stream_length` is set incorrectly.

    /// Error that occurs when the `stream_length` set when creating an `AwsChunkedBody` is not
    /// equal to the actual length of the body returned by the inner `http_body::Body` implementor.
    /// `stream_length` must be correct in order to set an accurate content length header.
    StreamLengthMismatch { actual: u64, expected: u64 },
}

impl std::fmt::Display for AwsChunkedBodyError {
@@ -302,6 +319,9 @@ impl std::fmt::Display for AwsChunkedBodyError {
            Self::ReportedTrailerLengthMismatch { actual, expected } => {
                write!(f, "When creating this AwsChunkedBody, length of trailers was reported as {expected}. However, when double checking during trailer encoding, length was found to be {actual} instead.")
            }
            Self::StreamLengthMismatch { actual, expected } => {
                write!(f, "When creating this AwsChunkedBody, stream length was reported as {expected}. However, when double checking during body encoding, length was found to be {actual} instead.")
            }
        }
    }
}
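
The stream length check added in this diff follows a simple pattern: count bytes as chunks pass through, then compare against the declared length once the inner source is exhausted. Below is a simplified, std-only sketch of that pattern. It uses a plain `Iterator` in place of `http_body::Body`, and `LengthChecked` and this standalone `StreamLengthMismatch` struct are hypothetical stand-ins rather than the PR's types.

```rust
use std::fmt;

/// Hypothetical, simplified analogue of the `StreamLengthMismatch` variant.
#[derive(Debug, PartialEq)]
struct StreamLengthMismatch {
    actual: u64,
    expected: u64,
}

impl fmt::Display for StreamLengthMismatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "expected {} bytes, read {}", self.expected, self.actual)
    }
}

impl std::error::Error for StreamLengthMismatch {}

/// Wraps an iterator of chunks and verifies the declared length at the end.
struct LengthChecked<I> {
    inner: I,
    expected: u64,
    bytes_read_so_far: u64,
    done: bool,
}

impl<I> LengthChecked<I> {
    fn new(inner: I, expected: u64) -> Self {
        Self { inner, expected, bytes_read_so_far: 0, done: false }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for LengthChecked<I> {
    type Item = Result<Vec<u8>, StreamLengthMismatch>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        match self.inner.next() {
            Some(chunk) => {
                // Pass the chunk through unchanged while tracking its size.
                self.bytes_read_so_far += chunk.len() as u64;
                Some(Ok(chunk))
            }
            None => {
                // End of input: report a mismatch exactly once, or finish cleanly.
                self.done = true;
                if self.bytes_read_so_far != self.expected {
                    Some(Err(StreamLengthMismatch {
                        actual: self.bytes_read_so_far,
                        expected: self.expected,
                    }))
                } else {
                    None
                }
            }
        }
    }
}
```

As in the diff, the mismatch can only be detected after all data has been forwarded, so a caller sees the error at the end of the stream rather than up front.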
@@ -327,13 +347,86 @@ where

#[cfg(test)]
mod tests {
-   use super::*;
+   use super::{
+       total_rendered_length_of_trailers, trailers_as_aws_chunked_bytes, AwsChunkedBody,
+       AwsChunkedBodyOptions, CHUNK_TERMINATOR, CRLF,
+   };

    use aws_smithy_http::body::SdkBody;
-   use bytes::Buf;
+   use bytes::{Buf, Bytes};
    use bytes_utils::SegmentedBuf;
    use http::{HeaderMap, HeaderValue};
    use http_body::{Body, SizeHint};
    use pin_project_lite::pin_project;

    use std::io::Read;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;

    pin_project! {
        struct SputteringBody {
            parts: Vec<Option<Bytes>>,
            cursor: usize,
            delay_in_millis: u64,
        }
    }

    impl SputteringBody {
        fn len(&self) -> usize {
            self.parts.iter().flat_map(|b| b).map(|b| b.len()).sum()
        }
    }

Comment on lines +367 to +380
Collaborator

this is cool!

    impl Body for SputteringBody {
        type Data = Bytes;
        type Error = aws_smithy_http::body::Error;

        fn poll_data(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
            if self.cursor == self.parts.len() {
                return Poll::Ready(None);
            }

            let this = self.project();
            let delay_in_millis = *this.delay_in_millis;
            let next_part = this.parts.get_mut(*this.cursor).unwrap().take();

            match next_part {
                None => {
                    *this.cursor += 1;
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_millis(delay_in_millis)).await;
                        waker.wake();
                    });
                    Poll::Pending
                }
                Some(data) => {
                    *this.cursor += 1;
                    Poll::Ready(Some(Ok(data)))
                }
            }
        }

        fn poll_trailers(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
            Poll::Ready(Ok(None))
        }

        fn is_end_stream(&self) -> bool {
            false
        }

        fn size_hint(&self) -> SizeHint {
            SizeHint::new()
        }
    }
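
The key detail in `SputteringBody::poll_data` is that every `Poll::Pending` is paired with a scheduled wake-up; otherwise an executor would never poll the body again. The std-only sketch below shows the same contract without tokio. `Sputtering`, `CountingWaker`, and `drain` are hypothetical names, and the wake-up happens immediately rather than after a delay.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts wake-ups so the sketch can show one per `Pending` result.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical analogue of `SputteringBody`: a `None` entry means "no data yet".
struct Sputtering {
    parts: Vec<Option<&'static [u8]>>,
    cursor: usize,
}

impl Sputtering {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<&'static [u8]>> {
        if self.cursor == self.parts.len() {
            return Poll::Ready(None);
        }
        let part = self.parts[self.cursor].take();
        self.cursor += 1;
        match part {
            Some(data) => Poll::Ready(Some(data)),
            None => {
                // Arrange a wake-up before returning `Pending`.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

/// Polls until the end and returns (collected bytes, pending count, wake count).
fn drain(mut body: Sputtering) -> (Vec<u8>, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    let mut pendings = 0;
    loop {
        match body.poll_next(&mut cx) {
            Poll::Ready(Some(data)) => out.extend_from_slice(data),
            Poll::Ready(None) => break,
            Poll::Pending => pendings += 1,
        }
    }
    (out, pendings, counter.0.load(Ordering::SeqCst))
}
```

The busy loop in `drain` is only a stand-in for an executor; a real runtime would park the task and re-poll it when the waker fires.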

    #[tokio::test]
    async fn test_aws_chunked_encoding() {
        let test_fut = async {
@@ -372,6 +465,59 @@ mod tests {
        }
    }

    #[tokio::test]
    async fn test_aws_chunked_encoding_sputtering_body() {
        let test_fut = async {
            let input = SputteringBody {
                parts: vec![
                    Some(Bytes::from_static(b"chunk 1, ")),
                    None,
                    Some(Bytes::from_static(b"chunk 2, ")),
                    Some(Bytes::from_static(b"chunk 3, ")),
                    None,
                    None,
                    Some(Bytes::from_static(b"chunk 4, ")),
                    Some(Bytes::from_static(b"chunk 5, ")),
                    Some(Bytes::from_static(b"chunk 6")),
                ],
                cursor: 0,
                delay_in_millis: 500,
            };
            let opts = AwsChunkedBodyOptions::new(input.len() as u64, Vec::new());
            let mut body = AwsChunkedBody::new(input, opts);

            let mut output = SegmentedBuf::new();
            while let Some(buf) = body.data().await {
                output.push(buf.unwrap());
            }

            let mut actual_output = String::new();
            output
                .reader()
                .read_to_string(&mut actual_output)
                .expect("Doesn't cause IO errors");

            let expected_output =
                "34\r\nchunk 1, chunk 2, chunk 3, chunk 4, chunk 5, chunk 6\r\n0\r\n\r\n";

            assert_eq!(expected_output, actual_output);
            assert!(
                body.trailers()
                    .await
                    .expect("no errors occurred during trailer polling")
                    .is_none(),
                "aws-chunked encoded bodies don't have normal HTTP trailers"
            );
        };

        let timeout_duration = Duration::from_secs(3);
        if let Err(_) = tokio::time::timeout(timeout_duration, test_fut).await {
            panic!(
                "test_aws_chunked_encoding_sputtering_body timed out after {timeout_duration:?}"
            );
        }
    }
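
For reference, the `34` at the start of the expected output is the total data length in hexadecimal: five 9-byte parts plus one 7-byte part is 52 bytes, and 52 is 0x34. The sketch below reproduces the framing for a single chunk with no trailers. `encode_single_chunk` is a hypothetical helper, and the `CRLF` and `CHUNK_TERMINATOR` values are assumed to match the constants the test module imports.

```rust
// Assumed values for the constants imported by the test module.
const CRLF: &str = "\r\n";
const CHUNK_TERMINATOR: &str = "0\r\n";

/// Frames `data` as: hex size, CRLF, data, CRLF, zero-length terminator, CRLF.
fn encode_single_chunk(data: &str) -> String {
    let mut out = String::new();
    // The chunk size is the byte length written in hexadecimal.
    out.push_str(&format!("{:X}", data.len()));
    out.push_str(CRLF);
    out.push_str(data);
    out.push_str(CRLF);
    out.push_str(CHUNK_TERMINATOR);
    // With no trailers, a final CRLF ends the body.
    out.push_str(CRLF);
    out
}
```

Because the size line is written once, from the declared `stream_length`, the encoded body is only valid when that length matches what the inner body actually yields.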

    #[tokio::test]
    #[should_panic = "called `Result::unwrap()` on an `Err` value: ReportedTrailerLengthMismatch { actual: 42, expected: 0 }"]
    async fn test_aws_chunked_encoding_incorrect_trailer_length_panic() {