Skip to content

Commit

Permalink
Merge pull request #17 from cbs228/feature/write_backpressure
Browse files Browse the repository at this point in the history
Add backpressure to FramedWrite senders
  • Loading branch information
matthunz authored Sep 6, 2019
2 parents 0e96bd4 + 7f05fc8 commit 5f492b7
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 7 deletions.
63 changes: 58 additions & 5 deletions src/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,40 @@ where
}
}

/// High-water mark for writes, in bytes
///
/// The send *high-water mark* prevents the `FramedWrite`
/// from accepting additional messages to send when its
/// buffer exceeds this length, in bytes. Attempts to enqueue
/// additional messages will be deferred until progress is
/// made on the underlying `AsyncWrite`. This applies
/// back-pressure on fast senders and prevents unbounded
/// buffer growth.
///
/// See [`set_send_high_water_mark()`](#method.set_send_high_water_mark).
pub fn send_high_water_mark(&self) -> usize {
return self.inner.high_water_mark;
}

/// Sets high-water mark for writes, in bytes
///
/// The send *high-water mark* prevents the `FramedWrite`
/// from accepting additional messages to send when its
/// buffer exceeds this length, in bytes. Attempts to enqueue
/// additional messages will be deferred until progress is
/// made on the underlying `AsyncWrite`. This applies
/// back-pressure on fast senders and prevents unbounded
/// buffer growth.
///
/// The default high-water mark is 2^17 bytes. Applications
/// which desire low latency may wish to reduce this value.
/// There is little point to increasing this value beyond
/// your socket's `SO_SNDBUF` size. On linux, this defaults
/// to 212992 bytes but is user-adjustable.
pub fn set_send_high_water_mark(&mut self, hwm: usize) {
self.inner.high_water_mark = hwm;
}

/// Release the I/O and Encoder
pub fn release(self) -> (T, E) {
let fuse = self.inner.release();
Expand Down Expand Up @@ -73,12 +107,18 @@ where

pub struct FramedWrite2<T> {
pub inner: T,
pub high_water_mark: usize,
buffer: BytesMut,
}

// 2^17 bytes, which is slightly over 60% of the default
// TCP send buffer size (SO_SNDBUF)
const DEFAULT_SEND_HIGH_WATER_MARK: usize = 131072;

pub fn framed_write_2<T>(inner: T) -> FramedWrite2<T> {
FramedWrite2 {
inner,
high_water_mark: DEFAULT_SEND_HIGH_WATER_MARK,
buffer: BytesMut::with_capacity(1028 * 8),
}
}
Expand All @@ -101,7 +141,18 @@ where
{
type Error = T::Error;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = &mut *self;
while this.buffer.len() >= this.high_water_mark {
let num_write = ready!(Pin::new(&mut this.inner).poll_write(cx, &this.buffer))?;

if num_write == 0 {
return Poll::Ready(Err(err_eof().into()));
}

this.buffer.advance(num_write);
}

Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: T::Item) -> Result<(), Self::Error> {
Expand All @@ -115,12 +166,10 @@ where
let num_write = ready!(Pin::new(&mut this.inner).poll_write(cx, &this.buffer))?;

if num_write == 0 {
return Poll::Ready(Err(
Error::new(ErrorKind::UnexpectedEof, "End of stream").into()
));
return Poll::Ready(Err(err_eof().into()));
}

this.buffer.split_to(num_write);
this.buffer.advance(num_write);
}

Pin::new(&mut this.inner).poll_flush(cx).map_err(Into::into)
Expand All @@ -136,3 +185,7 @@ impl<T> FramedWrite2<T> {
self.inner
}
}

fn err_eof() -> Error {
Error::new(ErrorKind::UnexpectedEof, "End of file")
}
78 changes: 76 additions & 2 deletions tests/framed_write.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,58 @@
use futures::executor;
use bytes::Bytes;
use core::iter::Iterator;
use futures::io::AsyncWrite;
use futures::sink::SinkExt;
use futures_codec::{FramedWrite, LinesCodec};
use futures::task::Context;
use futures::Poll;
use futures::{executor, stream};
use futures_codec::{BytesCodec, FramedWrite, LinesCodec};
use std::io::Cursor;
use std::pin::Pin;

// An iterator which outputs a single zero byte up to limit times
struct ZeroBytes {
pub count: usize,
pub limit: usize,
}
impl Iterator for ZeroBytes {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
if self.count >= self.limit {
None
} else {
self.count += 1;
Some(Bytes::from_static(b"\0"))
}
}
}

// An AsyncWrite which is always ready and just consumes the data
struct AsyncWriteNull {
// number of poll_write calls
pub num_poll_write: usize,

// size of the last poll_write
pub last_write_size: usize,
}
impl AsyncWrite for AsyncWriteNull {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.num_poll_write += 1;
self.last_write_size = buf.len();
Poll::Ready(Ok(buf.len()))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}

#[test]
fn line_write() {
Expand All @@ -24,3 +75,26 @@ fn line_write_to_eof() {
assert_eq!(curs.position(), 16);
assert_eq!(&curs.get_ref()[0..16], b"This will fill u");
}

#[test]
fn send_high_water_mark() {
// stream will output 999 bytes, 1 at at a time, and will always be ready
let mut stream = stream::iter(ZeroBytes {
count: 0,
limit: 999,
});

// sink will eat whatever it receives
let io = AsyncWriteNull {
num_poll_write: 0,
last_write_size: 0,
};

// expect two sends
let mut framer = FramedWrite::new(io, BytesCodec {});
framer.set_send_high_water_mark(500);
executor::block_on(framer.send_all(&mut stream)).unwrap();
let (io, _) = framer.release();
assert_eq!(io.num_poll_write, 2);
assert_eq!(io.last_write_size, 499);
}

0 comments on commit 5f492b7

Please sign in to comment.