Skip to content

Commit

Permalink
allow more than channels and make crossbeam optional
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Aug 6, 2022
1 parent c3acb63 commit be6ef8a
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 38 deletions.
7 changes: 5 additions & 2 deletions examples/debounced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ fn main() {
}
});

let (rx, mut watcher) = new_debouncer(Duration::from_secs(2), None).unwrap();
let (tx, rx) = std::sync::mpsc::channel();

watcher
let mut debouncer = new_debouncer(Duration::from_secs(2), None, tx).unwrap();

debouncer
.watcher()
.watch(Path::new("."), RecursiveMode::Recursive)
.unwrap();

Expand Down
31 changes: 31 additions & 0 deletions examples/debounced_full_custom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::{path::Path, time::Duration};

use notify::{RecursiveMode, Watcher};
use notify_debouncer_mini::new_debouncer;

/// Debouncer with custom backend and waiting for exit
fn main() {
std::thread::spawn(|| {
let path = Path::new("test.txt");
let _ = std::fs::remove_file(&path);
loop {
std::fs::write(&path, b"Lorem ipsum").unwrap();
std::thread::sleep(Duration::from_millis(250));
}
});

let (tx, rx) = std::sync::mpsc::channel();

let mut debouncer = new_debouncer_opt::<_,notify::PollWatcher>(Duration::from_secs(2), None, tx).unwrap();

debouncer
.watcher()
.watch(Path::new("."), RecursiveMode::Recursive)
.unwrap();

for events in rx {
for e in events {
println!("{:?}", e);
}
}
}
15 changes: 14 additions & 1 deletion notify-debouncer-mini/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@
name = "notify-debouncer-mini"
version = "0.1.0"
edition = "2021"
rust-version = "1.56"
description = "notify mini debouncer for events"
documentation = "https://docs.rs/notify_debouncer_mini"
homepage = "https://github.com/notify-rs/notify"
repository = "https://github.com/notify-rs/notify.git"
authors = ["Aron Heinecke <[email protected]>"]
keywords = ["events", "filesystem", "notify", "watch"]
license = "CC0-1.0 OR Artistic-2.0"
readme = "README.md"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "notify_debouncer_mini"
path = "src/lib.rs"

[features]
default = ["crossbeam-channel"]

[dependencies]
notify = "5.0.0-pre.15"
notify = "5.0.0-pre.15"
crossbeam-channel = { version = "0.5", optional = true }
serde = { version = "1.0.89", features = ["derive"], optional = true }
10 changes: 10 additions & 0 deletions notify-debouncer-mini/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Notify debouncer

Tiny debouncer for notify. Filters incoming events and emits only one event per timeframe per file.

## Features

- `crossbeam` enabled by default, for crossbeam channel support.
This may create problems used in tokio environments. See [#380](https://github.com/notify-rs/notify/issues/380).
Use someting like `notify-debouncer-mini = { version = "*", default-features = false }` to disable it.
- `serde` for serde support of event types, off by default
169 changes: 134 additions & 35 deletions notify-debouncer-mini/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,59 @@ use std::{
collections::HashMap,
path::PathBuf,
sync::{
mpsc::{self, Receiver},
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};

use notify::{Error, ErrorKind, Event, RecommendedWatcher, Watcher};

/// The set of requirements for watcher debounce event handling functions.
///
/// # Example implementation
///
/// ```no_run
/// use notify::{Event, Result, EventHandler};
///
/// /// Prints received events
/// struct EventPrinter;
///
/// impl EventHandler for EventPrinter {
/// fn handle_event(&mut self, event: Result<Event>) {
/// if let Ok(event) = event {
/// println!("Event: {:?}", event);
/// }
/// }
/// }
/// ```
pub trait DebounceEventHandler: Send + 'static {
/// Handles an event.
fn handle_event(&mut self, event: DebouncedEvents);
}

impl<F> DebounceEventHandler for F
where
F: FnMut(DebouncedEvents) + Send + 'static,
{
fn handle_event(&mut self, event: DebouncedEvents) {
(self)(event);
}
}

#[cfg(feature = "crossbeam")]
impl DebounceEventHandler for crossbeam_channel::Sender<DebouncedEvents> {
fn handle_event(&mut self, event: DebouncedEvents) {
let _ = self.send(event);
}
}

impl DebounceEventHandler for std::sync::mpsc::Sender<DebouncedEvents> {
fn handle_event(&mut self, event: DebouncedEvents) {
let _ = self.send(event);
}
}

/// Deduplicate event data entry
struct EventData {
/// Insertion Time
Expand All @@ -31,7 +76,7 @@ impl EventData {
}
}

type DebounceChannelType = Result<Vec<DebouncedEvent>, Vec<Error>>;
type DebouncedEvents = Result<Vec<DebouncedEvent>, Vec<Error>>;

/// A debounced event kind.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
Expand Down Expand Up @@ -118,17 +163,59 @@ impl DebounceDataInner {
}
}

