Skip to content

Commit

Permalink
Merge pull request #237 from ikatson/watch
Browse files Browse the repository at this point in the history
[Feature] watching a directory for .torrent files and adding them automatically
  • Loading branch information
ikatson authored Sep 13, 2024
2 parents 624b41a + a38385f commit 932131b
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 42 deletions.
84 changes: 83 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
let request_one = |id, addr, depth| {
req.request_one(id, addr, depth)
.map_err(|e| {
debug!("error: {e:?}");
debug!("error: {e:#}");
e
})
.instrument(error_span!(
Expand Down Expand Up @@ -341,7 +341,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32),
Ok(_) => REQUERY_INTERVAL,
Err(e) => {
error!("error in get_peers_root(): {e:?}");
error!("error in get_peers_root(): {e:#}");
return Err::<(), anyhow::Error>(e);
}
};
Expand All @@ -359,7 +359,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
let (id, addr, depth) = addr.unwrap();
futs.push(
this.request_one(id, addr, depth)
.map_err(|e| debug!("error: {e:?}"))
.map_err(|e| debug!("error: {e:#}"))
.instrument(error_span!("addr", addr=addr.to_string()))
);
}
Expand Down Expand Up @@ -996,7 +996,7 @@ impl DhtWorker {
},
Err(e) => {
self.dht.routing_table.write().mark_error(&id);
debug!("error: {e:?}");
debug!("error: {e:#}");
}
}
}.instrument(error_span!("ping", addr=addr.to_string())))
Expand Down Expand Up @@ -1033,7 +1033,7 @@ impl DhtWorker {
)
.unwrap();
if let Err(e) = socket.send_to(&buf, addr).await {
debug!("error sending to {addr}: {e:?}");
debug!("error sending to {addr}: {e:#}");
if let Some(tid) = our_tid {
self.on_send_error(tid, addr, e.into());
}
Expand Down
3 changes: 3 additions & 0 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ storage_examples = []
tracing-subscriber-utils = ["tracing-subscriber"]
postgres = ["sqlx"]
async-bt = ["async-backtrace"]
watch = ["notify"]

[dependencies]
sqlx = { version = "0.8.2", features = [
Expand Down Expand Up @@ -105,6 +106,8 @@ mime_guess = { version = "2.0.5", default-features = false }
tokio-socks = "0.5.2"
async-trait = "0.1.81"
async-backtrace = { version = "0.2", optional = true }
notify = { version = "6.1.1", optional = true }
walkdir = "2.5.0"

[build-dependencies]
anyhow = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {

macro_rules! visit_int {
($v:expr) => {{
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?;
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?;
Ok(TorrentIdOrHash::from(tid))
}};
}
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
{
TorrentIdOrHash::parse(v).map_err(|e| {
E::custom(format!(
"expected integer or 40 byte info hash, couldn't parse string: {e:?}"
"expected integer or 40 byte info hash, couldn't parse string: {e:#}"
))
})
}
Expand Down
29 changes: 7 additions & 22 deletions crates/librqbit/src/create_torrent_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,13 @@ pub struct CreateTorrentOptions<'a> {
}

fn walk_dir_find_paths(dir: &Path, out: &mut Vec<Cow<'_, Path>>) -> anyhow::Result<()> {
let mut stack = vec![Cow::Borrowed(dir)];
while let Some(dir) = stack.pop() {
let rd = std::fs::read_dir(&dir).with_context(|| format!("error reading {:?}", dir))?;
for element in rd {
let element =
element.with_context(|| format!("error reading DirEntry from {:?}", dir))?;
let ft = element.file_type().with_context(|| {
format!(
"error determining filetype of DirEntry {:?} while reading {:?}",
element.file_name(),
dir
)
})?;

let full_path = Cow::Owned(dir.join(element.file_name()));
if ft.is_dir() {
stack.push(full_path);
} else {
out.push(full_path);
}
}
}
out.extend(
walkdir::WalkDir::new(dir)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.map(|e| e.path().to_owned().into()),
);
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/librqbit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils;
mod type_aliases;
#[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))]
pub mod upnp_server_adapter;
#[cfg(feature = "watch")]
pub mod watch;

pub use api::Api;
pub use api_error::ApiError;
Expand Down
14 changes: 9 additions & 5 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl Session {
tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res {
error!("error adding torrent to session: {e:?}");
error!("error adding torrent to session: {e:#}");
}
}
st = ps.next(), if !added_all => {
Expand Down Expand Up @@ -1207,6 +1207,10 @@ impl Session {
.context("error starting torrent")?;
}

if let Some(name) = managed_torrent.shared().info.name.as_ref() {
info!(?name, id, "added torrent");
}

Ok(AddTorrentResponse::Added(id, managed_torrent))
}

Expand Down Expand Up @@ -1248,7 +1252,7 @@ impl Session {
.with_context(|| format!("torrent with id {} did not exist", id))?;

if let Err(e) = removed.pause() {
debug!("error pausing torrent before deletion: {e:?}")
debug!("error pausing torrent before deletion: {e:#}")
}

let storage = removed
Expand All @@ -1259,7 +1263,7 @@ impl Session {
.pause()
// inspect_err not available in 1.75
.map_err(|e| {
warn!("error pausing torrent: {e:?}");
warn!("error pausing torrent: {e:#}");
e
})
.ok()
Expand All @@ -1285,7 +1289,7 @@ impl Session {
if removed.shared().options.output_folder != self.output_folder {
if let Err(e) = storage.remove_directory_if_empty(Path::new("")) {
warn!(
"error removing {:?}: {e:?}",
"error removing {:?}: {e:#}",
removed.shared().options.output_folder
)
}
Expand Down Expand Up @@ -1398,7 +1402,7 @@ fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage
};
for dir in all_dirs {
if let Err(e) = files.remove_directory_if_empty(dir) {
warn!("error removing {dir:?}: {e:?}");
warn!("error removing {dir:?}: {e:#}");
} else {
debug!("removed {dir:?}")
}
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) {
}
Ok(true)
}
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"),
_ => bail!("broken state"),
})
.context("error checking for torrent liveness")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,7 @@ impl PeerHandler {
match state.file_ops().write_chunk(addr, piece, chunk_info) {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
error!("FATAL: error writing chunk to disk: {e:#}");
return state.on_fatal_error(e);
}
};
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/torrent_state/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ macro_rules! poll_try_io {
match e {
Ok(r) => r,
Err(e) => {
debug!("stream error {e:?}");
debug!("stream error {e:#}");
return Poll::Ready(Err(e));
}
}
Expand Down
Loading

0 comments on commit 932131b

Please sign in to comment.