From cd9769d438ed1917336ce13088b9502cea479b8e Mon Sep 17 00:00:00 2001 From: Esteban Borai Date: Mon, 16 Dec 2024 21:45:20 -0300 Subject: [PATCH] fix: refactor wait process --- src/lib.rs | 2 +- src/resource/file.rs | 25 ++++++++++++++++--------- src/resource/http.rs | 1 + src/resource/mod.rs | 1 + src/resource/tcp.rs | 1 + src/task.rs | 11 ++++++++++- 6 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00ac2ef..1a8d900 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ use anyhow::Result; const SECONDS_IN_HOUR: u64 = 3600; /// Options available for waiting on a [`Waitable`]. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct WaitOptions { /// Timeout in milliseconds for the wait operation. pub timeout: Duration, diff --git a/src/resource/file.rs b/src/resource/file.rs index a57d7a9..eb4e4ba 100644 --- a/src/resource/file.rs +++ b/src/resource/file.rs @@ -1,11 +1,13 @@ +use std::path::PathBuf; +use std::sync::mpsc::Sender; use std::sync::mpsc::{channel, Receiver}; -use std::{path::PathBuf, sync::mpsc::Sender}; use anyhow::Result; use notify::{Event, EventHandler, Watcher}; use crate::{WaitOptions, Waitable}; +#[derive(Clone)] pub struct FileWaiter { pub path: PathBuf, } @@ -19,15 +21,20 @@ impl FileWaiter { impl Waitable for FileWaiter { async fn wait(&self, _: &WaitOptions) -> Result<()> { let (file_exists_handler, rx) = FileExistsHandler::new(); - let mut watcher = notify::recommended_watcher(file_exists_handler).unwrap(); - let parent = self.path.parent().unwrap(); + let mut watcher = notify::recommended_watcher(file_exists_handler)?; - watcher - .watch(parent, notify::RecursiveMode::NonRecursive) - .unwrap(); + if let Some(parent) = self.path.parent() { + watcher.watch(parent, notify::RecursiveMode::NonRecursive)?; - if rx.recv().is_ok() { - watcher.unwatch(parent).unwrap(); + if rx.recv().is_ok() { + watcher.unwatch(parent)?; + } + } else { + watcher.watch(&self.path, notify::RecursiveMode::NonRecursive)?; + + if rx.recv().is_ok() { + watcher.unwatch(&self.path)?; + } } Ok(()) @@ -50,7 +57,7 @@ impl EventHandler for FileExistsHandler { fn handle_event(&mut self, event: notify::Result) { if let Ok(event) = event { if let notify::EventKind::Create(_) = event.kind { - self.tx.send(()).unwrap(); + self.tx.send(()).expect("Channel dropped."); } } } diff --git a/src/resource/http.rs b/src/resource/http.rs index 43d3863..a1f5a44 100644 --- a/src/resource/http.rs +++ b/src/resource/http.rs @@ -6,6 +6,7 @@ use tokio::time::sleep; use crate::{WaitOptions, Waitable}; +#[derive(Clone)] pub struct HttpWaiter { pub method: Method, pub url: Url, diff --git a/src/resource/mod.rs b/src/resource/mod.rs index 27da151..28297d5 100644 --- a/src/resource/mod.rs +++ b/src/resource/mod.rs @@ -13,6 +13,7 @@ use self::file::FileWaiter; use self::http::HttpWaiter; use self::tcp::TcpWaiter; +#[derive(Clone)] pub enum Resource { File(FileWaiter), Http(HttpWaiter), diff --git a/src/resource/tcp.rs b/src/resource/tcp.rs index 7e3e22e..1379a44 100644 --- a/src/resource/tcp.rs +++ b/src/resource/tcp.rs @@ -8,6 +8,7 @@ use tokio::time::sleep; use crate::{WaitOptions, Waitable}; /// Listens on a specific IP Address and Port using TCP protocol +#[derive(Clone)] pub struct TcpWaiter { pub addr: IpAddr, pub port: u16, diff --git a/src/task.rs b/src/task.rs index 0a7439b..57087a6 100644 --- a/src/task.rs +++ b/src/task.rs @@ -17,11 +17,20 @@ impl WaitOnTask { pub async fn run(self) -> Result<()> { select! { - _ = self.resource.wait(&self.options) => Ok(()), + _ = self.watch() => Ok(()), _ = self.deadline() => bail!("Timeout reached"), } } + async fn watch(&self) -> Result<()> { + let resource = self.resource.clone(); + let options = self.options.clone(); + + tokio::spawn(async move { resource.wait(&options).await }).await??; + + Ok(()) + } + async fn deadline(&self) -> Result<()> { sleep(self.options.timeout).await; bail!("Timeout reached");