Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new encoder for client and server #158

Merged
merged 10 commits into from
Nov 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ edition = "2018"
httparse = "1.3.3"
async-std = { version = "1.6.0", features = ["unstable"] }
http-types = "2.0.0"
pin-project-lite = "0.1.1"
byte-pool = "0.2.1"
lazy_static = "1.4.0"
futures-core = "0.3.1"
log = "0.4"
pin-project = "1.0.2"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, pin-project-lite 0.2 has enum pinning, but I couldn't figure out how to get it to work correctly.


[dev-dependencies]
pretty_assertions = "0.6.1"
Expand Down
38 changes: 38 additions & 0 deletions src/body_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_std::io::Read;
use http_types::Body;
use pin_project::pin_project;

use crate::chunked::ChunkedEncoder;

#[pin_project(project=BodyEncoderProjection)]
#[derive(Debug)]
/// Encoder for an outgoing HTTP body, selecting the wire framing based on
/// whether the body's length is known up front (see `BodyEncoder::new`).
pub(crate) enum BodyEncoder {
    /// Body of unknown length: streamed using chunked transfer encoding.
    Chunked(#[pin] ChunkedEncoder<Body>),
    /// Body of known length: bytes are read out as-is (fixed-length framing).
    Fixed(#[pin] Body),
}

impl BodyEncoder {
    /// Wrap `body` in the appropriate encoder: fixed-length framing when the
    /// body reports a known length, chunked transfer encoding otherwise.
    pub(crate) fn new(body: Body) -> Self {
        if body.len().is_some() {
            Self::Fixed(body)
        } else {
            Self::Chunked(ChunkedEncoder::new(body))
        }
    }
}

impl Read for BodyEncoder {
    /// Delegate the read to whichever encoder variant is active. The pin
    /// projection (`BodyEncoderProjection`) lets us poll the pinned inner
    /// encoder without moving it.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.project() {
            BodyEncoderProjection::Chunked(encoder) => encoder.poll_read(cx, buf),
            BodyEncoderProjection::Fixed(body) => body.poll_read(cx, buf),
        }
    }
}
308 changes: 87 additions & 221 deletions src/chunked/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,250 +3,116 @@ use std::pin::Pin;
use async_std::io;
use async_std::io::prelude::*;
use async_std::task::{Context, Poll};
use http_types::Response;

const CR: u8 = b'\r';
const LF: u8 = b'\n';
const CRLF_LEN: usize = 2;

/// The encoder state.
#[derive(Debug)]
enum State {
/// Starting state.
Start,
/// Streaming out chunks.
EncodeChunks,
/// No more chunks to stream, mark the end.
EndOfChunks,
/// Receiving trailers from a channel.
ReceiveTrailers,
/// Streaming out trailers, if we received any.
EncodeTrailers,
/// Writing out the final CRLF.
EndOfStream,
/// The stream has finished.
End,
}
use futures_core::ready;

