Skip to content

Commit

Permalink
replace mio polling with filedescriptor
Browse files Browse the repository at this point in the history
since mio doesn't handle ptys correctly macOS, use filedescriptor's
poll() which falls back to select().
  • Loading branch information
yyogo committed Jan 9, 2023
1 parent 81eb00f commit e9fa5db
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 128 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ crossterm_winapi = "0.9"
#
[target.'cfg(unix)'.dependencies]
libc = "0.2"
mio = { version = "0.8", features = ["os-poll"] }
signal-hook = { version = "0.3.13" }
signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] }
filedescriptor = "0.8"

#
# Dev dependencies (examples, ...)
Expand Down
251 changes: 137 additions & 114 deletions src/event/source/unix.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,65 @@
use std::{collections::VecDeque, io, time::Duration};
use std::os::unix::prelude::AsRawFd;
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration};

use mio::{unix::SourceFd, Events, Interest, Poll, Token};
use signal_hook_mio::v0_8::Signals;
use signal_hook::low_level::pipe;

use crate::event::timeout::PollTimeout;
use crate::event::Event;
use crate::Result;

use filedescriptor::{poll, pollfd, POLLIN};

#[cfg(feature = "event-stream")]
use super::super::sys::Waker;
use super::super::{
source::EventSource,
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,

use super::{
super::{
sys::unix::{
file_descriptor::{tty_fd, FileDesc},
parse::parse_event,
},
InternalEvent,
},
timeout::PollTimeout,
Event, InternalEvent,
EventSource,
};

// Tokens to identify file descriptor
const TTY_TOKEN: Token = Token(0);
const SIGNAL_TOKEN: Token = Token(1);
/// Holds a prototypical Waker and a receiver we can wait on when doing select().
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);
struct WakePipe {
receiver: UnixStream,
waker: Waker,
}

#[cfg(feature = "event-stream")]
impl WakePipe {
fn new() -> Result<Self> {
let (receiver, sender) = nonblocking_unix_pair()?;
Ok(WakePipe {
receiver,
waker: Waker::new(sender),
})
}
}

// I (@zrzka) wasn't able to read more than 1_022 bytes when testing
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
// is enough.
const TTY_BUFFER_SIZE: usize = 1_024;

pub(crate) struct UnixInternalEventSource {
poll: Poll,
events: Events,
parser: Parser,
tty_buffer: [u8; TTY_BUFFER_SIZE],
tty_fd: FileDesc,
signals: Signals,
tty: FileDesc,
winch_signal_receiver: UnixStream,
#[cfg(feature = "event-stream")]
waker: Waker,
wake_pipe: WakePipe,
}

fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> {
let (receiver, sender) = UnixStream::pair()?;
receiver.set_nonblocking(true)?;
sender.set_nonblocking(true)?;
Ok((receiver, sender))
}

impl UnixInternalEventSource {
Expand All @@ -45,128 +68,128 @@ impl UnixInternalEventSource {
}

pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> {
let poll = Poll::new()?;
let registry = poll.registry();

let tty_raw_fd = input_fd.raw_fd();
let mut tty_ev = SourceFd(&tty_raw_fd);
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;

let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?;
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;

#[cfg(feature = "event-stream")]
let waker = Waker::new(registry, WAKE_TOKEN)?;

Ok(UnixInternalEventSource {
poll,
events: Events::with_capacity(3),
parser: Parser::default(),
tty_buffer: [0u8; TTY_BUFFER_SIZE],
tty_fd: input_fd,
signals,
tty: input_fd,
winch_signal_receiver: {
let (receiver, sender) = nonblocking_unix_pair()?;
// Unregistering is unnecessary because EventSource is a singleton
pipe::register(libc::SIGWINCH, sender)?;
receiver
},
#[cfg(feature = "event-stream")]
waker,
wake_pipe: WakePipe::new()?,
})
}
}

/// read_complete reads from a non-blocking file descriptor
/// until the buffer is full or it would block.
///
/// Similar to `std::io::Read::read_to_end`, except this function
/// only fills the given buffer and does not read beyond that.
fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result<usize> {
loop {
match fd.read(buf, buf.len()) {
Ok(x) => return Ok(x),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => return Ok(0),
io::ErrorKind::Interrupted => continue,
_ => return Err(e),
},
}
}
}

