From 149b27f73ad2b4c636d237ebf75e94cc1002cd91 Mon Sep 17 00:00:00 2001 From: roblabla Date: Fri, 22 Jan 2021 13:16:30 +0000 Subject: [PATCH 1/2] Update to mio 0.7, remove mio-extras --- Cargo.toml | 3 +-- src/error.rs | 7 ------ src/inotify.rs | 62 ++++++++++++++++++++++++-------------------------- 3 files changed, 31 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dd3c4bdc..e9759752 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,7 @@ walkdir = "2.0.1" [target.'cfg(target_os="linux")'.dependencies] inotify = { version = "0.8", default-features = false } -mio = "0.6.15" -mio-extras = "2.0.5" +mio = { version = "0.7.7", features = ["os-ext"] } [target.'cfg(target_os="macos")'.dependencies] fsevent = "2.0.1" diff --git a/src/error.rs b/src/error.rs index 31505d85..5d9867ba 100644 --- a/src/error.rs +++ b/src/error.rs @@ -145,13 +145,6 @@ impl From> for Error { } } -#[cfg(target_os = "linux")] -impl From> for Error { - fn from(err: mio_extras::channel::SendError) -> Self { - Error::generic(&format!("internal channel error: {:?}", err)) - } -} - #[test] fn display_formatted_errors() { let expected = "Some error"; diff --git a/src/inotify.rs b/src/inotify.rs index 4d68391d..13cf56df 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -15,7 +15,7 @@ use std::ffi::OsStr; use std::fs::metadata; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::Arc; use std::thread; use std::time::Duration; use walkdir::WalkDir; @@ -31,8 +31,9 @@ const MESSAGE: mio::Token = mio::Token(1); struct EventLoop { running: bool, poll: mio::Poll, - event_loop_tx: mio_extras::channel::Sender, - event_loop_rx: mio_extras::channel::Receiver, + event_loop_waker: Arc, + event_loop_tx: crossbeam_channel::Sender, + event_loop_rx: crossbeam_channel::Receiver, inotify: Option, event_fn: Box, watches: HashMap, @@ -41,7 +42,10 @@ struct EventLoop { } /// Watcher implementation based on inotify -pub struct INotifyWatcher(Mutex>); +pub struct INotifyWatcher { + channel: crossbeam_channel::Sender, + waker: Arc, +} enum EventLoopMsg { AddWatch(PathBuf, RecursiveMode, Sender>), @@ -93,30 +97,21 @@ fn remove_watch_by_event( impl EventLoop { pub fn new(inotify: Inotify, event_fn: Box) -> Result { - let (event_loop_tx, event_loop_rx) = mio_extras::channel::channel::(); + let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::(); let poll = mio::Poll::new()?; - poll.register( - &event_loop_rx, - MESSAGE, - mio::Ready::readable(), - mio::PollOpt::edge(), - )?; + + let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?); let inotify_fd = inotify.as_raw_fd(); - let evented_inotify = mio::unix::EventedFd(&inotify_fd); - poll.register( - &evented_inotify, - INOTIFY, - mio::Ready::readable(), - // Use level-sensitive polling (issue #267): `Inotify::read_events` - // only consumes one buffer's worth of events at a time, so events - // may remain in the inotify fd after calling handle_inotify. - mio::PollOpt::level(), - )?; + let mut evented_inotify = mio::unix::SourceFd(&inotify_fd); + // TODO: Fix #267 + poll.registry() + .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?; let event_loop = EventLoop { running: true, poll, + event_loop_waker, event_loop_tx, event_loop_rx, inotify: Some(inotify), @@ -151,12 +146,8 @@ impl EventLoop { } } - fn channel(&self) -> mio_extras::channel::Sender { - self.event_loop_tx.clone() - } - // Handle a single event. - fn handle_event(&mut self, event: &mio::Event) { + fn handle_event(&mut self, event: &mio::event::Event) { match event.token() { MESSAGE => { // The channel is readable - handle messages. @@ -382,12 +373,14 @@ impl EventLoop { if let Some(ref rename_event) = self.rename_event { let event_loop_tx = self.event_loop_tx.clone(); + let waker = self.event_loop_waker.clone(); let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie thread::spawn(move || { thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event event_loop_tx .send(EventLoopMsg::RenameTimeout(cookie)) .unwrap(); + waker.wake().unwrap(); }); } } @@ -520,9 +513,10 @@ impl INotifyWatcher { fn from_event_fn(event_fn: Box) -> Result { let inotify = Inotify::init()?; let event_loop = EventLoop::new(inotify, event_fn)?; - let channel = event_loop.channel(); + let channel = event_loop.event_loop_tx.clone(); + let waker = event_loop.event_loop_waker.clone(); event_loop.run(); - Ok(INotifyWatcher(Mutex::new(channel))) + Ok(INotifyWatcher { channel, waker }) } fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { @@ -536,7 +530,8 @@ impl INotifyWatcher { let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx); // we expect the event loop to live and reply => unwraps must not panic - self.0.lock().unwrap().send(msg).unwrap(); + self.channel.send(msg).unwrap(); + self.waker.wake().unwrap(); rx.recv().unwrap() } @@ -551,7 +546,8 @@ impl INotifyWatcher { let msg = EventLoopMsg::RemoveWatch(pb, tx); // we expect the event loop to live and reply => unwraps must not panic - self.0.lock().unwrap().send(msg).unwrap(); + self.channel.send(msg).unwrap(); + self.waker.wake().unwrap(); rx.recv().unwrap() } } @@ -571,7 +567,8 @@ impl Watcher for INotifyWatcher { fn configure(&mut self, config: Config) -> Result { let (tx, rx) = bounded(1); - self.0.lock()?.send(EventLoopMsg::Configure(config, tx))?; + self.channel.send(EventLoopMsg::Configure(config, tx))?; + self.waker.wake()?; rx.recv()? } } @@ -579,6 +576,7 @@ impl Watcher for INotifyWatcher { impl Drop for INotifyWatcher { fn drop(&mut self) { // we expect the event loop to live => unwrap must not panic - self.0.lock().unwrap().send(EventLoopMsg::Shutdown).unwrap(); + self.channel.send(EventLoopMsg::Shutdown).unwrap(); + self.waker.wake().unwrap(); } } From 5c48373a5d72f4f5fe09e892247d1d0fcee4d29b Mon Sep 17 00:00:00 2001 From: roblabla Date: Fri, 22 Jan 2021 13:53:10 +0000 Subject: [PATCH 2/2] Fix #267 by repeatedly calling INotify read_buffer When INotify::read_buffer finally returns an empty iterator, we break out of the loop. This should allow polling with edge triggering. --- src/inotify.rs | 315 +++++++++++++++++++++++++------------------------ 1 file changed, 162 insertions(+), 153 deletions(-) diff --git a/src/inotify.rs b/src/inotify.rs index 13cf56df..e9e66e6b 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -104,7 +104,6 @@ impl EventLoop { let inotify_fd = inotify.as_raw_fd(); let mut evented_inotify = mio::unix::SourceFd(&inotify_fd); - // TODO: Fix #267 poll.registry() .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?; @@ -203,48 +202,65 @@ impl EventLoop { if let Some(ref mut inotify) = self.inotify { let mut buffer = [0; 1024]; - match inotify.read_events(&mut buffer) { - Ok(events) => { - for event in events { - if event.mask.contains(EventMask::Q_OVERFLOW) { - let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan)); - (self.event_fn)(ev); - } + // Read all buffers available. + loop { + match inotify.read_events(&mut buffer) { + Ok(events) => { + let mut num_events = 0; + for event in events { + num_events += 1; + if event.mask.contains(EventMask::Q_OVERFLOW) { + let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan)); + (self.event_fn)(ev); + } - let path = match event.name { - Some(name) => self.paths.get(&event.wd).map(|root| root.join(&name)), - None => self.paths.get(&event.wd).cloned(), - }; - - if event.mask.contains(EventMask::MOVED_FROM) { - send_pending_rename_event(&mut self.rename_event, &*self.event_fn); - remove_watch_by_event(&path, &self.watches, &mut remove_watches); - self.rename_event = Some( - Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::From))) - .add_some_path(path.clone()) - .set_tracker(event.cookie as usize), - ); - } else { - let mut evs = Vec::new(); - if event.mask.contains(EventMask::MOVED_TO) { - if let Some(e) = self.rename_event.take() { - if e.tracker() == Some(event.cookie as usize) { - (self.event_fn)(Ok(e.clone())); - evs.push( - Event::new(EventKind::Modify(ModifyKind::Name( - RenameMode::To, - ))) - .set_tracker(event.cookie as usize) - .add_some_path(path.clone()), - ); - evs.push( - Event::new(EventKind::Modify(ModifyKind::Name( - RenameMode::Both, - ))) - .set_tracker(event.cookie as usize) - .add_some_path(e.paths.first().cloned()) - .add_some_path(path.clone()), - ); + let path = match event.name { + Some(name) => self.paths.get(&event.wd).map(|root| root.join(&name)), + None => self.paths.get(&event.wd).cloned(), + }; + + if event.mask.contains(EventMask::MOVED_FROM) { + send_pending_rename_event(&mut self.rename_event, &*self.event_fn); + remove_watch_by_event(&path, &self.watches, &mut remove_watches); + self.rename_event = Some( + Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::From))) + .add_some_path(path.clone()) + .set_tracker(event.cookie as usize), + ); + } else { + let mut evs = Vec::new(); + if event.mask.contains(EventMask::MOVED_TO) { + if let Some(e) = self.rename_event.take() { + if e.tracker() == Some(event.cookie as usize) { + (self.event_fn)(Ok(e.clone())); + evs.push( + Event::new(EventKind::Modify(ModifyKind::Name( + RenameMode::To, + ))) + .set_tracker(event.cookie as usize) + .add_some_path(path.clone()), + ); + evs.push( + Event::new(EventKind::Modify(ModifyKind::Name( + RenameMode::Both, + ))) + .set_tracker(event.cookie as usize) + .add_some_path(e.paths.first().cloned()) + .add_some_path(path.clone()), + ); + } else { + // TODO should it be rename? + evs.push( + Event::new(EventKind::Create( + if event.mask.contains(EventMask::ISDIR) { + CreateKind::Folder + } else { + CreateKind::File + }, + )) + .add_some_path(path.clone()), + ); + } } else { // TODO should it be rename? evs.push( @@ -258,8 +274,20 @@ impl EventLoop { .add_some_path(path.clone()), ); } - } else { - // TODO should it be rename? + add_watch_by_event(&path, &event, &self.watches, &mut add_watches); + } + if event.mask.contains(EventMask::MOVE_SELF) { + evs.push( + Event::new(EventKind::Modify(ModifyKind::Name( + RenameMode::From, + ))) + .add_some_path(path.clone()), + ); + // TODO stat the path and get to new path + // - emit To and Both events + // - change prefix for further events + } + if event.mask.contains(EventMask::CREATE) { evs.push( Event::new(EventKind::Create( if event.mask.contains(EventMask::ISDIR) { @@ -270,122 +298,103 @@ impl EventLoop { )) .add_some_path(path.clone()), ); + add_watch_by_event(&path, &event, &self.watches, &mut add_watches); + } + if event.mask.contains(EventMask::DELETE_SELF) + || event.mask.contains(EventMask::DELETE) + { + evs.push( + Event::new(EventKind::Remove( + if event.mask.contains(EventMask::ISDIR) { + RemoveKind::Folder + } else { + RemoveKind::File + }, + )) + .add_some_path(path.clone()), + ); + remove_watch_by_event(&path, &self.watches, &mut remove_watches); + } + if event.mask.contains(EventMask::MODIFY) { + evs.push( + Event::new(EventKind::Modify(ModifyKind::Data( + DataChange::Any, + ))) + .add_some_path(path.clone()), + ); + } + if event.mask.contains(EventMask::CLOSE_WRITE) { + evs.push( + Event::new(EventKind::Access(AccessKind::Close( + AccessMode::Write, + ))) + .add_some_path(path.clone()), + ); + } + if event.mask.contains(EventMask::CLOSE_NOWRITE) { + evs.push( + Event::new(EventKind::Access(AccessKind::Close( + AccessMode::Read, + ))) + .add_some_path(path.clone()), + ); + } + if event.mask.contains(EventMask::ATTRIB) { + evs.push( + Event::new(EventKind::Modify(ModifyKind::Metadata( + MetadataKind::Any, + ))) + .add_some_path(path.clone()), + ); + } + if event.mask.contains(EventMask::OPEN) { + evs.push( + Event::new(EventKind::Access(AccessKind::Open( + AccessMode::Any, + ))) + .add_some_path(path.clone()), + ); } - add_watch_by_event(&path, &event, &self.watches, &mut add_watches); - } - if event.mask.contains(EventMask::MOVE_SELF) { - evs.push( - Event::new(EventKind::Modify(ModifyKind::Name( - RenameMode::From, - ))) - .add_some_path(path.clone()), - ); - // TODO stat the path and get to new path - // - emit To and Both events - // - change prefix for further events - } - if event.mask.contains(EventMask::CREATE) { - evs.push( - Event::new(EventKind::Create( - if event.mask.contains(EventMask::ISDIR) { - CreateKind::Folder - } else { - CreateKind::File - }, - )) - .add_some_path(path.clone()), - ); - add_watch_by_event(&path, &event, &self.watches, &mut add_watches); - } - if event.mask.contains(EventMask::DELETE_SELF) - || event.mask.contains(EventMask::DELETE) - { - evs.push( - Event::new(EventKind::Remove( - if event.mask.contains(EventMask::ISDIR) { - RemoveKind::Folder - } else { - RemoveKind::File - }, - )) - .add_some_path(path.clone()), - ); - remove_watch_by_event(&path, &self.watches, &mut remove_watches); - } - if event.mask.contains(EventMask::MODIFY) { - evs.push( - Event::new(EventKind::Modify(ModifyKind::Data( - DataChange::Any, - ))) - .add_some_path(path.clone()), - ); - } - if event.mask.contains(EventMask::CLOSE_WRITE) { - evs.push( - Event::new(EventKind::Access(AccessKind::Close( - AccessMode::Write, - ))) - .add_some_path(path.clone()), - ); - } - if event.mask.contains(EventMask::CLOSE_NOWRITE) { - evs.push( - Event::new(EventKind::Access(AccessKind::Close( - AccessMode::Read, - ))) - .add_some_path(path.clone()), - ); - } - if event.mask.contains(EventMask::ATTRIB) { - evs.push( - Event::new(EventKind::Modify(ModifyKind::Metadata( - MetadataKind::Any, - ))) - .add_some_path(path.clone()), - ); - } - if event.mask.contains(EventMask::OPEN) { - evs.push( - Event::new(EventKind::Access(AccessKind::Open( - AccessMode::Any, - ))) - .add_some_path(path.clone()), - ); - } - if !evs.is_empty() { - send_pending_rename_event(&mut self.rename_event, &*self.event_fn); - } + if !evs.is_empty() { + send_pending_rename_event(&mut self.rename_event, &*self.event_fn); + } - for ev in evs { - (self.event_fn)(Ok(ev)); + for ev in evs { + (self.event_fn)(Ok(ev)); + } } } - } - // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear - // whether the second part (IN_MOVED_TO) will arrive because the file or directory - // could just have been moved out of the watched directory. So it's necessary to wait - // for possible subsequent events in case it's a complete move event but also to make sure - // that the first part of the event is handled in a timely manner in case no subsequent events arrive. - // TODO: don't do this here, instead leave it entirely to the debounce - // -> related to some rename events being reported as creates. - - if let Some(ref rename_event) = self.rename_event { - let event_loop_tx = self.event_loop_tx.clone(); - let waker = self.event_loop_waker.clone(); - let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie - thread::spawn(move || { - thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event - event_loop_tx - .send(EventLoopMsg::RenameTimeout(cookie)) - .unwrap(); - waker.wake().unwrap(); - }); + // All events read. Break out. + if num_events == 0 { + break; + } + + // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear + // whether the second part (IN_MOVED_TO) will arrive because the file or directory + // could just have been moved out of the watched directory. So it's necessary to wait + // for possible subsequent events in case it's a complete move event but also to make sure + // that the first part of the event is handled in a timely manner in case no subsequent events arrive. + // TODO: don't do this here, instead leave it entirely to the debounce + // -> related to some rename events being reported as creates. + + if let Some(ref rename_event) = self.rename_event { + let event_loop_tx = self.event_loop_tx.clone(); + let waker = self.event_loop_waker.clone(); + let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie + thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event + event_loop_tx + .send(EventLoopMsg::RenameTimeout(cookie)) + .unwrap(); + waker.wake().unwrap(); + }); + } + } + Err(e) => { + (self.event_fn)(Err(Error::io(e))); } - } - Err(e) => { - (self.event_fn)(Err(Error::io(e))); } } }