From 81583950fdf135608a2d7475cc3c06c35a523b1d Mon Sep 17 00:00:00 2001 From: Aron Heinecke Date: Fri, 17 Jun 2022 22:00:24 +0200 Subject: [PATCH] send errors in debouncer, add example, name thread --- examples/debounced.rs | 26 +++++++++++++++++ src/debouncer.rs | 65 +++++++++++++++++++++++++++++++------------ 2 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 examples/debounced.rs diff --git a/examples/debounced.rs b/examples/debounced.rs new file mode 100644 index 00000000..249f354c --- /dev/null +++ b/examples/debounced.rs @@ -0,0 +1,26 @@ +use std::{path::Path, time::Duration}; + +use notify::{new_debouncer, RecursiveMode, Watcher}; + +fn main() { + std::thread::spawn(|| { + let path = Path::new("test.txt"); + let _ = std::fs::remove_file(&path); + loop { + std::fs::write(&path, b"Lorem ipsum").unwrap(); + std::thread::sleep(Duration::from_millis(250)); + } + }); + + let (rx, mut watcher) = new_debouncer(Duration::from_secs(2)).unwrap(); + + watcher + .watch(Path::new("."), RecursiveMode::Recursive) + .unwrap(); + + for events in rx { + for e in events { + println!("{:?}", e); + } + } +} diff --git a/src/debouncer.rs b/src/debouncer.rs index 90f55ec3..5daf725e 100644 --- a/src/debouncer.rs +++ b/src/debouncer.rs @@ -31,6 +31,8 @@ impl EventData { } } +type DebounceChannelType = Result,Vec>; + /// A debounced event kind. #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -66,6 +68,7 @@ type DebounceData = Arc>; struct DebounceDataInner { d: HashMap, timeout: Duration, + e: Vec, } impl DebounceDataInner { @@ -88,11 +91,24 @@ impl DebounceDataInner { events_expired } + /// Returns all currently stored errors + pub fn errors(&mut self) -> Vec { + let mut v = Vec::new(); + std::mem::swap(&mut v, &mut self.e); + v + } + + /// Add an error entry to re-send later on + pub fn add_error(&mut self, e: crate::Error) { + self.e.push(e); + } + /// Add new event to debouncer cache pub fn add_event(&mut self, e: Event) { for path in e.paths.into_iter() { if let Some(v) = self.d.get_mut(&path) { v.update = Instant::now(); + println!("Exists"); } else { self.d.insert(path, EventData::new_any()); } @@ -103,7 +119,7 @@ impl DebounceDataInner { /// Creates a new debounced watcher pub fn new_debouncer( timeout: Duration, -) -> Result<(Receiver>, RecommendedWatcher), Error> { +) -> Result<(Receiver, RecommendedWatcher), Error> { let data = DebounceData::default(); let (tx, rx) = mpsc::channel(); @@ -118,27 +134,40 @@ pub fn new_debouncer( timeout, tick_div ))) })?; - std::thread::spawn(move || { - loop { - std::thread::sleep(tick); - let send_data; - { - let mut lock = data_c.lock().expect("Can't lock debouncer data!"); - send_data = lock.debounced_events(); - } - if send_data.len() > 0 { - // TODO: how do we efficiently detect an rx drop without sending data ? - if tx.send(send_data).is_err() { - break; + std::thread::Builder::new() + .name("notify-rs debouncer loop".to_string()) + .spawn(move || { + loop { + std::thread::sleep(tick); + let send_data; + let errors: Vec; + { + let mut lock = data_c.lock().expect("Can't lock debouncer data!"); + send_data = lock.debounced_events(); + errors = lock.errors(); + } + if send_data.len() > 0 { + // channel shut down + if tx.send(Ok(send_data)).is_err() { + break; + } + } + if errors.len() > 0 { + // channel shut down + if tx.send(Err(errors)).is_err() { + break; + } } } - } - }); + })?; let watcher = RecommendedWatcher::new(move |e: Result| { - if let Ok(e) = e { - let mut lock = data.lock().expect("Can't lock debouncer data!"); - lock.add_event(e); + let mut lock = data.lock().expect("Can't lock debouncer data!"); + + match e { + Ok(e) => lock.add_event(e), + // can't have multiple TX, so we need to pipe that through our debouncer + Err(e) => lock.add_error(e), } })?;