Skip to content

Commit

Permalink
send errors in debouncer, add example, name thread
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Aug 6, 2022
1 parent 4b8cc05 commit 8158395
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 18 deletions.
26 changes: 26 additions & 0 deletions examples/debounced.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::{path::Path, time::Duration};

use notify::{new_debouncer, RecursiveMode, Watcher};

fn main() {
std::thread::spawn(|| {
let path = Path::new("test.txt");
let _ = std::fs::remove_file(&path);
loop {
std::fs::write(&path, b"Lorem ipsum").unwrap();
std::thread::sleep(Duration::from_millis(250));
}
});

let (rx, mut watcher) = new_debouncer(Duration::from_secs(2)).unwrap();

watcher
.watch(Path::new("."), RecursiveMode::Recursive)
.unwrap();

for events in rx {
for e in events {
println!("{:?}", e);
}
}
}
65 changes: 47 additions & 18 deletions src/debouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl EventData {
}
}

type DebounceChannelType = Result<Vec<DebouncedEvent>,Vec<Error>>;

/// A debounced event kind.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down Expand Up @@ -66,6 +68,7 @@ type DebounceData = Arc<Mutex<DebounceDataInner>>;
struct DebounceDataInner {
d: HashMap<PathBuf, EventData>,
timeout: Duration,
e: Vec<crate::Error>,
}

impl DebounceDataInner {
Expand All @@ -88,11 +91,24 @@ impl DebounceDataInner {
events_expired
}

/// Returns all currently stored errors
pub fn errors(&mut self) -> Vec<Error> {
let mut v = Vec::new();
std::mem::swap(&mut v, &mut self.e);
v
}

/// Add an error entry to re-send later on
pub fn add_error(&mut self, e: crate::Error) {
self.e.push(e);
}

/// Add new event to debouncer cache
pub fn add_event(&mut self, e: Event) {
for path in e.paths.into_iter() {
if let Some(v) = self.d.get_mut(&path) {
v.update = Instant::now();
println!("Exists");
} else {
self.d.insert(path, EventData::new_any());
}
Expand All @@ -103,7 +119,7 @@ impl DebounceDataInner {
/// Creates a new debounced watcher
pub fn new_debouncer(
timeout: Duration,
) -> Result<(Receiver<Vec<DebouncedEvent>>, RecommendedWatcher), Error> {
) -> Result<(Receiver<DebounceChannelType>, RecommendedWatcher), Error> {
let data = DebounceData::default();

let (tx, rx) = mpsc::channel();
Expand All @@ -118,27 +134,40 @@ pub fn new_debouncer(
timeout, tick_div
)))
})?;
std::thread::spawn(move || {
loop {
std::thread::sleep(tick);
let send_data;
{
let mut lock = data_c.lock().expect("Can't lock debouncer data!");
send_data = lock.debounced_events();
}
if send_data.len() > 0 {
// TODO: how do we efficiently detect an rx drop without sending data ?
if tx.send(send_data).is_err() {
break;
std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || {
loop {
std::thread::sleep(tick);
let send_data;
let errors: Vec<crate::Error>;
{
let mut lock = data_c.lock().expect("Can't lock debouncer data!");
send_data = lock.debounced_events();
errors = lock.errors();
}
if send_data.len() > 0 {
// channel shut down
if tx.send(Ok(send_data)).is_err() {
break;
}
}
if errors.len() > 0 {
// channel shut down
if tx.send(Err(errors)).is_err() {
break;
}
}
}
}
});
})?;

let watcher = RecommendedWatcher::new(move |e: Result<Event, Error>| {
if let Ok(e) = e {
let mut lock = data.lock().expect("Can't lock debouncer data!");
lock.add_event(e);
let mut lock = data.lock().expect("Can't lock debouncer data!");

match e {
Ok(e) => lock.add_event(e),
// can't have multiple TX, so we need to pipe that through our debouncer
Err(e) => lock.add_error(e),
}
})?;

Expand Down

0 comments on commit 8158395

Please sign in to comment.