/// An encoder for chunked encoding.
#[derive(Debug)]
pub(crate) struct ChunkedEncoder {
/// How many bytes we've written to the buffer so far.
bytes_written: usize,
/// The internal encoder state.
state: State,
pub(crate) struct ChunkedEncoder<R> {
reader: R,
done: bool,
}

impl ChunkedEncoder {
impl<R: Read + Unpin> ChunkedEncoder<R> {
/// Create a new instance.
pub(crate) fn new() -> Self {
pub(crate) fn new(reader: R) -> Self {
Self {
state: State::Start,
bytes_written: 0,
reader,
done: false,
}
}
}

/// Encode an AsyncBufRead using "chunked" framing. This is used for streams
/// whose length is not known up front.
///
/// # Format
///
/// Each "chunk" uses the following encoding:
///
/// ```txt
/// 1. {byte length of `data` as hex}\r\n
/// 2. {data}\r\n
/// ```
///
/// A chunk stream is finalized by appending the following:
///
/// ```txt
/// 1. 0\r\n
/// 2. {trailing header}\r\n (can be repeated)
/// 3. \r\n
/// ```
pub(crate) fn encode(
&mut self,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.bytes_written = 0;
let res = self.run(res, cx, buf);
log::trace!("ChunkedEncoder {} bytes written", self.bytes_written);
res
}

/// Execute the right method for the current state.
fn run(
&mut self,
res: &mut Response,
impl<R: Read + Unpin> Read for ChunkedEncoder<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.state {
State::Start => self.dispatch(State::EncodeChunks, res, cx, buf),
State::EncodeChunks => self.encode_chunks(res, cx, buf),
State::EndOfChunks => self.encode_chunks_eos(res, cx, buf),
State::ReceiveTrailers => self.receive_trailers(res, cx, buf),
State::EncodeTrailers => self.encode_trailers(res, cx, buf),
State::EndOfStream => self.encode_eos(res, cx, buf),
State::End => Poll::Ready(Ok(self.bytes_written)),
if self.done {
return Poll::Ready(Ok(0));
}
}
let reader = &mut self.reader;

/// Switch the internal state to a new state.
fn dispatch(
&mut self,
state: State,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
use State::*;
log::trace!("ChunkedEncoder state: {:?} -> {:?}", self.state, state);
let max_bytes_to_read = max_bytes_to_read(buf.len());

#[cfg(debug_assertions)]
match self.state {
Start => assert!(matches!(state, EncodeChunks)),
EncodeChunks => assert!(matches!(state, EndOfChunks)),
EndOfChunks => assert!(matches!(state, ReceiveTrailers)),
ReceiveTrailers => assert!(matches!(state, EncodeTrailers | EndOfStream)),
EncodeTrailers => assert!(matches!(state, EndOfStream)),
EndOfStream => assert!(matches!(state, End)),
End => panic!("No state transitions allowed after the ChunkedEncoder has ended"),
let bytes = ready!(Pin::new(reader).poll_read(cx, &mut buf[..max_bytes_to_read]))?;
if bytes == 0 {
self.done = true;
}

self.state = state;
self.run(res, cx, buf)
let start = format!("{:X}\r\n", bytes);
let start_length = start.as_bytes().len();
let total = bytes + start_length + 2;
buf.copy_within(..bytes, start_length);
buf[..start_length].copy_from_slice(start.as_bytes());
buf[total - 2..total].copy_from_slice(b"\r\n");
Poll::Ready(Ok(total))
}
}

/// Stream out data using chunked encoding.
fn encode_chunks(
&mut self,
mut res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// Get bytes from the underlying stream. If the stream is not ready yet,
// return the header bytes if we have any.
let src = match Pin::new(&mut res).poll_fill_buf(cx) {
Poll::Ready(Ok(n)) => n,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => match self.bytes_written {
0 => return Poll::Pending,
n => return Poll::Ready(Ok(n)),
},
};

// If the stream doesn't have any more bytes left to read we're done
// sending chunks and it's time to move on.
if src.len() == 0 {
return self.dispatch(State::EndOfChunks, res, cx, buf);
}

// Each chunk is prefixed with the length of the data in hex, then a
// CRLF, then the content, then another CRLF. Calculate how many bytes
// each part should be.
let buf_len = buf.len().saturating_sub(self.bytes_written);
let msg_len = src.len().min(buf_len);
// Calculate the max char count encoding the `len_prefix` statement
// as hex would take. This is done by rounding up `log16(amt + 1)`.
let hex_len = ((msg_len + 1) as f64).log(16.0).ceil() as usize;
let framing_len = hex_len + CRLF_LEN * 2;
let buf_upper = buf_len.saturating_sub(framing_len);
let msg_len = msg_len.min(buf_upper);
let len_prefix = format!("{:X}", msg_len).into_bytes();

// Request a new buf if the current buf is too small to write any data
// into. Empty frames should only be sent to mark the end of a stream.
if buf.len() <= framing_len {
cx.waker().wake_by_ref();
return Poll::Ready(Ok(self.bytes_written));
}

// Write our frame header to the buffer.
let lower = self.bytes_written;
let upper = self.bytes_written + len_prefix.len();
buf[lower..upper].copy_from_slice(&len_prefix);
buf[upper] = CR;
buf[upper + 1] = LF;
self.bytes_written += len_prefix.len() + 2;

// Copy the bytes from our source into the output buffer.
let lower = self.bytes_written;
let upper = self.bytes_written + msg_len;
buf[lower..upper].copy_from_slice(&src[0..msg_len]);
Pin::new(&mut res).consume(msg_len);
self.bytes_written += msg_len;

// Finalize the chunk with a closing CRLF.
let idx = self.bytes_written;
buf[idx] = CR;
buf[idx + 1] = LF;
self.bytes_written += CRLF_LEN;

// Finally return how many bytes we've written to the buffer.
Poll::Ready(Ok(self.bytes_written))
fn max_bytes_to_read(buf_len: usize) -> usize {
if buf_len < 6 {
// the minimum read size is of 6 represents one byte of
// content from the body. the other five bytes are 1\r\n_\r\n
// where _ is the actual content in question
panic!("buffers of length {} are too small for this implementation. if this is a problem for you, please open an issue", buf_len);
}

fn encode_chunks_eos(
&mut self,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// Request a new buf if the current buf is too small to write into.
if buf.len() < 3 {
cx.waker().wake_by_ref();
return Poll::Ready(Ok(self.bytes_written));
}

// Write out the final empty chunk
let idx = self.bytes_written;
buf[idx] = b'0';
buf[idx + 1] = CR;
buf[idx + 2] = LF;
self.bytes_written += 1 + CRLF_LEN;
let bytes_remaining_after_two_cr_lns = (buf_len - 4) as f64;

self.dispatch(State::ReceiveTrailers, res, cx, buf)
}

/// Receive trailers sent to the response, and store them in an internal
/// buffer.
fn receive_trailers(
&mut self,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// TODO: actually wait for trailers to be received.
self.dispatch(State::EncodeTrailers, res, cx, buf)
}
// the maximum number of bytes that the hex representation of remaining bytes might take
let max_bytes_of_hex_framing = bytes_remaining_after_two_cr_lns.log2() / 4f64;

/// Send trailers to the buffer.
fn encode_trailers(
&mut self,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// TODO: actually encode trailers here.
self.dispatch(State::EndOfStream, res, cx, buf)
}
(bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing.ceil()) as usize
}

/// Encode the end of the stream.
fn encode_eos(
&mut self,
res: &mut Response,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let idx = self.bytes_written;
// Write the final CRLF
buf[idx] = CR;
buf[idx + 1] = LF;
self.bytes_written += CRLF_LEN;
self.dispatch(State::End, res, cx, buf)
#[cfg(test)]
mod test_bytes_to_read {
    /// Spot-checks `max_bytes_to_read` against hand-computed buffer sizes.
    ///
    /// The arrow-marked rows are the most important part of this test, and a
    /// nonobvious but intentional consequence of the implementation: to avoid
    /// overflowing the buffer we must sometimes read one byte fewer than would
    /// otherwise fit, because enlarging the read can push the hex length
    /// prefix across an order-of-magnitude boundary (F -> 10, FF -> 100,
    /// FFF -> 1000, etc.), which costs an extra framing byte.
    #[test]
    fn simple_check_of_known_values() {
        // (buffer length, expected max read) pairs; the comment shows the hex
        // length prefix the expected read would produce.
        let cases = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for &(buf_len, expected_read) in cases.iter() {
            let actual_read = super::max_bytes_to_read(buf_len);
            assert_eq!(
                actual_read, expected_read,
                "\n\nexpected max_bytes_to_read({}) to be {}, but it was {}",
                buf_len, expected_read, actual_read
            );

            // Sanity-check the expectation itself: hex prefix + two CRLFs +
            // content must fill the buffer exactly, or leave exactly one
            // spare byte (the order-of-magnitude case described above).
            let framed_len = expected_read + 4 + format!("{:X}", expected_read).len();
            assert!(
                framed_len == buf_len || framed_len == buf_len - 1,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                buf_len,
                buf_len,
                buf_len - 1,
                framed_len
            );
        }
    }
}
Loading