impl EventSource for UnixInternalEventSource {
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> {
if let Some(event) = self.parser.next() {
return Ok(Some(event));
let timeout = PollTimeout::new(timeout);

fn make_pollfd<F: AsRawFd>(fd: &F) -> pollfd {
pollfd {
fd: fd.as_raw_fd(),
events: POLLIN,
revents: 0,
}
}

let timeout = PollTimeout::new(timeout);
#[cfg(not(feature = "event-stream"))]
let mut fds = [
make_pollfd(&self.tty),
make_pollfd(&self.winch_signal_receiver),
];

loop {
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
return Err(e);
}
};
#[cfg(feature = "event-stream")]
let mut fds = [
make_pollfd(&self.tty),
make_pollfd(&self.winch_signal_receiver),
make_pollfd(&self.wake_pipe.receiver),
];

if self.events.is_empty() {
// No readiness events = timeout
return Ok(None);
while timeout.leftover().map_or(true, |t| !t.is_zero()) {
// check if there are buffered events from the last read
if let Some(event) = self.parser.next() {
return Ok(Some(event));
}

for token in self.events.iter().map(|x| x.token()) {
match token {
TTY_TOKEN => {
loop {
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
Ok(read_count) => {
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
}
Err(e) => {
// No more data to read at the moment. We will receive another event
if e.kind() == io::ErrorKind::WouldBlock {
break;
}
// once more data is available to read.
else if e.kind() == io::ErrorKind::Interrupted {
continue;
}
}
};

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
}
match poll(&mut fds, timeout.leftover()) {
Err(filedescriptor::Error::Io(e)) => return Err(e),
res => res.expect("polling tty"),
};
if fds[0].events & POLLIN != 0 {
loop {
let read_count = read_complete(&self.tty, &mut self.tty_buffer)?;
if read_count > 0 {
self.parser.advance(
&self.tty_buffer[..read_count],
read_count == TTY_BUFFER_SIZE,
);
}
SIGNAL_TOKEN => {
for signal in self.signals.pending() {
match signal {
signal_hook::consts::SIGWINCH => {
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}
_ => unreachable!("Synchronize signal registration & handling"),
};
}

if let Some(event) = self.parser.next() {
return Ok(Some(event));
}
#[cfg(feature = "event-stream")]
WAKE_TOKEN => {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));

if read_count == 0 {
break;
}
_ => unreachable!("Synchronize Evented handle registration & token handling"),
}
}
if fds[1].events & POLLIN != 0 {
let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}
// TODO Should we remove tput?
//
// This can take a really long time, because terminal::size can
// launch new process (tput) and then it parses its output. It's
// not a really long time from the absolute time point of view, but
// it's a really long time from the mio, async-std/tokio executor, ...
// point of view.
let new_size = crate::terminal::size()?;
return Ok(Some(InternalEvent::Event(Event::Resize(
new_size.0, new_size.1,
))));
}

#[cfg(feature = "event-stream")]
if fds[2].events & POLLIN != 0 {
let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false);
// drain the pipe
while read_complete(&fd, &mut [0; 1024])? != 0 {}

// Processing above can take some time, check if timeout expired
if timeout.elapsed() {
return Ok(None);
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Poll operation was woken up by `Waker::wake`",
));
}
}
Ok(None)
}

#[cfg(feature = "event-stream")]
fn waker(&self) -> Waker {
self.waker.clone()
self.wake_pipe.waker.clone()
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/event/sys/unix/file_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
fs, io,
os::unix::io::{IntoRawFd, RawFd},
os::unix::{
io::{IntoRawFd, RawFd},
prelude::AsRawFd,
},
};

use libc::size_t;
Expand Down Expand Up @@ -63,6 +66,12 @@ impl Drop for FileDesc {
}
}

impl AsRawFd for FileDesc {
fn as_raw_fd(&self) -> RawFd {
self.raw_fd()
}
}

/// Creates a file descriptor pointing to the standard input or `/dev/tty`.
pub fn tty_fd() -> Result<FileDesc> {
let (fd, close_on_drop) = if unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } {
Expand Down
24 changes: 13 additions & 11 deletions src/event/sys/unix/waker.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
use std::sync::{Arc, Mutex};

use mio::{Registry, Token};
use std::{
io::Write,
os::unix::net::UnixStream,
sync::{Arc, Mutex},
};

use crate::Result;

/// Allows to wake up the `mio::Poll::poll()` method.
/// This type wraps `mio::Waker`, for more information see its documentation.
/// Allows to wake up the EventSource::try_read() method.
#[derive(Clone, Debug)]
pub(crate) struct Waker {
inner: Arc<Mutex<mio::Waker>>,
inner: Arc<Mutex<UnixStream>>,
}

impl Waker {
/// Create a new `Waker`.
pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result<Self> {
Ok(Self {
inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)),
})
pub(crate) fn new(writer: UnixStream) -> Self {
Self {
inner: Arc::new(Mutex::new(writer)),
}
}

/// Wake up the [`Poll`] associated with this `Waker`.
///
/// Readiness is set to `Ready::readable()`.
pub(crate) fn wake(&self) -> Result<()> {
self.inner.lock().unwrap().wake()
self.inner.lock().unwrap().write(&[0])?;
Ok(())
}

/// Resets the state so the same waker can be reused.
Expand Down

0 comments on commit e9fa5db

Please sign in to comment.