Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] watching a directory for .torrent files and adding them automatically #237

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
let request_one = |id, addr, depth| {
req.request_one(id, addr, depth)
.map_err(|e| {
debug!("error: {e:?}");
debug!("error: {e:#}");
e
})
.instrument(error_span!(
Expand Down Expand Up @@ -341,7 +341,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32),
Ok(_) => REQUERY_INTERVAL,
Err(e) => {
error!("error in get_peers_root(): {e:?}");
error!("error in get_peers_root(): {e:#}");
return Err::<(), anyhow::Error>(e);
}
};
Expand All @@ -359,7 +359,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
let (id, addr, depth) = addr.unwrap();
futs.push(
this.request_one(id, addr, depth)
.map_err(|e| debug!("error: {e:?}"))
.map_err(|e| debug!("error: {e:#}"))
.instrument(error_span!("addr", addr=addr.to_string()))
);
}
Expand Down Expand Up @@ -996,7 +996,7 @@ impl DhtWorker {
},
Err(e) => {
self.dht.routing_table.write().mark_error(&id);
debug!("error: {e:?}");
debug!("error: {e:#}");
}
}
}.instrument(error_span!("ping", addr=addr.to_string())))
Expand Down Expand Up @@ -1033,7 +1033,7 @@ impl DhtWorker {
)
.unwrap();
if let Err(e) = socket.send_to(&buf, addr).await {
debug!("error sending to {addr}: {e:?}");
debug!("error sending to {addr}: {e:#}");
if let Some(tid) = our_tid {
self.on_send_error(tid, addr, e.into());
}
Expand Down
3 changes: 3 additions & 0 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ storage_examples = []
tracing-subscriber-utils = ["tracing-subscriber"]
postgres = ["sqlx"]
async-bt = ["async-backtrace"]
watch = ["notify"]

[dependencies]
sqlx = { version = "0.8.2", features = [
Expand Down Expand Up @@ -105,6 +106,8 @@ mime_guess = { version = "2.0.5", default-features = false }
tokio-socks = "0.5.2"
async-trait = "0.1.81"
async-backtrace = { version = "0.2", optional = true }
notify = { version = "6.1.1", optional = true }
walkdir = "2.5.0"

[build-dependencies]
anyhow = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/librqbit/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {

macro_rules! visit_int {
($v:expr) => {{
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?;
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?;
Ok(TorrentIdOrHash::from(tid))
}};
}
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
{
TorrentIdOrHash::parse(v).map_err(|e| {
E::custom(format!(
"expected integer or 40 byte info hash, couldn't parse string: {e:?}"
"expected integer or 40 byte info hash, couldn't parse string: {e:#}"
))
})
}
Expand Down
29 changes: 7 additions & 22 deletions crates/librqbit/src/create_torrent_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,13 @@ pub struct CreateTorrentOptions<'a> {
}

fn walk_dir_find_paths(dir: &Path, out: &mut Vec<Cow<'_, Path>>) -> anyhow::Result<()> {
let mut stack = vec![Cow::Borrowed(dir)];
while let Some(dir) = stack.pop() {
let rd = std::fs::read_dir(&dir).with_context(|| format!("error reading {:?}", dir))?;
for element in rd {
let element =
element.with_context(|| format!("error reading DirEntry from {:?}", dir))?;
let ft = element.file_type().with_context(|| {
format!(
"error determining filetype of DirEntry {:?} while reading {:?}",
element.file_name(),
dir
)
})?;

let full_path = Cow::Owned(dir.join(element.file_name()));
if ft.is_dir() {
stack.push(full_path);
} else {
out.push(full_path);
}
}
}
out.extend(
walkdir::WalkDir::new(dir)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.map(|e| e.path().to_owned().into()),
);
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions crates/librqbit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils;
mod type_aliases;
#[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))]
pub mod upnp_server_adapter;
#[cfg(feature = "watch")]
pub mod watch;

pub use api::Api;
pub use api_error::ApiError;
Expand Down
14 changes: 9 additions & 5 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl Session {
tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res {
error!("error adding torrent to session: {e:?}");
error!("error adding torrent to session: {e:#}");
}
}
st = ps.next(), if !added_all => {
Expand Down Expand Up @@ -1207,6 +1207,10 @@ impl Session {
.context("error starting torrent")?;
}

if let Some(name) = managed_torrent.shared().info.name.as_ref() {
info!(?name, id, "added torrent");
}

Ok(AddTorrentResponse::Added(id, managed_torrent))
}

Expand Down Expand Up @@ -1248,7 +1252,7 @@ impl Session {
.with_context(|| format!("torrent with id {} did not exist", id))?;

if let Err(e) = removed.pause() {
debug!("error pausing torrent before deletion: {e:?}")
debug!("error pausing torrent before deletion: {e:#}")
}

let storage = removed
Expand All @@ -1259,7 +1263,7 @@ impl Session {
.pause()
// inspect_err not available in 1.75
.map_err(|e| {
warn!("error pausing torrent: {e:?}");
warn!("error pausing torrent: {e:#}");
e
})
.ok()
Expand All @@ -1285,7 +1289,7 @@ impl Session {
if removed.shared().options.output_folder != self.output_folder {
if let Err(e) = storage.remove_directory_if_empty(Path::new("")) {
warn!(
"error removing {:?}: {e:?}",
"error removing {:?}: {e:#}",
removed.shared().options.output_folder
)
}
Expand Down Expand Up @@ -1398,7 +1402,7 @@ fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage
};
for dir in all_dirs {
if let Err(e) = files.remove_directory_if_empty(dir) {
warn!("error removing {dir:?}: {e:?}");
warn!("error removing {dir:?}: {e:#}");
} else {
debug!("removed {dir:?}")
}
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) {
}
Ok(true)
}
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"),
_ => bail!("broken state"),
})
.context("error checking for torrent liveness")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/torrent_state/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,7 @@ impl PeerHandler {
match state.file_ops().write_chunk(addr, piece, chunk_info) {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
error!("FATAL: error writing chunk to disk: {e:#}");
return state.on_fatal_error(e);
}
};
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/torrent_state/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ macro_rules! poll_try_io {
match e {
Ok(r) => r,
Err(e) => {
debug!("stream error {e:?}");
debug!("stream error {e:#}");
return Poll::Ready(Err(e));
}
}
Expand Down
Loading