-
Notifications
You must be signed in to change notification settings - Fork 286
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
599 additions
and
297 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,264 +1,11 @@ | ||
use std::os::unix::prelude::AsRawFd; | ||
use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; | ||
#[cfg(feature = "use-dev-tty")] | ||
pub(crate) mod tty; | ||
|
||
use signal_hook::low_level::pipe; | ||
#[cfg(not(feature = "use-dev-tty"))] | ||
pub(crate) mod mio; | ||
|
||
use crate::event::timeout::PollTimeout; | ||
use crate::event::Event; | ||
use crate::Result; | ||
#[cfg(feature = "use-dev-tty")] | ||
pub(crate) use self::tty::UnixInternalEventSource; | ||
|
||
use filedescriptor::{poll, pollfd, POLLIN}; | ||
|
||
#[cfg(feature = "event-stream")] | ||
use super::super::sys::Waker; | ||
|
||
use super::{ | ||
super::{ | ||
sys::unix::{ | ||
file_descriptor::{tty_fd, FileDesc}, | ||
parse::parse_event, | ||
}, | ||
InternalEvent, | ||
}, | ||
EventSource, | ||
}; | ||
|
||
/// Holds a prototypical Waker and a receiver we can wait on when doing select(). | ||
#[cfg(feature = "event-stream")] | ||
struct WakePipe { | ||
receiver: UnixStream, | ||
waker: Waker, | ||
} | ||
|
||
#[cfg(feature = "event-stream")] | ||
impl WakePipe { | ||
fn new() -> Result<Self> { | ||
let (receiver, sender) = nonblocking_unix_pair()?; | ||
Ok(WakePipe { | ||
receiver, | ||
waker: Waker::new(sender), | ||
}) | ||
} | ||
} | ||
|
||
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing | ||
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes | ||
// is enough. | ||
const TTY_BUFFER_SIZE: usize = 1_024; | ||
|
||
pub(crate) struct UnixInternalEventSource { | ||
parser: Parser, | ||
tty_buffer: [u8; TTY_BUFFER_SIZE], | ||
tty: FileDesc, | ||
winch_signal_receiver: UnixStream, | ||
#[cfg(feature = "event-stream")] | ||
wake_pipe: WakePipe, | ||
} | ||
|
||
fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { | ||
let (receiver, sender) = UnixStream::pair()?; | ||
receiver.set_nonblocking(true)?; | ||
sender.set_nonblocking(true)?; | ||
Ok((receiver, sender)) | ||
} | ||
|
||
impl UnixInternalEventSource { | ||
pub fn new() -> Result<Self> { | ||
UnixInternalEventSource::from_file_descriptor(tty_fd()?) | ||
} | ||
|
||
pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> { | ||
Ok(UnixInternalEventSource { | ||
parser: Parser::default(), | ||
tty_buffer: [0u8; TTY_BUFFER_SIZE], | ||
tty: input_fd, | ||
winch_signal_receiver: { | ||
let (receiver, sender) = nonblocking_unix_pair()?; | ||
// Unregistering is unnecessary because EventSource is a singleton | ||
pipe::register(libc::SIGWINCH, sender)?; | ||
receiver | ||
}, | ||
#[cfg(feature = "event-stream")] | ||
wake_pipe: WakePipe::new()?, | ||
}) | ||
} | ||
} | ||
|
||
/// read_complete reads from a non-blocking file descriptor | ||
/// until the buffer is full or it would block. | ||
/// | ||
/// Similar to `std::io::Read::read_to_end`, except this function | ||
/// only fills the given buffer and does not read beyond that. | ||
fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result<usize> { | ||
loop { | ||
match fd.read(buf, buf.len()) { | ||
Ok(x) => return Ok(x), | ||
Err(e) => match e.kind() { | ||
io::ErrorKind::WouldBlock => return Ok(0), | ||
io::ErrorKind::Interrupted => continue, | ||
_ => return Err(e), | ||
}, | ||
} | ||
} | ||
} | ||
|
||
impl EventSource for UnixInternalEventSource { | ||
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> { | ||
let timeout = PollTimeout::new(timeout); | ||
|
||
fn make_pollfd<F: AsRawFd>(fd: &F) -> pollfd { | ||
pollfd { | ||
fd: fd.as_raw_fd(), | ||
events: POLLIN, | ||
revents: 0, | ||
} | ||
} | ||
|
||
#[cfg(not(feature = "event-stream"))] | ||
let mut fds = [ | ||
make_pollfd(&self.tty), | ||
make_pollfd(&self.winch_signal_receiver), | ||
]; | ||
|
||
#[cfg(feature = "event-stream")] | ||
let mut fds = [ | ||
make_pollfd(&self.tty), | ||
make_pollfd(&self.winch_signal_receiver), | ||
make_pollfd(&self.wake_pipe.receiver), | ||
]; | ||
|
||
while timeout.leftover().map_or(true, |t| !t.is_zero()) { | ||
// check if there are buffered events from the last read | ||
if let Some(event) = self.parser.next() { | ||
return Ok(Some(event)); | ||
} | ||
match poll(&mut fds, timeout.leftover()) { | ||
Err(filedescriptor::Error::Io(e)) => return Err(e), | ||
res => res.expect("polling tty"), | ||
}; | ||
if fds[0].revents & POLLIN != 0 { | ||
loop { | ||
let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; | ||
if read_count > 0 { | ||
self.parser.advance( | ||
&self.tty_buffer[..read_count], | ||
read_count == TTY_BUFFER_SIZE, | ||
); | ||
} | ||
|
||
if let Some(event) = self.parser.next() { | ||
return Ok(Some(event)); | ||
} | ||
|
||
if read_count == 0 { | ||
break; | ||
} | ||
} | ||
} | ||
if fds[1].revents & POLLIN != 0 { | ||
let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); | ||
// drain the pipe | ||
while read_complete(&fd, &mut [0; 1024])? != 0 {} | ||
// TODO Should we remove tput? | ||
// | ||
// This can take a really long time, because terminal::size can | ||
// launch new process (tput) and then it parses its output. It's | ||
// not a really long time from the absolute time point of view, but | ||
// it's a really long time from the mio, async-std/tokio executor, ... | ||
// point of view. | ||
let new_size = crate::terminal::size()?; | ||
return Ok(Some(InternalEvent::Event(Event::Resize( | ||
new_size.0, new_size.1, | ||
)))); | ||
} | ||
|
||
#[cfg(feature = "event-stream")] | ||
if fds[2].revents & POLLIN != 0 { | ||
let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); | ||
// drain the pipe | ||
while read_complete(&fd, &mut [0; 1024])? != 0 {} | ||
|
||
return Err(std::io::Error::new( | ||
std::io::ErrorKind::Interrupted, | ||
"Poll operation was woken up by `Waker::wake`", | ||
)); | ||
} | ||
} | ||
Ok(None) | ||
} | ||
|
||
#[cfg(feature = "event-stream")] | ||
fn waker(&self) -> Waker { | ||
self.wake_pipe.waker.clone() | ||
} | ||
} | ||
|
||
// | ||
// Following `Parser` structure exists for two reasons: | ||
// | ||
// * mimic anes Parser interface | ||
// * move the advancing, parsing, ... stuff out of the `try_read` method | ||
// | ||
#[derive(Debug)] | ||
struct Parser { | ||
buffer: Vec<u8>, | ||
internal_events: VecDeque<InternalEvent>, | ||
} | ||
|
||
impl Default for Parser { | ||
fn default() -> Self { | ||
Parser { | ||
// This buffer is used for -> 1 <- ANSI escape sequence. Are we | ||
// aware of any ANSI escape sequence that is bigger? Can we make | ||
// it smaller? | ||
// | ||
// Probably not worth spending more time on this as "there's a plan" | ||
// to use the anes crate parser. | ||
buffer: Vec::with_capacity(256), | ||
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can | ||
// fit? What is an average sequence length? Let's guess here | ||
// and say that the average ANSI escape sequence length is 8 bytes. Thus | ||
// the buffer size should be 1024/8=128 to avoid additional allocations | ||
// when processing large amounts of data. | ||
// | ||
// There's no need to make it bigger, because when you look at the `try_read` | ||
// method implementation, all events are consumed before the next TTY_BUFFER | ||
// is processed -> events pushed. | ||
internal_events: VecDeque::with_capacity(128), | ||
} | ||
} | ||
} | ||
|
||
impl Parser { | ||
fn advance(&mut self, buffer: &[u8], more: bool) { | ||
for (idx, byte) in buffer.iter().enumerate() { | ||
let more = idx + 1 < buffer.len() || more; | ||
|
||
self.buffer.push(*byte); | ||
|
||
match parse_event(&self.buffer, more) { | ||
Ok(Some(ie)) => { | ||
self.internal_events.push_back(ie); | ||
self.buffer.clear(); | ||
} | ||
Ok(None) => { | ||
// Event can't be parsed, because we don't have enough bytes for | ||
// the current sequence. Keep the buffer and process next bytes. | ||
} | ||
Err(_) => { | ||
// Event can't be parsed (not enough parameters, parameter is not a number, ...). | ||
// Clear the buffer and continue with another sequence. | ||
self.buffer.clear(); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Iterator for Parser { | ||
type Item = InternalEvent; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
self.internal_events.pop_front() | ||
} | ||
} | ||
#[cfg(not(feature = "use-dev-tty"))] | ||
pub(crate) use self::mio::UnixInternalEventSource; |
Oops, something went wrong.