Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Event Stream message frame ser/de in smithy-eventstream #609

Merged
merged 10 commits into from
Jul 29, 2021
1 change: 1 addition & 0 deletions rust-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
members = [
"smithy-http",
"smithy-client",
"smithy-eventstream",
"smithy-http-tower",
"smithy-json",
"smithy-query",
Expand Down
15 changes: 15 additions & 0 deletions rust-runtime/smithy-eventstream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "smithy-eventstream"
version = "0.1.0"
authors = ["AWS Rust SDK Team <[email protected]>", "John DiSanti <[email protected]>"]
edition = "2018"

[features]
derive-arbitrary = ["arbitrary"]
default = []

[dependencies]
arbitrary = { version = "1", optional = true, features = ["derive"] }
bytes = "1"
crc32fast = "1"
smithy-types = { path = "../smithy-types" }
4 changes: 4 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

target
corpus
artifacts
48 changes: 48 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[package]
name = "smithy-eventstream-fuzz"
version = "0.1.0"
authors = ["AWS Rust SDK Team <[email protected]>", "John DiSanti <[email protected]>"]
publish = false
edition = "2018"

[package.metadata]
cargo-fuzz = true

[dependencies]
arbitrary = { version = "1", features = ["derive"] }
bytes = "1"
crc32fast = "1"
libfuzzer-sys = "0.4"
smithy-types = { path = "../../smithy-types" }

[dependencies.smithy-eventstream]
features = ["derive-arbitrary"]
path = ".."

# Prevent this from interfering with workspaces
[workspace]
members = ["."]

[[bin]]
name = "read_message"
path = "fuzz_targets/read_message.rs"
test = false
doc = false

[[bin]]
name = "roundtrip"
path = "fuzz_targets/roundtrip.rs"
test = false
doc = false

[[bin]]
name = "assisted_read"
path = "fuzz_targets/assisted_read.rs"
test = false
doc = false

[[bin]]
name = "mutated_read"
path = "fuzz_targets/mutated_read.rs"
test = false
doc = false
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
41 changes: 41 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/fuzz_targets/assisted_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#![no_main]

use arbitrary::Arbitrary;
use bytes::BufMut;
use crc32fast::Hasher as Crc;
use libfuzzer_sys::fuzz_target;
use smithy_eventstream::frame::Message;

#[derive(Arbitrary, Debug)]
struct Input {
headers: Vec<u8>,
payload: Vec<u8>,
}

// This fuzz test assists the fuzzer by creating a well formed prelude and message CRC
fuzz_target!(|input: Input| {
let total_len = (12 + input.headers.len() + input.payload.len() + 4) as u32;
let header_len = input.headers.len() as u32;

let mut message = Vec::<u8>::new();
message.put_u32(total_len);
message.put_u32(header_len);
message.put_u32(crc(&message));
message.put_slice(&input.headers);
message.put_slice(&input.payload);
message.put_u32(crc(&message));

let mut data = &mut &message[..];
let _ = Message::read_from(&mut data);
});

fn crc(input: &[u8]) -> u32 {
let mut crc = Crc::new();
crc.update(input);
crc.finalize()
}
83 changes: 83 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/fuzz_targets/mutated_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#![no_main]

use bytes::{Buf, BufMut};
use crc32fast::Hasher as Crc;
use libfuzzer_sys::{fuzz_mutator, fuzz_target};
use smithy_eventstream::frame::{Header, HeaderValue, Message};
use smithy_types::Instant;

fn mutate(data: &mut [u8], size: usize, max_size: usize) -> usize {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment about this fuzz mutation strategy would be good

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

let input = &mut &data[..size];
let message = {
let result = Message::read_from(input);
if result.is_err() || result.as_ref().unwrap().is_none() {
Message::new(&b"some payload"[..])
.add_header(Header::new("true", HeaderValue::Bool(true)))
.add_header(Header::new("false", HeaderValue::Bool(false)))
.add_header(Header::new("byte", HeaderValue::Byte(50)))
.add_header(Header::new("short", HeaderValue::Int16(20_000)))
.add_header(Header::new("int", HeaderValue::Int32(500_000)))
.add_header(Header::new("long", HeaderValue::Int64(50_000_000_000)))
.add_header(Header::new(
"bytes",
HeaderValue::ByteArray((&b"some bytes"[..]).into()),
))
.add_header(Header::new(
"str",
HeaderValue::String((&b"some str"[..]).into()),
))
.add_header(Header::new(
"time",
HeaderValue::Timestamp(Instant::from_epoch_seconds(5_000_000_000)),
))
.add_header(Header::new(
"uuid",
HeaderValue::Uuid(0xb79bc914_de21_4e13_b8b2_bc47e85b7f0b),
))
} else {
result.unwrap().unwrap()
}
};

let mut bytes = Vec::new();
message.write_to(&mut bytes).unwrap();

let headers_len = (&bytes[4..8]).get_u32();
let non_header_len = bytes.len() - headers_len as usize;
let max_header_len = max_size - non_header_len;
let mut headers = (&bytes[12..(12 + headers_len as usize)]).to_vec();
headers.resize(max_header_len, 0);
let new_header_len =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so do we only mutate the headers? does the crc not apply to the payload? I don't really follow what's going on here overall I guess

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not a lot to test with the payload itself except that the message length and header length were accurate. I think there should probably be one more fuzz test that just plays with the prelude while keeping the CRCs valid to explore that validation logic.

libfuzzer_sys::fuzzer_mutate(&mut headers, headers_len as usize, max_header_len);

let mut mutated = Vec::<u8>::new();
mutated.put_u32((new_header_len + non_header_len) as u32);
mutated.put_u32(new_header_len as u32);
mutated.put_u32(crc(&mutated));
mutated.put_slice(&headers[..new_header_len]);
mutated.put_slice(message.payload());
mutated.put_u32(crc(&mutated));

data[..mutated.len()].copy_from_slice(&mutated);
mutated.len()
}

fuzz_mutator!(
|data: &mut [u8], size: usize, max_size: usize, _seed: u32| { mutate(data, size, max_size) }
);

fuzz_target!(|data: &[u8]| {
let mut message = data;
let _ = Message::read_from(&mut message);
});

fn crc(input: &[u8]) -> u32 {
let mut crc = Crc::new();
crc.update(input);
crc.finalize()
}
14 changes: 14 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/fuzz_targets/read_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#![no_main]

use libfuzzer_sys::fuzz_target;
use smithy_eventstream::frame::Message;

fuzz_target!(|data: &[u8]| {
let mut message = data;
let _ = Message::read_from(&mut message);
});
17 changes: 17 additions & 0 deletions rust-runtime/smithy-eventstream/fuzz/fuzz_targets/roundtrip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#![no_main]
use libfuzzer_sys::fuzz_target;
use smithy_eventstream::frame::Message;

fuzz_target!(|message: Message| {
let mut buffer = Vec::new();
if message.write_to(&mut buffer).is_ok() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fails only when the message is too long, right? would be good to have a little bit of sanity checking that the error case only happens when the buffer is too long

let mut data = &buffer[..];
let parsed = Message::read_from(&mut data).unwrap().unwrap();
assert_eq!(message, parsed);
}
});
7 changes: 7 additions & 0 deletions rust-runtime/smithy-eventstream/src/buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

