Skip to content

Commit

Permalink
io: Add AsyncFd
Browse files Browse the repository at this point in the history
This adds AsyncFd, a unix-only structure to allow for read/writability states
to be monitored for arbitrary file descriptors.

Issue: tokio-rs#2728
  • Loading branch information
Bryan Donlan committed Oct 2, 2020
1 parent 1e585cc commit 7b9331d
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 7 deletions.
14 changes: 10 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ default = []

# enable everything
full = [
"async-fd",
"blocking",
"dns",
"fs",
Expand All @@ -52,7 +53,7 @@ dns = ["rt-core"]
fs = ["rt-core", "io-util"]
io-util = ["memchr"]
# stdin, stdout, stderr
io-std = ["rt-core"]
io-std = ["rt-core", "mio/os-util"]
macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"]
process = [
Expand All @@ -76,16 +77,18 @@ signal = [
"libc",
"mio/os-poll",
"mio/uds",
"mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
tcp = ["lazy_static", "mio/tcp", "mio/os-poll", "mio/os-util"]
time = ["slab"]
udp = ["lazy_static", "mio/udp", "mio/os-poll"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]
udp = ["lazy_static", "mio/udp", "mio/os-poll", "mio/os-util"]
uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll", "mio/os-util"]
async-fd = ["lazy_static", "mio/udp", "mio/os-poll", "mio/os-util"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand All @@ -108,6 +111,9 @@ tracing = { version = "0.1.16", default-features = false, features = ["std"], op
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
Expand Down
269 changes: 269 additions & 0 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
use std::os::unix::io::RawFd;

use std::io;

use mio::unix::SourceFd;

use crate::io::driver::{Handle, ReadyEvent, ScheduledIo};
use crate::util::slab;

/// Associates a Unix file descriptor with the tokio reactor, allowing for
/// readiness to be polled.
///
/// Creating an AsyncFd registers the file descriptor with the current tokio
/// Reactor, allowing you to directly await the file descriptor being readable
/// or writable. Once registered, the file descriptor remains registered until
/// the AsyncFd is dropped.
///
/// It is the responsibility of the caller to ensure that the AsyncFd is dropped
/// before the associated file descriptor is closed. Failing to do so may result
/// in spurious events or mysterious errors from other tokio IO calls.
///
/// Polling for readiness is done by calling the async functions [`readable`]
/// and [`writable`]. These functions complete when the associated readiness
/// condition is observed. Any number of tasks can query the same `AsyncFd`
/// in parallel, on the same or different conditions.
///
/// On some platforms, the readiness detecting mechanism relies on
/// edge-triggered notifications. This means that the OS will only notify Tokio
/// when the file descriptor transitions from not-ready to ready. Tokio
/// internally tracks when it has received a ready notification, and when
/// readiness checking functions like [`readable`] and [`writable`] are called,
/// if the readiness flag is set, these async functions will complete
/// immediately.
///
/// This however does mean that it is critical to ensure that this ready flag is
/// cleared when (and only when) the file descriptor ceases to be ready. The
/// [`ReadyGuard`] returned from readiness checking functions serves this
/// function; after calling a readiness-checking async function, you must use
/// this [`ReadyGuard`] to signal to tokio whether the file descriptor is no
/// longer in a ready state.
///
/// ## Converting to a poll-based API
///
/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
/// [`TcpStream::poll_read_ready`]. One can do so by allocating a pinned future
/// to perform the poll:
///
/// ```
/// use tokio::io::{ReadyGuard, AsyncFd};
///
/// use std::future::Future;
/// use std::sync::Arc;
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
///
/// use futures::ready;
///
/// struct MyIoStruct {
/// async_fd: Arc<AsyncFd>,
/// poller: Pin<Box<dyn Future<Output=()>>>
/// }
///
/// impl MyIoStruct {
/// fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<ReadyGuard<'_>>> {
/// let mut result = Poll::Pending;
/// while result.is_pending() {
/// // Poll the saved future; if it's not ready, our context waker will be saved in the
/// // future and we can return.
/// ready!(self.poller.as_mut().poll(cx));
///
/// // Reset the poller future, since we consumed it.
/// let arc = self.async_fd.clone();
/// self.poller = Box::pin(async move {
/// let _ = arc.readable().await.map(|mut guard| guard.retain_ready());
/// });
///
/// // Because we need to bind the ReadyGuard to the lifetime of self, we have to re-poll here.
/// // It's possible that we might race with another thread clearing the ready state, so deal
/// // with that as well.
/// let fut = self.async_fd.readable();
/// tokio::pin!(fut);
/// result = fut.as_mut().poll(cx);
/// }
///
/// result
/// }
/// }
/// ```
///
/// [`readable`]: method@Self::readable
/// [`writable`]: method@Self::writable
/// [`ReadyGuard`]: struct@self::ReadyGuard
pub struct AsyncFd {
handle: Handle,
fd: RawFd,
shared: slab::Ref<ScheduledIo>,
}

impl std::fmt::Debug for AsyncFd {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncFd").field("fd", &self.fd).finish()
}
}

unsafe impl Send for AsyncFd {}
unsafe impl Sync for AsyncFd {}

const fn all_interest() -> mio::Interest {
mio::Interest::READABLE.add(mio::Interest::WRITABLE)
}

/// Represents an IO-ready event detected on a particular file descriptor, which
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
#[must_use]
pub struct ReadyGuard<'a> {
async_fd: &'a AsyncFd,
event: Option<ReadyEvent>,
}

impl<'a> std::fmt::Debug for ReadyGuard<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClearReady")
.field("async_fd", self.async_fd)
.finish()
}
}

impl<'a> ReadyGuard<'a> {
/// Indicates to tokio that the file descriptor is no longer ready. The
/// internal readiness flag will be cleared, and tokio will wait for the
/// next edge-triggered readiness notification from the OS.
///
/// It is critical that this function not be called unless your code
/// _actually observes_ that the file descriptor is _not_ ready. Do not call
/// it simply because, for example, a read succeeded; it should be called
/// when a read is observed to block.
///
/// [`drop`]: method@std::mem::drop
pub fn clear_ready(&mut self) {
if let Some(event) = self.event.take() {
self.async_fd.shared.clear_readiness(event);
}
}

/// This function should be invoked when you intentionally want to keep the
/// ready flag asserted.
///
/// While this function is itself a no-op, it satisfies the `#[must_use]`
/// constraint on the [`ReadyGuard`] type.
pub fn retain_ready(&mut self) {
// no-op
}

/// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
/// the readiness state associated with this file descriptor is cleared.
///
/// This method helps ensure that the readiness state of the underlying file
/// descriptor remains in sync with the tokio-side readiness state, by
/// clearing the tokio-side state only when a [`WouldBlock`] condition
/// occurs. It is the responsibility of the caller to ensure that `f`
/// returns [`WouldBlock`] only if the file descriptor that originated this
/// `ReadyGuard` no longer expresses the readiness state that was queried to
/// create this `ReadyGuard`.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
pub fn with_io<R, E>(&mut self, f: impl FnOnce()->Result<R,E>) -> Result<R,E>
where E: std::error::Error + 'static
{
use std::error::Error;

let result = f();

if let Err(e) = result.as_ref() {
// Is this a WouldBlock error?
let mut error_ref : Option<&(dyn Error + 'static)> = Some(e);

while let Some(current) = error_ref {
if let Some(e) = Error::downcast_ref::<std::io::Error>(current) {
if e.kind() == std::io::ErrorKind::WouldBlock {
self.clear_ready();
break;
}
}
error_ref = current.source();
}
}

result
}

/// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
/// state associated with this file descriptor is cleared.
///
/// This method helps ensure that the readiness state of the underlying file
/// descriptor remains in sync with the tokio-side readiness state, by
/// clearing the tokio-side state only when a [`Pending`] condition occurs.
/// It is the responsibility of the caller to ensure that `f` returns
/// [`Pending`] only if the file descriptor that originated this
/// `ReadyGuard` no longer expresses the readiness state that was queried to
/// create this `ReadyGuard`.
///
/// [`Pending`]: std::task::Poll::Pending
pub fn with_poll<R>(&mut self, f: impl FnOnce()->std::task::Poll<R>) -> std::task::Poll<R> {
let result = f();

if result.is_pending() {
self.clear_ready();
}

result
}
}

impl Drop for AsyncFd {
fn drop(&mut self) {
if let Some(inner) = self.handle.inner() {
let _ = inner.deregister_source(&mut SourceFd(&self.fd));
}
}
}

impl AsyncFd {
/// Constructs a new AsyncFd, binding this file descriptor to the current tokio Reactor.
///
/// This function must be called in the context of a tokio runtime.
pub fn new(fd: RawFd) -> io::Result<Self> {
Self::new_with_handle(fd, Handle::current())
}

pub(crate) fn new_with_handle(fd: RawFd, handle: Handle) -> io::Result<Self> {
let shared = if let Some(inner) = handle.inner() {
inner.add_source(&mut SourceFd(&fd), all_interest())?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};

Ok(AsyncFd { handle, fd, shared })
}

async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyGuard<'_>> {
let event = self.shared.readiness(interest).await;
Ok(ReadyGuard {
async_fd: self,
event: Some(event),
})
}

/// Waits for the file descriptor to become readable, returning a
/// [`ReadyGuard`] that must be dropped to resume read-readiness polling.
///
/// [`ReadyGuard`]: struct@self::ReadyGuard
#[must_use]
pub async fn readable(&self) -> io::Result<ReadyGuard<'_>> {
self.readiness(mio::Interest::READABLE).await
}

/// Waits for the file descriptor to become writable, returning a
/// [`ReadyGuard`] that must be dropped to resume write-readiness polling.
///
/// [`ReadyGuard`]: struct@self::ReadyGuard
#[must_use]
pub async fn writable(&self) -> io::Result<ReadyGuard<'_>> {
self.readiness(mio::Interest::WRITABLE).await
}
}
4 changes: 2 additions & 2 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ cfg_io_readiness! {

#[derive(Debug, Default)]
struct Waiters {
#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "udp", feature = "uds", feature = "async-fd"))]
/// List of all current waiters
list: WaitList,

Expand Down Expand Up @@ -203,7 +203,7 @@ impl ScheduledIo {
}
}

#[cfg(any(feature = "udp", feature = "uds"))]
#[cfg(any(feature = "udp", feature = "uds", feature = "async-fd"))]
{
// check list of waiters
for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ cfg_io_driver! {
mod registration;
}

cfg_async_fd_unix! {
mod async_fd;

pub use self::async_fd::{AsyncFd, ReadyGuard};
}

cfg_io_std! {
mod stdio_common;

Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl<E: Source> PollEvented<E> {
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
#[cfg(any(
feature = "async-fd",
feature = "process",
feature = "tcp",
feature = "udp",
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
//! - `tcp`: Enables all `tokio::net::tcp` types.
//! - `udp`: Enables all `tokio::net::udp` types.
//! - `uds`: Enables all `tokio::net::unix` types.
//! - `async-fd`: Enables the `tokio::io::AsyncFd` and associated types (available on
//! unix-like systems only).
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
Expand Down
Loading

0 comments on commit 7b9331d

Please sign in to comment.