Skip to content
This repository has been archived by the owner on May 6, 2018. It is now read-only.

Commit

Permalink
Add AllowSyncIo wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Oct 28, 2017
1 parent d0ea1a2 commit 96cd292
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
85 changes: 85 additions & 0 deletions src/allow_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use {AsyncRead, AsyncWrite};
use bytes::{Buf, BufMut};
use futures::{Async, Poll};
use std::{fmt, io};

/// A simple wrapper type which allows types which implement only `Read` or `Write`
/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AllowSyncIo<T>(pub T);

impl<T> io::Write for AllowSyncIo<T> where T: io::Write {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.0.write_all(buf)
}
fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> {
self.0.write_fmt(fmt)
}
}

impl<T> AsyncWrite for AllowSyncIo<T> where T: io::Write {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if !buf.has_remaining() {
return Ok(Async::Ready(0));
}

// Note: this line purposefully does not use `try_nb!`.
// Because `T` is supposedly a synchronous IO output,
// if EAGAIN appears it cannot be guaranteed that the
// current task will be notified and rescheduled appropriately.
let n = io::Write::write(self, buf.bytes())?;

buf.advance(n);
Ok(Async::Ready(n))
}
}

impl<T> io::Read for AllowSyncIo<T> where T: io::Read {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
// TODO: implement the `initializer` fn when it stabilizes.
// See rust-lang/rust #42788
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
self.0.read_to_string(buf)
}
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
self.0.read_exact(buf)
}
}

impl<T> AsyncRead for AllowSyncIo<T> where T: io::Read {
// TODO: override prepare_unitialized_buffer once `Read::initializer` is stable.
// See rust-lang/rust #42788
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if !buf.has_remaining_mut() {
return Ok(Async::Ready(0));
}

unsafe {
let n = {
let b = buf.bytes_mut();

self.prepare_uninitialized_buffer(b);

// Note: as above, purposefully avoid `try_nb!`.
io::Read::read(self, b)?
};

buf.advance_mut(n);
Ok(Async::Ready(n))
}
}
}
1 change: 1 addition & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! [found online]: https://tokio.rs/docs/getting-started/core/
//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/

pub use allow_sync::AllowSyncIo;
pub use copy::{copy, Copy};
pub use flush::{flush, Flush};
pub use lines::{lines, Lines};
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ macro_rules! try_nb {
pub mod io;
pub mod codec;

mod allow_sync;
mod copy;
mod flush;
mod framed;
Expand Down

0 comments on commit 96cd292

Please sign in to comment.