pub mod count;
pub mod crc;
84 changes: 84 additions & 0 deletions rust-runtime/smithy-eventstream/src/buf/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

//! A [`Buf`] implementation that counts bytes read.

use bytes::Buf;
use std::sync::atomic::{AtomicUsize, Ordering};

/// A [`Buf`] implementation that counts bytes read.
pub struct CountBuf<'a> {
buffer: &'a mut dyn Buf,
count: AtomicUsize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be atomic usize instead of just usize? since advance has &mut self it seems like it could just be a usize?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, I was increasing the count in chunk, which meant I needed interior mutability since chunk takes &self. But then I realized that was a bug since chunk can be called multiple times before advancing, so I fixed that and forgot to reduce the AtomicUsize back down. Will fix.

}

impl<'a> CountBuf<'a> {
/// Creates a new `CountBuf` by wrapping the given `buffer`.
pub fn new(buffer: &'a mut dyn Buf) -> Self {
CountBuf {
buffer,
count: AtomicUsize::new(0),
}
}

/// Consumes the `CountBuf` and returns the number of bytes read.
pub fn into_count(self) -> usize {
self.count.load(Ordering::Relaxed)
}
}

impl<'a> Buf for CountBuf<'a> {
fn remaining(&self) -> usize {
self.buffer.remaining()
}

fn chunk(&self) -> &[u8] {
self.buffer.chunk()
}

fn advance(&mut self, cnt: usize) {
self.count.fetch_add(cnt, Ordering::Relaxed);
self.buffer.advance(cnt);
}
}

#[cfg(test)]
mod tests {
use super::CountBuf;
use bytes::Buf;

#[test]
fn count_no_data_read() {
let mut data: &[u8] = &[];
let buf = CountBuf::new(&mut data);
assert_eq!(0, buf.into_count());
}

#[test]
fn count_data_read() {
let mut data: &[u8] = &[0, 0, 0, 5, 0, 10u8];
let mut buf = CountBuf::new(&mut data);
assert_eq!(5, buf.get_i32());
assert_eq!(4, buf.into_count());

let mut data: &[u8] = &[0, 0, 0, 5, 0, 10u8];
let mut buf = CountBuf::new(&mut data);
assert_eq!(5, buf.get_i32());
assert_eq!(10, buf.get_i16());
assert_eq!(6, buf.into_count());
}

#[test]
fn chunk_called_multiple_times_before_advance() {
let mut data: &[u8] = &[0, 0, 0, 5, 0, 10u8];
let mut buf = CountBuf::new(&mut data);
for _ in 0..3 {
buf.chunk();
}
buf.advance(4);
assert_eq!(10, buf.get_i16());
assert_eq!(6, buf.into_count());
}
}
Loading