Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Event Stream message frame ser/de in smithy-eventstream #609

Merged
merged 10 commits into from
Jul 29, 2021
Merged

Implement Event Stream message frame ser/de in smithy-eventstream #609

merged 10 commits into from
Jul 29, 2021

Conversation

jdisanti
Copy link
Collaborator

This implements the Event Stream message frame format for #121.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@jdisanti jdisanti requested a review from rcoh July 27, 2021 00:52
Copy link
Collaborator

@rcoh rcoh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Super clean implementation! Some notes inline. Lets discuss ByteStr and whether it's worth the complexity especially since it becomes part of the public interface

/// A [`Buf`] implementation that counts bytes read.
pub struct CountBuf<'a> {
buffer: &'a mut dyn Buf,
count: AtomicUsize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be atomic usize instead of just usize? since advance has &mut self it seems like it could just be a usize?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, I was increasing the count in chunk, which meant I needed interior mutability since chunk takes &self. But then I realized that was a bug since chunk can be called multiple times before advancing, so I fixed that and forgot to reduce the AtomicUsize back down. Will fix.

/// UTF-8 string byte buffer representation with validation amortization.
#[non_exhaustive]
#[derive(Debug)]
pub struct StrBytes {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would probably simplify this to do what the Http crate does: maintain an invariant that StrBytes can only contain valid UTF-8.

https://github.com/hyperium/http/blob/master/src/byte_str.rs

Assuming this is solely to be used in header names (which we know are always valid utf8?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified to do validation up front.

Int32(i32),
Int64(i64),
ByteArray(Bytes),
String(StrBytes),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if StrBytes pre-validates (as it probably should, then users don't need another layer of checking here)

}
let bytes = buffer.copy_to_bytes(len);
if value_type == TYPE_STRING {
Ok(HeaderValue::String(bytes.into()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably actually validate that this is UTF-8 at this point

}
}

pub(super) fn write_to(&self, buffer: &mut dyn BufMut) -> Result<(), Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd probably use impl BufMut here? although I'm generally pro dyn, these methods are going to be extremely hot so getting it to be fully optimized is maybe worth it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


fuzz_target!(|message: Message| {
let mut buffer = Vec::new();
if message.write_to(&mut buffer).is_ok() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fails only when the message is too long, right? would be good to have a little bit of sanity checking that the error case only happens when the buffer is too long


/// Reads a message from the given `buffer`. If the buffer doesn't have the whole
/// message in it yet (as is the case when streaming), then it returns `Ok(None)`.
pub fn read_from(buffer: &mut dyn Buf) -> Result<Option<Message>, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for explicit enums instead of Option here:

enum ParsedMessage {
  Complete(Message),
  Incomplete
}

naming bikeshedding welcome

Copy link
Collaborator Author

@jdisanti jdisanti Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my instinct, but then I saw Tokio's Decoder use Result<Option<...>>: https://docs.rs/tokio-codec/0.2.0-alpha.5/tokio_codec/trait.Decoder.html#tymethod.decode

I can diverge though if it makes the code clearer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can follow tokio, although Tokio also needs special docs to clarify what it means...since None can imply end of stream OR partial message it might be worth a distinction


#[test]
fn message_not_fully_available_yet() {
let message = include_bytes!("../test_data/valid_with_all_headers_and_payload");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are callers supposed to deal with the partially mutated buffer that results from a call to read_from with a partial buffer? It seems like it actually reads through the prelude right? So you couldn't re-use the same bytes later.

would be good to formalize this behavior and document how it should be used in the context of something that is impl Stream<Bytes>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine the caller must have to do something to preserve the initial position in the event it returns None. There's no peeking with Buf, so there is no other way to do that. I should double check this though instead of assuming.

@jdisanti jdisanti requested a review from rcoh July 28, 2021 01:44
Copy link
Collaborator

@rcoh rcoh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! a few possible improvements inline but this looks awesome.

use smithy_eventstream::frame::{Header, HeaderValue, Message};
use smithy_types::Instant;

fn mutate(data: &mut [u8], size: usize, max_size: usize) -> usize {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment about this fuzz mutation strategy would be good

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

let max_header_len = max_size - non_header_len;
let mut headers = (&bytes[12..(12 + headers_len as usize)]).to_vec();
headers.resize(max_header_len, 0);
let new_header_len =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so do we only mutate the headers? does the crc not apply to the payload? I don't really follow what's going on here overall I guess

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not a lot to test with the payload itself except that the message length and header length were accurate. I think there should probably be one more fuzz test that just plays with the prelude while keeping the CRCs valid to explore that validation logic.

}
}

/// UTF-8 string byte buffer representation with validation amortization.
#[non_exhaustive]
#[derive(Debug)]
pub struct StrBytes {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost the doc comment on this structure. A usage example might be helpful

@@ -88,7 +45,6 @@ impl Clone for StrBytes {
fn clone(&self) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be derived

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed.

StrBytes {
bytes: Bytes::from(value),
valid_utf8: AtomicU8::new(State::Unknown.as_u8()),
impl TryFrom<&'static [u8]> for StrBytes {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually I've seen this as an explicit from_static inherent method because if we ever want to add a TryFrom<&'a [u8]> these would conflict

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm following bytes's lead here. Added try_copy_from_slice and copy_from_str functions for the non-static use-case.

impl TryFrom<Bytes> for StrBytes {
type Error = Utf8Error;

fn try_from(bytes: Bytes) -> Result<Self, Self::Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you delegate these to all go through the same place? Either TryFrom<Bytes> or a new function?

That way we can audit that there is only one location that actually instantiates StrBytes { bytes } with the private constructor

let mut decoder = MessageFrameDecoder::new();
let mut segmented = SegmentedBuf::new();
let mut decoded = Vec::new();
for window in repeated.chunks(3) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slight preference for one byte at a time to make sure that it works with only a single byte

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could also just repeat this test with a range of window sizes

/// Returns `Ok(None)` if there's not enough data, or `Some(remaining)` where
/// `remaining` is the number of bytes after the prelude that belong to the
/// message that's in the buffer.
fn remaining_ready<B: Buf>(&self, buffer: &B) -> Result<Option<usize>, Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found this function name a little hard to grok. maybe try_next_end_of_frame? don't really love that either

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this one was really hard to come up with a good name for. I'll go for remaining_bytes_if_frame_available. It's slightly better at least:

if let Some(remaining_len) = self.remaining_bytes_if_frame_available(&buffer)? {

} else {
result.unwrap().unwrap()
}
let message = if let Ok(message) = Message::read_from(input) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slight preference for using the FrameDecoder api in all the fuzz tests mostly just for the coverage. Could add a #[cfg(test)] helper to make it easy to use?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is adequately covered in the unit tests since there's only one u32 worth of data that is actually relevant to the MessageFrameDecoder. Fuzzing that u32 wouldn't give us much benefit as the result is either Complete or Incomplete without a whole lot of logic in between.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's fair. Maybe more relevant for the round trip test.

@jdisanti jdisanti merged commit 9fef09a into smithy-lang:main Jul 29, 2021
@jdisanti jdisanti deleted the event-stream-framing branch August 6, 2021 20:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants