Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: signal driver now uses I/O driver directly #5125

Merged
merged 2 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@
//! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//! [`Write`]: std::io::Write

#![cfg_attr(
not(all(feature = "rt", feature = "net")),
allow(dead_code, unused_imports)
)]

cfg_io_blocking! {
pub(crate) mod blocking;
}
Expand Down
6 changes: 1 addition & 5 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ impl<E: Source> PollEvented<E> {
}

/// Returns a reference to the registration.
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg(any(feature = "net"))]
pub(crate) fn registration(&self) -> &Registration {
&self.registration
}
Expand Down
29 changes: 27 additions & 2 deletions tokio/src/runtime/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]

mod registration;
pub(crate) use registration::Registration;
Expand Down Expand Up @@ -26,6 +26,9 @@ pub(crate) struct Driver {
/// as it is mostly used to determine when to call `compact()`.
tick: u8,

/// True when an event with the signal token is received
signal_ready: bool,

/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,

Expand Down Expand Up @@ -86,6 +89,7 @@ enum Tick {
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the reserved space starting at 1 << 31.

I expect a later I/O driver refactor will change this.


const ADDRESS: bit::Pack = bit::Pack::least_significant(24);

Expand Down Expand Up @@ -119,6 +123,7 @@ impl Driver {

Ok(Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
poll,
resources: slab,
Expand Down Expand Up @@ -193,7 +198,11 @@ impl Driver {
for event in events.iter() {
let token = event.token();

if token != TOKEN_WAKEUP {
if token == TOKEN_WAKEUP {
// Nothing to do, the event is used to unblock the I/O driver
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
Self::dispatch(
&mut self.resources,
self.tick,
Expand Down Expand Up @@ -378,3 +387,19 @@ impl Direction {
}
}
}

// Signal handling
cfg_signal_internal_and_unix! {
impl Driver {
pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}

pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
ret
}
}
}
64 changes: 21 additions & 43 deletions tokio/src/runtime/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

//! Signal driver

use crate::io::interest::Interest;
use crate::io::PollEvented;
use crate::runtime::io;
use crate::signal::registry::globals;

use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

/// Responsible for registering wakeups when an OS signal is received, and
Expand All @@ -22,10 +18,10 @@ use std::time::Duration;
#[derive(Debug)]
pub(crate) struct Driver {
/// Thread parker. The `Driver` park implementation delegates to this.
park: io::Driver,
io: io::Driver,

/// A pipe for receiving wake events from the signal handler
receiver: PollEvented<UnixStream>,
receiver: UnixStream,

/// Shared state
inner: Arc<Inner>,
Expand All @@ -43,7 +39,7 @@ pub(super) struct Inner(());

impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(park: io::Driver) -> std_io::Result<Self> {
pub(crate) fn new(mut io: io::Driver) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

Expand All @@ -55,12 +51,11 @@ impl Driver {
// I'm not sure if the second (failed) registration simply doesn't end
// up receiving wake up notifications, or there could be some race
// condition when consuming readiness events, but having distinct
// descriptors for distinct PollEvented instances appears to mitigate
// this.
// descriptors appears to mitigate this.
//
// Unfortunately we cannot just use a single global PollEvented instance
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
// Unfortunately we cannot just use a single global UnixStream instance
// either, since we can't assume they will always be registered with the
// exact same reactor.
//
// Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
// with registering dups with the same reactor. In this case, duping is
Expand All @@ -73,12 +68,12 @@ impl Driver {
// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let receiver = UnixStream::from_std(original.try_clone()?);
let receiver =
PollEvented::new_with_interest_and_handle(receiver, Interest::READABLE, park.handle())?;
let mut receiver = UnixStream::from_std(original.try_clone()?);

io.register_signal_receiver(&mut receiver)?;

Ok(Self {
park,
io,
receiver,
inner: Arc::new(Inner(())),
})
Expand All @@ -93,60 +88,43 @@ impl Driver {
}

pub(crate) fn park(&mut self) {
self.park.park();
self.io.park();
self.process();
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.park.park_timeout(duration);
self.io.park_timeout(duration);
self.process();
}

pub(crate) fn shutdown(&mut self) {
self.park.shutdown()
self.io.shutdown()
}

fn process(&self) {
// Check if the pipe is ready to read and therefore has "woken" us up
//
// To do so, we will `poll_read_ready` with a noop waker, since we don't
// need to actually be notified when read ready...
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);

let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
Poll::Ready(Ok(ev)) => ev,
Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
Poll::Pending => return, // No wake has arrived, bail
};
fn process(&mut self) {
// If the signal pipe has not recieved a readiness event, then there is
// nothing else to do.
if !self.io.consume_signal_ready() {
return;
}

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
loop {
match (&*self.receiver).read(&mut buf) {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}

self.receiver.registration().clear_readiness(ev);

// Broadcast any signals which were received
globals().broadcast();
}
}

const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

unsafe fn noop_clone(_data: *const ()) -> RawWaker {
RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

unsafe fn noop(_data: *const ()) {}

// ===== impl Handle =====

impl Handle {
Expand Down
8 changes: 3 additions & 5 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,18 @@ fn io_driver_fd_count() {
let rt = current_thread();
let metrics = rt.metrics();

// Since this is enabled w/ the process driver we always
// have 1 fd registered.
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
assert_eq!(metrics.io_driver_fd_registered_count(), 0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nice side effect: this is actually correct now.


let stream = tokio::net::TcpStream::connect("google.com:80");
let stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_fd_registered_count(), 2);
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

drop(stream);

assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
assert_eq!(metrics.io_driver_fd_registered_count(), 2);
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
Expand Down