/// Creates a new debounced watcher.
///
/// Debouncer guard, stops the debouncer on drop
pub struct Debouncer<T: Watcher> {
stop: Arc<AtomicBool>,
watcher: T,
debouncer_thread: Option<std::thread::JoinHandle<()>>,
}

impl<T: Watcher> Debouncer<T> {
/// Stop the debouncer, waits for the event thread to finish.
/// May block for the duration of one tick_rate.
pub fn stop(mut self) {
self.set_stop();
if let Some(t) = self.debouncer_thread.take() {
let _ = t.join();
}
}

/// Stop the debouncer, does not wait for the event thread to finish.
pub fn stop_nonblocking(self) {
self.set_stop();
}

fn set_stop(&self) {
self.stop.store(true, Ordering::Relaxed);
}

/// Access to the internally used notify Watcher backend
pub fn watcher(&mut self) -> &mut dyn Watcher {
&mut self.watcher
}
}

impl<T: Watcher> Drop for Debouncer<T> {
fn drop(&mut self) {
// don't imitate c++ async futures and block on drop
self.set_stop();
}
}

/// Creates a new debounced watcher with custom configuration.
///
/// Timeout is the amount of time after which a debounced event is emitted or a Continuous event is send, if there still are events incoming for the specific path.
///
///
/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout.
pub fn new_debouncer(
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
timeout: Duration,
tick_rate: Option<Duration>,
) -> Result<(Receiver<DebounceChannelType>, RecommendedWatcher), Error> {
mut event_handler: F,
) -> Result<Debouncer<T>, Error> {
let data = DebounceData::default();

let stop = Arc::new(AtomicBool::new(false));

let tick_div = 4;
let tick = match tick_rate {
Some(v) => {
Expand All @@ -153,38 +240,31 @@ pub fn new_debouncer(
data_w.timeout = timeout;
}

let (tx, rx) = mpsc::channel();

let data_c = data.clone();

std::thread::Builder::new()
let stop_c = stop.clone();
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || {
loop {
std::thread::sleep(tick);
let send_data;
let errors: Vec<crate::Error>;
{
let mut lock = data_c.lock().expect("Can't lock debouncer data!");
send_data = lock.debounced_events();
errors = lock.errors();
}
if send_data.len() > 0 {
// channel shut down
if tx.send(Ok(send_data)).is_err() {
break;
}
}
if errors.len() > 0 {
// channel shut down
if tx.send(Err(errors)).is_err() {
break;
}
}
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
std::thread::sleep(tick);
let send_data;
let errors: Vec<crate::Error>;
{
let mut lock = data_c.lock().expect("Can't lock debouncer data!");
send_data = lock.debounced_events();
errors = lock.errors();
}
if send_data.len() > 0 {
event_handler.handle_event(Ok(send_data));
}
if errors.len() > 0 {
event_handler.handle_event(Err(errors));
}
})?;

let watcher = RecommendedWatcher::new(move |e: Result<Event, Error>| {
let watcher = T::new(move |e: Result<Event, Error>| {
let mut lock = data.lock().expect("Can't lock debouncer data!");

match e {
Expand All @@ -194,5 +274,24 @@ pub fn new_debouncer(
}
})?;

Ok((rx, watcher))
let guard = Debouncer {
watcher,
debouncer_thread: Some(thread),
stop,
};

Ok(guard)
}

/// Short function to create a new debounced watcher with the recommended debouncer.
///
/// Timeout is the amount of time after which a debounced event is emitted or a Continuous event is send, if there still are events incoming for the specific path.
///
/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout.
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher>, Error> {
new_debouncer_opt::<F, RecommendedWatcher>(timeout, tick_rate, event_handler)
}

0 comments on commit be6ef8a

Please sign in to comment.