Skip to content

Commit

Permalink
fix: refactor wait process
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoBorai committed Dec 17, 2024
1 parent ee00dd7 commit cd9769d
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use anyhow::Result;
const SECONDS_IN_HOUR: u64 = 3600;

/// Options available for waiting on a [`Waitable`].
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct WaitOptions {
/// Timeout in milliseconds for the wait operation.
pub timeout: Duration,
Expand Down
25 changes: 16 additions & 9 deletions src/resource/file.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::path::PathBuf;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{channel, Receiver};
use std::{path::PathBuf, sync::mpsc::Sender};

use anyhow::Result;
use notify::{Event, EventHandler, Watcher};

use crate::{WaitOptions, Waitable};

#[derive(Clone)]
pub struct FileWaiter {
pub path: PathBuf,
}
Expand All @@ -19,15 +21,20 @@ impl FileWaiter {
impl Waitable for FileWaiter {
async fn wait(&self, _: &WaitOptions) -> Result<()> {
let (file_exists_handler, rx) = FileExistsHandler::new();
let mut watcher = notify::recommended_watcher(file_exists_handler).unwrap();
let parent = self.path.parent().unwrap();
let mut watcher = notify::recommended_watcher(file_exists_handler)?;

watcher
.watch(parent, notify::RecursiveMode::NonRecursive)
.unwrap();
if let Some(parent) = self.path.parent() {
watcher.watch(parent, notify::RecursiveMode::NonRecursive)?;

if rx.recv().is_ok() {
watcher.unwatch(parent).unwrap();
if rx.recv().is_ok() {
watcher.unwatch(parent)?;
}
} else {
watcher.watch(&self.path, notify::RecursiveMode::NonRecursive)?;

if rx.recv().is_ok() {
watcher.unwatch(&self.path)?;
}
}

Ok(())
Expand All @@ -50,7 +57,7 @@ impl EventHandler for FileExistsHandler {
fn handle_event(&mut self, event: notify::Result<Event>) {
if let Ok(event) = event {
if let notify::EventKind::Create(_) = event.kind {
self.tx.send(()).unwrap();
self.tx.send(()).expect("Channel dropped.");
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/resource/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::time::sleep;

use crate::{WaitOptions, Waitable};

#[derive(Clone)]
pub struct HttpWaiter {
pub method: Method,
pub url: Url,
Expand Down
1 change: 1 addition & 0 deletions src/resource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use self::file::FileWaiter;
use self::http::HttpWaiter;
use self::tcp::TcpWaiter;

#[derive(Clone)]
pub enum Resource {
File(FileWaiter),
Http(HttpWaiter),
Expand Down
1 change: 1 addition & 0 deletions src/resource/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::time::sleep;
use crate::{WaitOptions, Waitable};

/// Listens on a specific IP Address and Port using TCP protocol
#[derive(Clone)]
pub struct TcpWaiter {
pub addr: IpAddr,
pub port: u16,
Expand Down
11 changes: 10 additions & 1 deletion src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@ impl WaitOnTask {

pub async fn run(self) -> Result<()> {
select! {
_ = self.resource.wait(&self.options) => Ok(()),
_ = self.watch() => Ok(()),
_ = self.deadline() => bail!("Timeout reached"),
}
}

async fn watch(&self) -> Result<()> {
let resource = self.resource.clone();
let options = self.options.clone();

tokio::spawn(async move { resource.wait(&options).await }).await??;

Ok(())
}

async fn deadline(&self) -> Result<()> {
sleep(self.options.timeout).await;
bail!("Timeout reached");
Expand Down

0 comments on commit cd9769d

Please sign in to comment.