diff --git a/Cargo.lock b/Cargo.lock index 366bb647c..00f23c059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -2221,6 +2230,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -2356,6 +2385,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kuchikiki" version = "0.8.2" @@ -2462,6 +2511,7 @@ dependencies = [ "lru", "memmap2", "mime_guess", + "notify", "parking_lot", "rand 0.8.5", "regex", @@ -2485,6 +2535,7 @@ dependencies = [ "url", "urlencoding", "uuid", + "walkdir", ] [[package]] @@ -2842,6 +2893,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -2939,6 +3002,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5204,7 +5286,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 2c1a6abc4..2ebfdaf28 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -265,7 +265,7 @@ impl RecursiveRequest { let request_one = |id, addr, depth| { req.request_one(id, addr, depth) .map_err(|e| { - debug!("error: {e:?}"); + debug!("error: {e:#}"); e }) .instrument(error_span!( @@ -341,7 +341,7 @@ impl RecursiveRequest { Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32), Ok(_) => REQUERY_INTERVAL, Err(e) => { - error!("error in get_peers_root(): {e:?}"); + error!("error in get_peers_root(): {e:#}"); return Err::<(), anyhow::Error>(e); } }; @@ -359,7 +359,7 @@ impl RecursiveRequest { let (id, addr, depth) = addr.unwrap(); futs.push( this.request_one(id, addr, depth) - .map_err(|e| debug!("error: {e:?}")) + .map_err(|e| debug!("error: {e:#}")) .instrument(error_span!("addr", addr=addr.to_string())) ); } @@ -996,7 +996,7 @@ impl DhtWorker { }, Err(e) => { self.dht.routing_table.write().mark_error(&id); - debug!("error: {e:?}"); + debug!("error: {e:#}"); } } }.instrument(error_span!("ping", addr=addr.to_string()))) @@ -1033,7 +1033,7 @@ impl DhtWorker { ) .unwrap(); if let Err(e) = socket.send_to(&buf, addr).await { - debug!("error sending to {addr}: {e:?}"); + debug!("error sending to {addr}: {e:#}"); if let Some(tid) = our_tid { self.on_send_error(tid, addr, e.into()); } diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index fc2874ce9..68eb2a5bc 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -36,6 +36,7 @@ storage_examples = [] tracing-subscriber-utils = ["tracing-subscriber"] postgres = ["sqlx"] async-bt = ["async-backtrace"] +watch = ["notify"] [dependencies] sqlx = { version = "0.8.2", features = [ @@ -105,6 +106,8 @@ mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" async-backtrace = { version = "0.2", optional = true } +notify = { version = "6.1.1", optional = true } +walkdir = "2.5.0" [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 2fc78e4c5..4dfc0d509 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -72,7 +72,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { macro_rules! visit_int { ($v:expr) => {{ - let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?; + let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?; Ok(TorrentIdOrHash::from(tid)) }}; } @@ -118,7 +118,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash { { TorrentIdOrHash::parse(v).map_err(|e| { E::custom(format!( - "expected integer or 40 byte info hash, couldn't parse string: {e:?}" + "expected integer or 40 byte info hash, couldn't parse string: {e:#}" )) }) } diff --git a/crates/librqbit/src/create_torrent_file.rs b/crates/librqbit/src/create_torrent_file.rs index dcd6e5851..caecaef5b 100644 --- a/crates/librqbit/src/create_torrent_file.rs +++ b/crates/librqbit/src/create_torrent_file.rs @@ -20,28 +20,13 @@ pub struct CreateTorrentOptions<'a> { } fn walk_dir_find_paths(dir: &Path, out: &mut Vec>) -> anyhow::Result<()> { - let mut stack = vec![Cow::Borrowed(dir)]; - while let Some(dir) = stack.pop() { - let rd = std::fs::read_dir(&dir).with_context(|| format!("error reading {:?}", dir))?; - for element in rd { - let element = - element.with_context(|| format!("error reading DirEntry from {:?}", dir))?; - let ft = element.file_type().with_context(|| { - format!( - "error determining filetype of DirEntry {:?} while reading {:?}", - element.file_name(), - dir - ) - })?; - - let full_path = Cow::Owned(dir.join(element.file_name())); - if ft.is_dir() { - stack.push(full_path); - } else { - out.push(full_path); - } - } - } + out.extend( + walkdir::WalkDir::new(dir) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .map(|e| e.path().to_owned().into()), + ); Ok(()) } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index f8543c9a4..455381cbf 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils; mod type_aliases; #[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))] pub mod upnp_server_adapter; +#[cfg(feature = "watch")] +pub mod watch; pub use api::Api; pub use api_error::ApiError; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 0ddf83e0a..b9e2d0c4c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -673,7 +673,7 @@ impl Session { tokio::select! { Some(res) = futs.next(), if !futs.is_empty() => { if let Err(e) = res { - error!("error adding torrent to session: {e:?}"); + error!("error adding torrent to session: {e:#}"); } } st = ps.next(), if !added_all => { @@ -1207,6 +1207,10 @@ impl Session { .context("error starting torrent")?; } + if let Some(name) = managed_torrent.shared().info.name.as_ref() { + info!(?name, id, "added torrent"); + } + Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1248,7 +1252,7 @@ impl Session { .with_context(|| format!("torrent with id {} did not exist", id))?; if let Err(e) = removed.pause() { - debug!("error pausing torrent before deletion: {e:?}") + debug!("error pausing torrent before deletion: {e:#}") } let storage = removed @@ -1259,7 +1263,7 @@ impl Session { .pause() // inspect_err not available in 1.75 .map_err(|e| { - warn!("error pausing torrent: {e:?}"); + warn!("error pausing torrent: {e:#}"); e }) .ok() @@ -1285,7 +1289,7 @@ impl Session { if removed.shared().options.output_folder != self.output_folder { if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { warn!( - "error removing {:?}: {e:?}", + "error removing {:?}: {e:#}", removed.shared().options.output_folder ) } @@ -1398,7 +1402,7 @@ fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage }; for dir in all_dirs { if let Err(e) = files.remove_directory_if_empty(dir) { - warn!("error removing {dir:?}: {e:?}"); + warn!("error removing {dir:?}: {e:#}"); } else { debug!("removed {dir:?}") } diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 67b6fcb5c..3b8eda020 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { } Ok(true) } - crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), + crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"), _ => bail!("broken state"), }) .context("error checking for torrent liveness")?; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8ed82c629..2d8d81974 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1519,7 +1519,7 @@ impl PeerHandler { match state.file_ops().write_chunk(addr, piece, chunk_info) { Ok(()) => {} Err(e) => { - error!("FATAL: error writing chunk to disk: {:?}", e); + error!("FATAL: error writing chunk to disk: {e:#}"); return state.on_fatal_error(e); } }; diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index deadf3bd0..73fb9ea05 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -154,7 +154,7 @@ macro_rules! poll_try_io { match e { Ok(r) => r, Err(e) => { - debug!("stream error {e:?}"); + debug!("stream error {e:#}"); return Poll::Ready(Err(e)); } } diff --git a/crates/librqbit/src/watch.rs b/crates/librqbit/src/watch.rs new file mode 100644 index 000000000..41f1fa60a --- /dev/null +++ b/crates/librqbit/src/watch.rs @@ -0,0 +1,199 @@ +use std::{ + io::Read, + path::{Path, PathBuf}, + sync::{Arc, Weak}, +}; + +use anyhow::{bail, Context}; +use buffers::ByteBuf; +use librqbit_core::torrent_metainfo::torrent_from_bytes; +use notify::Watcher; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::{debug, error, error_span, trace, warn}; +use walkdir::WalkDir; + +use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session}; + +struct ThreadCancelEvent { + mutex: parking_lot::Mutex, + condvar: parking_lot::Condvar, +} + +impl ThreadCancelEvent { + fn new() -> Arc { + Arc::new(Self { + mutex: parking_lot::Mutex::new(false), + condvar: parking_lot::Condvar::new(), + }) + } + + fn cancel(&self) { + let mut g = self.mutex.lock(); + *g = true; + self.condvar.notify_all(); + } + + fn wait_until_cancelled(&self) { + let mut g = self.mutex.lock(); + while !*g { + self.condvar.wait(&mut g); + } + } +} + +async fn watch_adder( + session_w: Weak, + mut rx: UnboundedReceiver<(AddTorrent<'static>, PathBuf)>, +) { + async fn add_one( + session_w: &Weak, + add_torrent: AddTorrent<'static>, + path: PathBuf, + ) -> anyhow::Result<()> { + let session = match session_w.upgrade() { + Some(s) => s, + None => return Ok(()), + }; + let res = session + .add_torrent( + add_torrent, + Some(AddTorrentOptions { + overwrite: true, + ..Default::default() + }), + ) + .await + .with_context(|| format!("error adding torrent from {path:?}"))?; + match res { + AddTorrentResponse::Added(_, _) => {} + AddTorrentResponse::AlreadyManaged(_, _) => { + debug!(?path, "already managed"); + } + AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"), + } + Ok(()) + } + + while let Some((add_torrent, path)) = rx.recv().await { + if let Err(e) = add_one(&session_w, add_torrent, path).await { + warn!("error adding torrent: {e:#}"); + } + } +} + +fn watch_thread( + folder: PathBuf, + tx: UnboundedSender<(AddTorrent<'static>, PathBuf)>, + cancel_event: &ThreadCancelEvent, +) -> anyhow::Result<()> { + fn read_and_validate_torrent(path: &Path) -> anyhow::Result> { + let mut buf = Vec::new(); + std::fs::File::open(path) + .context("error opening")? + .read_to_end(&mut buf) + .context("error reading")?; + torrent_from_bytes::(&buf).context("invalid .torrent file")?; + Ok(AddTorrent::from_bytes(buf)) + } + + fn watch_cb( + ev: notify::Result, + tx: &UnboundedSender<(AddTorrent<'static>, PathBuf)>, + ) -> anyhow::Result<()> { + trace!(event=?ev, "watch event"); + let ev = ev.context("error event")?; + match ev.kind { + notify::EventKind::Create(_) | notify::EventKind::Modify(_) => {} + other => { + debug!(kind=?other, paths=?ev.paths, "ignoring event"); + return Ok(()); + } + } + for path in ev.paths { + if path.extension().and_then(|e| e.to_str()) != Some("torrent") { + trace!(?path, "ignoring path"); + continue; + } + let add = match read_and_validate_torrent(&path) { + Ok(add) => add, + Err(e) => { + warn!(?path, "error validating torrent: {e:#}"); + continue; + } + }; + + if tx.send((add, path.to_owned())).is_err() { + return Ok(()); + } + } + Ok(()) + } + + for entry in WalkDir::new(&folder) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| e.path().extension().and_then(|e| e.to_str()) == Some("torrent")) + { + let t = match read_and_validate_torrent(entry.path()) { + Ok(t) => t, + Err(e) => { + warn!(path=?entry.path(), "error validating torrent: {e:#}"); + continue; + } + }; + if tx.send((t, entry.path().to_owned())).is_err() { + debug!(?folder, "watcher thread done"); + return Ok(()); + } + } + + let mut watcher = notify::recommended_watcher(move |ev| { + if let Err(e) = watch_cb(ev, &tx) { + warn!("error processing watch event: {e:#}"); + } + }) + .context("error creating watcher")?; + watcher + .watch(&folder, notify::RecursiveMode::Recursive) + .context("error watching")?; + cancel_event.wait_until_cancelled(); + debug!(?folder, "watcher thread done"); + Ok(()) +} + +impl Session { + pub fn watch_folder(self: &Arc, watch_folder: &Path) { + let session_w = Arc::downgrade(self); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + self.spawn(error_span!("watch_adder", ?watch_folder), async move { + watch_adder(session_w, rx).await; + Ok(()) + }); + + let cancel_event = ThreadCancelEvent::new(); + let cancel_event_2 = cancel_event.clone(); + let cancel_token = self.cancellation_token().clone(); + crate::spawn_utils::spawn( + "watch_cancel", + error_span!("watch_cancel", ?watch_folder), + async move { + cancel_token.cancelled().await; + trace!("canceling watcher"); + cancel_event.cancel(); + Ok(()) + }, + ); + + let watch_folder = PathBuf::from(watch_folder); + let session_span = self.rs(); + std::thread::spawn(move || { + let span = error_span!(parent: session_span, "watcher", folder=?watch_folder); + span.in_scope(move || { + if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { + error!("error in watcher thread: {e:#}"); + } + }) + }); + } +} diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 27926dff2..bd9a83ed3 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -163,7 +163,7 @@ impl std::fmt::Display for MessageDeserializeError { len_prefix, } => write!( f, - "error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:?}" + "error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:#}" ), MessageDeserializeError::Other(e) => write!(f, "{e}"), } diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 8865888b3..1a0c0469d 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -27,6 +27,7 @@ librqbit = { version = "7.1.0-beta.0", path = "../librqbit", default-features = "http-api", "tracing-subscriber-utils", "upnp-serve-adapter", + "watch", ] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } console-subscriber = { version = "0.4", optional = true } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4e37de20a..561d52d37 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,4 +1,11 @@ -use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration}; +use std::{ + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + thread, + time::Duration, +}; use anyhow::{bail, Context}; use clap::{CommandFactory, Parser, ValueEnum}; @@ -236,6 +243,11 @@ struct ServerStartOptions { /// [Experimental] if set, will try to resume quickly after restart and skip checksumming. #[arg(long = "fastresume", env = "RQBIT_FASTRESUME")] fastresume: bool, + + /// The folder to watch for added .torrent files. All files in this folder will be automatically added + /// to the session. + #[arg(long = "watch-folder", env = "RQBIT_WATCH_FOLDER")] + watch_folder: Option, } #[derive(Parser)] @@ -580,7 +592,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> }; let api = Api::new( - session, + session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast), ); @@ -595,6 +607,10 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok()); let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router); + if let Some(watch_folder) = start_opts.watch_folder.as_ref() { + session.watch_folder(Path::new(watch_folder)); + } + let res = match upnp_server { Some(srv) => { let upnp_fut = srv.run_ssdp_forever(); @@ -609,7 +625,6 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> r = http_api_fut => r, }, }; - res.context("error running server") } },