From 06613d9ef1d751e2018183bdd86ca2e0aa56b21b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 13 Sep 2024 12:34:51 +0100 Subject: [PATCH 1/4] watching basic version works --- Cargo.lock | 83 ++++++++++++++- crates/dht/src/dht.rs | 10 +- crates/librqbit/Cargo.toml | 2 + crates/librqbit/src/lib.rs | 2 + crates/librqbit/src/tests/e2e.rs | 2 +- crates/librqbit/src/watch.rs | 173 +++++++++++++++++++++++++++++++ crates/rqbit/Cargo.toml | 1 + crates/rqbit/src/main.rs | 21 +++- 8 files changed, 284 insertions(+), 10 deletions(-) create mode 100644 crates/librqbit/src/watch.rs diff --git a/Cargo.lock b/Cargo.lock index 366bb647c..c4f96994f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -2221,6 +2230,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -2356,6 +2385,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kuchikiki" version = "0.8.2" @@ -2462,6 +2511,7 @@ dependencies = [ "lru", "memmap2", "mime_guess", + "notify", "parking_lot", "rand 0.8.5", "regex", @@ -2842,6 +2892,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -2939,6 +3001,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5204,7 +5285,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 2c1a6abc4..2ebfdaf28 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -265,7 +265,7 @@ impl RecursiveRequest { let request_one = |id, addr, depth| { req.request_one(id, addr, depth) .map_err(|e| { - debug!("error: {e:?}"); + debug!("error: {e:#}"); e }) .instrument(error_span!( @@ -341,7 +341,7 @@ impl RecursiveRequest { Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32), Ok(_) => REQUERY_INTERVAL, Err(e) => { - error!("error in get_peers_root(): {e:?}"); + error!("error in get_peers_root(): {e:#}"); return Err::<(), anyhow::Error>(e); } }; @@ -359,7 +359,7 @@ impl RecursiveRequest { let (id, addr, depth) = addr.unwrap(); futs.push( this.request_one(id, addr, depth) - .map_err(|e| debug!("error: {e:?}")) + .map_err(|e| debug!("error: {e:#}")) .instrument(error_span!("addr", addr=addr.to_string())) ); } @@ -996,7 +996,7 @@ impl DhtWorker { }, Err(e) => { self.dht.routing_table.write().mark_error(&id); - debug!("error: {e:?}"); + debug!("error: {e:#}"); } } }.instrument(error_span!("ping", addr=addr.to_string()))) @@ -1033,7 +1033,7 @@ impl DhtWorker { ) .unwrap(); if let Err(e) = socket.send_to(&buf, addr).await { - debug!("error sending to {addr}: {e:?}"); + debug!("error sending to {addr}: {e:#}"); if let Some(tid) = our_tid { self.on_send_error(tid, addr, e.into()); } diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index fc2874ce9..d9870b57a 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -36,6 +36,7 @@ storage_examples = [] tracing-subscriber-utils = ["tracing-subscriber"] postgres = ["sqlx"] async-bt = ["async-backtrace"] +watch = ["notify"] [dependencies] sqlx = { version = "0.8.2", features = [ @@ -105,6 +106,7 @@ mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" async-backtrace = { version = "0.2", optional = true } +notify = { version = "6.1.1", optional = true } [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index f8543c9a4..455381cbf 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils; mod type_aliases; #[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))] pub mod upnp_server_adapter; +#[cfg(feature = "watch")] +pub mod watch; pub use api::Api; pub use api_error::ApiError; diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 67b6fcb5c..3b8eda020 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { } Ok(true) } - crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), + crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"), _ => bail!("broken state"), }) .context("error checking for torrent liveness")?; diff --git a/crates/librqbit/src/watch.rs b/crates/librqbit/src/watch.rs new file mode 100644 index 000000000..b435928c5 --- /dev/null +++ b/crates/librqbit/src/watch.rs @@ -0,0 +1,173 @@ +use std::{ + io::Read, + path::{Path, PathBuf}, + sync::{Arc, Weak}, +}; + +use anyhow::{bail, Context}; +use buffers::ByteBuf; +use librqbit_core::torrent_metainfo::torrent_from_bytes; +use notify::Watcher; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::{debug, error, error_span, trace, warn}; + +use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session}; + +struct ThreadCancelEvent { + mutex: parking_lot::Mutex, + condvar: parking_lot::Condvar, +} + +impl ThreadCancelEvent { + fn new() -> Arc { + Arc::new(Self { + mutex: parking_lot::Mutex::new(false), + condvar: parking_lot::Condvar::new(), + }) + } + + fn cancel(&self) { + let mut g = self.mutex.lock(); + *g = true; + self.condvar.notify_all(); + } + + fn wait_until_cancelled(&self) { + let mut g = self.mutex.lock(); + while !*g { + self.condvar.wait(&mut g); + } + } +} + +async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>) { + async fn add_one( + session_w: &Weak, + add_torrent: AddTorrent<'static>, + ) -> anyhow::Result<()> { + let session = match session_w.upgrade() { + Some(s) => s, + None => return Ok(()), + }; + let res = session + .add_torrent( + add_torrent, + Some(AddTorrentOptions { + overwrite: true, + ..Default::default() + }), + ) + .await?; + match res { + AddTorrentResponse::Added(_, _) => {} + AddTorrentResponse::AlreadyManaged(_, _) => { + debug!("already managed"); + } + AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"), + } + Ok(()) + } + + while let Some(add_torrent) = rx.recv().await { + if let Err(e) = add_one(&session_w, add_torrent).await { + warn!("error adding torrent: {e:#}"); + } + } +} + +fn watch_thread( + folder: PathBuf, + tx: UnboundedSender>, + cancel_event: &ThreadCancelEvent, +) -> anyhow::Result<()> { + fn read_and_validate_torrent(path: &Path) -> anyhow::Result> { + let mut buf = Vec::new(); + std::fs::File::open(path) + .context("error opening")? + .read_to_end(&mut buf) + .context("error reading")?; + torrent_from_bytes::(&buf).context("invalid .torrent file")?; + Ok(AddTorrent::from_bytes(buf)) + } + + fn watch_cb( + ev: notify::Result, + tx: &UnboundedSender>, + ) -> anyhow::Result<()> { + trace!(event=?ev, "watch event"); + let ev = ev.context("error event")?; + match ev.kind { + notify::EventKind::Create(_) | notify::EventKind::Modify(_) => {} + other => { + debug!(kind=?other, paths=?ev.paths, "ignoring event"); + return Ok(()); + } + } + for path in ev.paths { + if path.extension().and_then(|e| e.to_str()) != Some("torrent") { + trace!(?path, "ignoring path"); + continue; + } + let add = match read_and_validate_torrent(&path) { + Ok(add) => add, + Err(e) => { + debug!(?path, "error validating torrent: {e:#}"); + continue; + } + }; + + if tx.send(add).is_err() { + return Ok(()); + } + } + Ok(()) + } + + let mut watcher = notify::recommended_watcher(move |ev| { + if let Err(e) = watch_cb(ev, &tx) { + warn!("error processing watch event: {e:#}"); + } + }) + .context("error creating watcher")?; + watcher + .watch(&folder, notify::RecursiveMode::Recursive) + .context("error watching")?; + cancel_event.wait_until_cancelled(); + debug!(?folder, "watcher thread done"); + Ok(()) +} + +impl Session { + pub fn watch_folder(self: &Arc, watch_folder: &Path) { + let session_w = Arc::downgrade(self); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + self.spawn(error_span!("watch_adder", ?watch_folder), async move { + watch_adder(session_w, rx).await; + Ok(()) + }); + + let cancel_event = ThreadCancelEvent::new(); + let cancel_event_2 = cancel_event.clone(); + let cancel_token = self.cancellation_token().clone(); + crate::spawn_utils::spawn( + "watch_cancel", + error_span!("watch_cancel", ?watch_folder), + async move { + cancel_token.cancelled().await; + trace!("canceling watcher"); + cancel_event.cancel(); + Ok(()) + }, + ); + + let watch_folder = PathBuf::from(watch_folder); + let session_span = self.rs(); + std::thread::spawn(move || { + let span = error_span!(parent: session_span, "watcher", folder=?watch_folder); + let _ = span.enter(); + if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { + error!("error in watcher thread: {e:#}"); + } + }); + } +} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 8865888b3..1a0c0469d 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -27,6 +27,7 @@ librqbit = { version = "7.1.0-beta.0", path = "../librqbit", default-features = "http-api", "tracing-subscriber-utils", "upnp-serve-adapter", + "watch", ] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } console-subscriber = { version = "0.4", optional = true } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4e37de20a..561d52d37 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,4 +1,11 @@ -use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration}; +use std::{ + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + thread, + time::Duration, +}; use anyhow::{bail, Context}; use clap::{CommandFactory, Parser, ValueEnum}; @@ -236,6 +243,11 @@ struct ServerStartOptions { /// [Experimental] if set, will try to resume quickly after restart and skip checksumming. #[arg(long = "fastresume", env = "RQBIT_FASTRESUME")] fastresume: bool, + + /// The folder to watch for added .torrent files. All files in this folder will be automatically added + /// to the session. + #[arg(long = "watch-folder", env = "RQBIT_WATCH_FOLDER")] + watch_folder: Option, } #[derive(Parser)] @@ -580,7 +592,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> }; let api = Api::new( - session, + session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast), ); @@ -595,6 +607,10 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok()); let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router); + if let Some(watch_folder) = start_opts.watch_folder.as_ref() { + session.watch_folder(Path::new(watch_folder)); + } + let res = match upnp_server { Some(srv) => { let upnp_fut = srv.run_ssdp_forever(); @@ -609,7 +625,6 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> r = http_api_fut => r, }, }; - res.context("error running server") } }, From a73f921c5f0446790807fd4f31febf70f3ef6fa3 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 13 Sep 2024 12:36:32 +0100 Subject: [PATCH 2/4] one line anyhow error formatting --- crates/librqbit/src/api.rs | 4 ++-- crates/librqbit/src/session.rs | 10 +++++----- crates/librqbit/src/torrent_state/streaming.rs | 2 +- crates/peer_binary_protocol/src/lib.rs | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 2fc78e4c5..4dfc0d509 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -72,7 +72,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { macro_rules! visit_int { ($v:expr) => {{ - let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?; + let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?; Ok(TorrentIdOrHash::from(tid)) }}; } @@ -118,7 +118,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { { TorrentIdOrHash::parse(v).map_err(|e| { E::custom(format!( - "expected integer or 40 byte info hash, couldn't parse string: {e:?}" + "expected integer or 40 byte info hash, couldn't parse string: {e:#}" )) }) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 0ddf83e0a..3b8c59a4f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -673,7 +673,7 @@ impl Session { tokio::select! { Some(res) = futs.next(), if !futs.is_empty() => { if let Err(e) = res { - error!("error adding torrent to session: {e:?}"); + error!("error adding torrent to session: {e:#}"); } } st = ps.next(), if !added_all => { @@ -1248,7 +1248,7 @@ impl Session { .with_context(|| format!("torrent with id {} did not exist", id))?; if let Err(e) = removed.pause() { - debug!("error pausing torrent before deletion: {e:?}") + debug!("error pausing torrent before deletion: {e:#}") } let storage = removed @@ -1259,7 +1259,7 @@ impl Session { .pause() // inspect_err not available in 1.75 .map_err(|e| { - warn!("error pausing torrent: {e:?}"); + warn!("error pausing torrent: {e:#}"); e }) .ok() @@ -1285,7 +1285,7 @@ impl Session { if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( - "error removing {:?}: {e:?}", + "error removing {:?}: {e:#}", removed.shared().options.output_folder ) } @@ -1398,7 +1398,7 @@ fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage }; for dir in all_dirs { if let Err(e) = files.remove_directory_if_empty(dir) { - warn!("error removing {dir:?}: {e:?}"); + warn!("error removing {dir:?}: {e:#}"); } else { debug!("removed {dir:?}") } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index deadf3bd0..73fb9ea05 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -154,7 +154,7 @@ macro_rules! poll_try_io { match e { Ok(r) => r, Err(e) => { - debug!("stream error {e:?}"); + debug!("stream error {e:#}"); return Poll::Ready(Err(e)); } } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 27926dff2..bd9a83ed3 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -163,7 +163,7 @@ impl std::fmt::Display for MessageDeserializeError { len_prefix, } => write!( f, - "error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:?}" + "error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:#}" ), MessageDeserializeError::Other(e) => write!(f, "{e}"), } From dedee2ef08998ec3376084ee657d961bd749dd82 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 13 Sep 2024 12:58:09 +0100 Subject: [PATCH 3/4] Watching works fine --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/session.rs | 4 ++ crates/librqbit/src/torrent_state/live/mod.rs | 2 +- crates/librqbit/src/watch.rs | 52 ++++++++++++++----- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4f96994f..00f23c059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2535,6 +2535,7 @@ dependencies = [ "url", "urlencoding", "uuid", + "walkdir", ] [[package]] diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index d9870b57a..68eb2a5bc 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -107,6 +107,7 @@ tokio-socks = "0.5.2" async-trait = "0.1.81" async-backtrace = { version = "0.2", optional = true } notify = { version = "6.1.1", optional = true } +walkdir = "2.5.0" [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 3b8c59a4f..b9e2d0c4c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1207,6 +1207,10 @@ impl Session { .context("error starting torrent")?; } + if let Some(name) = managed_torrent.shared().info.name.as_ref() { + info!(?name, id, "added torrent"); + } + Ok(AddTorrentResponse::Added(id, managed_torrent)) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8ed82c629..2d8d81974 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1519,7 +1519,7 @@ impl PeerHandler { match state.file_ops().write_chunk(addr, piece, chunk_info) { Ok(()) => {} Err(e) => { - error!("FATAL: error writing chunk to disk: {:?}", e); + error!("FATAL: error writing chunk to disk: {e:#}"); return state.on_fatal_error(e); } }; diff --git a/crates/librqbit/src/watch.rs b/crates/librqbit/src/watch.rs index b435928c5..41f1fa60a 100644 --- a/crates/librqbit/src/watch.rs +++ b/crates/librqbit/src/watch.rs @@ -10,6 +10,7 @@ use librqbit_core::torrent_metainfo::torrent_from_bytes; use notify::Watcher; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tracing::{debug, error, error_span, trace, warn}; +use walkdir::WalkDir; use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session}; @@ -40,10 +41,14 @@ impl ThreadCancelEvent { } } -async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>) { +async fn watch_adder( + session_w: Weak, + mut rx: UnboundedReceiver<(AddTorrent<'static>, PathBuf)>, +) { async fn add_one( session_w: &Weak, add_torrent: AddTorrent<'static>, + path: PathBuf, ) -> anyhow::Result<()> { let session = match session_w.upgrade() { Some(s) => s, @@ -57,19 +62,20 @@ async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver {} AddTorrentResponse::AlreadyManaged(_, _) => { - debug!("already managed"); + debug!(?path, "already managed"); } AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"), } Ok(()) } - while let Some(add_torrent) = rx.recv().await { - if let Err(e) = add_one(&session_w, add_torrent).await { + while let Some((add_torrent, path)) = rx.recv().await { + if let Err(e) = add_one(&session_w, add_torrent, path).await { warn!("error adding torrent: {e:#}"); } } @@ -77,7 +83,7 @@ async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>, + tx: UnboundedSender<(AddTorrent<'static>, PathBuf)>, cancel_event: &ThreadCancelEvent, ) -> anyhow::Result<()> { fn read_and_validate_torrent(path: &Path) -> anyhow::Result> { @@ -92,7 +98,7 @@ fn watch_thread( fn watch_cb( ev: notify::Result, - tx: &UnboundedSender>, + tx: &UnboundedSender<(AddTorrent<'static>, PathBuf)>, ) -> anyhow::Result<()> { trace!(event=?ev, "watch event"); let ev = ev.context("error event")?; @@ -111,18 +117,37 @@ fn watch_thread( let add = match read_and_validate_torrent(&path) { Ok(add) => add, Err(e) => { - debug!(?path, "error validating torrent: {e:#}"); + warn!(?path, "error validating torrent: {e:#}"); continue; } }; - if tx.send(add).is_err() { + if tx.send((add, path.to_owned())).is_err() { return Ok(()); } } Ok(()) } + for entry in WalkDir::new(&folder) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| e.path().extension().and_then(|e| e.to_str()) == Some("torrent")) + { + let t = match read_and_validate_torrent(entry.path()) { + Ok(t) => t, + Err(e) => { + warn!(path=?entry.path(), "error validating torrent: {e:#}"); + continue; + } + }; + if tx.send((t, entry.path().to_owned())).is_err() { + debug!(?folder, "watcher thread done"); + return Ok(()); + } + } + let mut watcher = notify::recommended_watcher(move |ev| { if let Err(e) = watch_cb(ev, &tx) { warn!("error processing watch event: {e:#}"); @@ -164,10 +189,11 @@ impl Session { let session_span = self.rs(); std::thread::spawn(move || { let span = error_span!(parent: session_span, "watcher", folder=?watch_folder); - let _ = span.enter(); - if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { - error!("error in watcher thread: {e:#}"); - } + span.in_scope(move || { + if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { + error!("error in watcher thread: {e:#}"); + } + }) }); } } From a38385f3b5033a7757e000af5ecd83874bdfba42 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 13 Sep 2024 13:04:37 +0100 Subject: [PATCH 4/4] use walkdir create in another place instead of custom code --- crates/librqbit/src/create_torrent_file.rs | 29 ++++++---------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/crates/librqbit/src/create_torrent_file.rs b/crates/librqbit/src/create_torrent_file.rs index dcd6e5851..caecaef5b 100644 --- a/crates/librqbit/src/create_torrent_file.rs +++ b/crates/librqbit/src/create_torrent_file.rs @@ -20,28 +20,13 @@ pub struct CreateTorrentOptions<'a> { } fn walk_dir_find_paths(dir: &Path, out: &mut Vec>) -> anyhow::Result<()> { - let mut stack = vec![Cow::Borrowed(dir)]; - while let Some(dir) = stack.pop() { - let rd = std::fs::read_dir(&dir).with_context(|| format!("error reading {:?}", dir))?; - for element in rd { - let element = - element.with_context(|| format!("error reading DirEntry from {:?}", dir))?; - let ft = element.file_type().with_context(|| { - format!( - "error determining filetype of DirEntry {:?} while reading {:?}", - element.file_name(), - dir - ) - })?; - - let full_path = Cow::Owned(dir.join(element.file_name())); - if ft.is_dir() { - stack.push(full_path); - } else { - out.push(full_path); - } - } - } + out.extend( + walkdir::WalkDir::new(dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .map(|e| e.path().to_owned().into()), + ); Ok(()) }