From 5740d3ebe92a0f1a090524f319e23dfca2cab4c8 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:10:11 +0100 Subject: [PATCH 01/11] create InteralAddResult --- crates/librqbit/src/session.rs | 55 ++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 501260e2a..54ae996bc 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -475,6 +475,14 @@ pub(crate) struct CheckedIncomingConnection { pub handshake: Handshake, } +struct InternalAddResult { + info_hash: Id20, + info: TorrentMetaV1Info, + trackers: Vec, + peer_rx: Option, + initial_peers: Vec, +} + impl Session { /// Create a new session with default options. /// The passed in folder will be used as a default unless overriden per torrent. @@ -910,7 +918,7 @@ impl Session { // into a torrent file by connecting to peers that support extended handshakes. // So we must discover at least one peer and connect to it to be able to proceed further. - let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + let add_res = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet = Magnet::parse(&magnet) .context("provided path is not a valid magnet URL")?; @@ -950,13 +958,13 @@ impl Session { } }; debug!(?info, "received result from DHT"); - ( + InternalAddResult { info_hash, info, - magnet.trackers.into_iter().unique().collect(), - Some(peer_rx), - initial_peers, - ) + trackers: magnet.trackers.into_iter().unique().collect(), + peer_rx: Some(peer_rx), + initial_peers: initial_peers.into_iter().collect(), + } } other => { let torrent = match other { @@ -1004,29 +1012,22 @@ impl Session { )? }; - ( - torrent.info_hash, - torrent.info, + InternalAddResult { + info_hash: torrent.info_hash, + info: torrent.info, trackers, peer_rx, - opts.initial_peers + initial_peers: opts + .initial_peers .clone() .unwrap_or_default() .into_iter() .collect(), - ) + } } }; - self.main_torrent_info( - info_hash, - info, - trackers, - peer_rx, - initial_peers.into_iter().collect(), - opts, - ) - .await + self.main_torrent_info(add_res, opts).await } .boxed() } @@ -1060,13 +1061,17 @@ impl Session { async fn main_torrent_info( &self, - info_hash: Id20, - info: TorrentMetaV1Info, - trackers: Vec, - peer_rx: Option, - initial_peers: Vec, + add_res: InternalAddResult, mut opts: AddTorrentOptions, ) -> anyhow::Result { + let InternalAddResult { + info, + info_hash, + trackers, + peer_rx, + initial_peers, + } = add_res; + debug!("Torrent info: {:#?}", &info); let only_files = compute_only_files( From 41a2cd58b315af60a182c632c6d33bd731e9dcbf Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:24:11 +0100 Subject: [PATCH 02/11] Store torrent bytes --- crates/librqbit/src/api.rs | 1 + crates/librqbit/src/dht_utils.rs | 3 +- crates/librqbit/src/peer_info_reader/mod.rs | 14 +++--- crates/librqbit/src/session.rs | 51 ++++++++++++++------- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 78ed72b7a..4a253aefd 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -191,6 +191,7 @@ impl Api { only_files, seen_peers, output_folder, + .. }) => ApiAddTorrentResponse { id: None, output_folder: output_folder.to_string_lossy().into_owned(), diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index d348a7b2b..c01b5526c 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -16,6 +16,7 @@ use librqbit_core::hash_id::Id20; pub enum ReadMetainfoResult { Found { info: TorrentMetaV1Info, + bytes: ByteBufOwned, rx: Rx, seen: HashSet, }, @@ -80,7 +81,7 @@ pub async fn read_metainfo_from_peer_receiver + Unp }, done = unordered.next(), if !unordered.is_empty() => { match done { - Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs }, + Some(Ok(info)) => return ReadMetainfoResult::Found { info: info.0, bytes: info.1, seen, rx: addrs }, Some(Err(e)) => { debug!("{:#}", e); }, diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index dc3e0b90a..a2b170d35 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -32,9 +32,10 @@ pub(crate) async fn read_metainfo_from_peer( peer_connection_options: Option, spawner: BlockingSpawner, connector: Arc, -) -> anyhow::Result> { - let (result_tx, result_rx) = - tokio::sync::oneshot::channel::>>(); +) -> anyhow::Result { + let (result_tx, result_rx) = tokio::sync::oneshot::channel::< + anyhow::Result<(TorrentMetaV1Info, ByteBufOwned)>, + >(); let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::(); let handler = Handler { addr, @@ -135,13 +136,13 @@ impl HandlerLocked { } } +pub type TorrentAndBytes = (TorrentMetaV1Info, ByteBufOwned); + struct Handler { addr: SocketAddr, info_hash: Id20, writer_tx: UnboundedSender, - result_tx: Mutex< - Option>>>, - >, + result_tx: Mutex>>>, locked: RwLock>, } @@ -179,6 +180,7 @@ impl PeerConnectionHandler for Handler { if piece_ready { let buf = self.locked.write().take().unwrap().buffer; let info = from_bytes::>(&buf); + let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice()))); self.result_tx .lock() .take() diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 54ae996bc..e2016826a 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -207,7 +207,7 @@ pub struct Session { async fn torrent_from_url( reqwest_client: &reqwest::Client, url: &str, -) -> anyhow::Result { +) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> { let response = reqwest_client .get(url) .send() @@ -220,7 +220,10 @@ async fn torrent_from_url( .bytes() .await .with_context(|| format!("error reading response body from {url}"))?; - torrent_from_bytes(&b).context("error decoding torrent") + Ok(( + torrent_from_bytes(&b).context("error decoding torrent")?, + b.to_vec().into(), + )) } fn compute_only_files_regex>( @@ -344,6 +347,7 @@ pub struct ListOnlyResponse { pub only_files: Option>, pub output_folder: PathBuf, pub seen_peers: Vec, + pub torrent_bytes: ByteBufOwned, } #[allow(clippy::large_enum_variant)] @@ -478,6 +482,7 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, info: TorrentMetaV1Info, + torrent_bytes: ByteBufOwned, trackers: Vec, peer_rx: Option, initial_peers: Vec, @@ -942,7 +947,7 @@ impl Session { }; debug!(?info_hash, "querying DHT"); - let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( + match read_metainfo_from_peer_receiver( self.peer_id, info_hash, opts.initial_peers.clone().unwrap_or_default(), @@ -952,22 +957,29 @@ impl Session { ) .await { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::Found { + info, + bytes, + rx, + seen, + } => { + debug!(?info, "received result from DHT"); + InternalAddResult { + info_hash, + info, + torrent_bytes: bytes, + trackers: magnet.trackers.into_iter().unique().collect(), + peer_rx: Some(rx), + initial_peers: seen.into_iter().collect(), + } + } ReadMetainfoResult::ChannelClosed { .. } => { bail!("DHT died, no way to discover torrent metainfo") } - }; - debug!(?info, "received result from DHT"); - InternalAddResult { - info_hash, - info, - trackers: magnet.trackers.into_iter().unique().collect(), - peer_rx: Some(peer_rx), - initial_peers: initial_peers.into_iter().collect(), } } other => { - let torrent = match other { + let (torrent, bytes) = match other { AddTorrent::Url(url) if url.starts_with("http://") || url.starts_with("https://") => { @@ -979,10 +991,14 @@ impl Session { url ) } - AddTorrent::TorrentFileBytes(bytes) => { - torrent_from_bytes(&bytes).context("error decoding torrent")? + AddTorrent::TorrentFileBytes(bytes) => ( + torrent_from_bytes(&bytes).context("error decoding torrent")?, + ByteBufOwned::from(bytes.into_owned()), + ), + AddTorrent::TorrentInfo(t) => { + // TODO: this is lossy, as we don't store the bytes. + (*t, ByteBufOwned(Vec::new().into_boxed_slice())) } - AddTorrent::TorrentInfo(t) => *t, }; let trackers = torrent @@ -1015,6 +1031,7 @@ impl Session { InternalAddResult { info_hash: torrent.info_hash, info: torrent.info, + torrent_bytes: bytes, trackers, peer_rx, initial_peers: opts @@ -1070,6 +1087,7 @@ impl Session { trackers, peer_rx, initial_peers, + torrent_bytes, } = add_res; debug!("Torrent info: {:#?}", &info); @@ -1106,6 +1124,7 @@ impl Session { only_files, output_folder, seen_peers: initial_peers, + torrent_bytes, })); } From fe7a1e09ba867f5eacf44caeb62532ae87684cd7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:43:23 +0100 Subject: [PATCH 03/11] Store torrent bytes in memory --- crates/librqbit/src/session.rs | 42 ++++++++++++++++++++++-- crates/librqbit/src/torrent_state/mod.rs | 5 +++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index e2016826a..2ae66f51e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -119,6 +119,7 @@ impl SessionDatabase { .map(|u| u.to_string()) .collect(), info_hash: torrent.info_hash().as_string(), + torrent_bytes: torrent.info.torrent_bytes.clone(), info: torrent.info().info.clone(), only_files: torrent.only_files().clone(), is_paused: torrent @@ -140,6 +141,12 @@ struct SerializedTorrent { deserialize_with = "deserialize_torrent" )] info: TorrentMetaV1Info, + #[serde( + serialize_with = "serialize_torrent_bytes", + deserialize_with = "deserialize_torrent_bytes", + default = "empty_bytes" + )] + torrent_bytes: ByteBufOwned, trackers: HashSet, output_folder: PathBuf, only_files: Option>, @@ -175,6 +182,32 @@ where .map_err(D::Error::custom) } +fn serialize_torrent_bytes(t: &ByteBufOwned, serializer: S) -> Result +where + S: Serializer, +{ + use base64::{engine::general_purpose, Engine as _}; + let s = general_purpose::STANDARD_NO_PAD.encode(&t.0); + s.serialize(serializer) +} + +fn deserialize_torrent_bytes<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + use base64::{engine::general_purpose, Engine as _}; + use serde::de::Error; + let s = String::deserialize(deserializer)?; + let b = general_purpose::STANDARD_NO_PAD + .decode(s) + .map_err(D::Error::custom)?; + Ok(b.into()) +} + +fn empty_bytes() -> ByteBufOwned { + ByteBufOwned(Vec::new().into_boxed_slice()) +} + #[derive(Serialize, Deserialize)] struct SerializedSessionDatabase { torrents: HashMap, @@ -1128,8 +1161,13 @@ impl Session { })); } - let mut builder = - ManagedTorrentBuilder::new(info, info_hash, output_folder, storage_factory); + let mut builder = ManagedTorrentBuilder::new( + info, + info_hash, + torrent_bytes, + output_folder, + storage_factory, + ); builder .allow_overwrite(opts.overwrite) .spawner(self.spawner) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 6e544da1d..f65331696 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -99,6 +99,7 @@ pub(crate) struct ManagedTorrentOptions { pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, + pub torrent_bytes: ByteBufOwned, pub info_hash: Id20, pub(crate) spawner: BlockingSpawner, pub trackers: HashSet, @@ -501,6 +502,7 @@ pub(crate) struct ManagedTorrentBuilder { info: TorrentMetaV1Info, output_folder: PathBuf, info_hash: Id20, + torrent_bytes: ByteBufOwned, force_tracker_interval: Option, peer_connect_timeout: Option, peer_read_write_timeout: Option, @@ -518,12 +520,14 @@ impl ManagedTorrentBuilder { pub fn new( info: TorrentMetaV1Info, info_hash: Id20, + torrent_bytes: ByteBufOwned, output_folder: PathBuf, storage_factory: BoxStorageFactory, ) -> Self { Self { info, info_hash, + torrent_bytes, spawner: None, force_tracker_interval: None, peer_connect_timeout: None, @@ -608,6 +612,7 @@ impl ManagedTorrentBuilder { span, file_infos, info: self.info, + torrent_bytes: self.torrent_bytes, info_hash: self.info_hash, trackers: self.trackers.into_iter().collect(), spawner: self.spawner.unwrap_or_default(), From 5b5352710eaca48fc904bd0d0e6fafb351ecd743 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:50:55 +0100 Subject: [PATCH 04/11] Add an HTTP endpoint to read torrent bytes --- crates/librqbit/src/http_api.rs | 34 ++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 46b505c5a..360388acc 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -26,7 +26,7 @@ use crate::torrent_state::peer::stats::snapshot::PeerStatsFilter; type ApiState = Api; use crate::api::Result; -use crate::{ApiError, ManagedTorrent}; +use crate::{ApiError, ListOnlyResponse, ManagedTorrent}; /// An HTTP server for the API. pub struct HttpApi { @@ -188,6 +188,37 @@ impl HttpApi { ) } + async fn resolve_magnet( + State(state): State, + url: String, + ) -> Result { + let added = state + .session() + .add_torrent( + AddTorrent::from_url(&url), + Some(AddTorrentOptions { + list_only: true, + ..Default::default() + }), + ) + .await?; + let content = match added { + crate::AddTorrentResponse::AlreadyManaged(_, handle) => { + handle.info().torrent_bytes.clone().0 + } + crate::AddTorrentResponse::ListOnly(ListOnlyResponse { torrent_bytes, .. }) => { + torrent_bytes.0 + } + crate::AddTorrentResponse::Added(_, _) => { + return Err(ApiError::new_from_text( + StatusCode::INTERNAL_SERVER_ERROR, + "bug: torrent was added to session, but shouldn't have been", + )) + } + }; + Ok(content) + } + async fn torrent_playlist( State(state): State, headers: HeaderMap, @@ -388,6 +419,7 @@ impl HttpApi { .route("/torrents/:id/stream/:file_id", get(torrent_stream_file)) .route("/torrents/:id/playlist", get(torrent_playlist)) .route("/torrents/playlist", get(global_playlist)) + .route("/torrents/resolve_magnet", post(resolve_magnet)) .route( "/torrents/:id/stream/:file_id/*filename", get(torrent_stream_file), From 5193153e09a993148b475547473f954c278779ad Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:59:23 +0100 Subject: [PATCH 05/11] Add headers --- crates/librqbit/src/http_api.rs | 34 +++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 360388acc..3392bbf99 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -202,13 +202,16 @@ impl HttpApi { }), ) .await?; - let content = match added { - crate::AddTorrentResponse::AlreadyManaged(_, handle) => { - handle.info().torrent_bytes.clone().0 - } - crate::AddTorrentResponse::ListOnly(ListOnlyResponse { torrent_bytes, .. }) => { - torrent_bytes.0 - } + let (info, content) = match added { + crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( + handle.info().info.clone(), + handle.info().torrent_bytes.clone().0, + ), + crate::AddTorrentResponse::ListOnly(ListOnlyResponse { + info, + torrent_bytes, + .. + }) => (info, torrent_bytes.0), crate::AddTorrentResponse::Added(_, _) => { return Err(ApiError::new_from_text( StatusCode::INTERNAL_SERVER_ERROR, @@ -216,7 +219,22 @@ impl HttpApi { )) } }; - Ok(content) + let mut headers = HeaderMap::new(); + headers.insert( + "Content-Type", + HeaderValue::from_static("application/x-bittorrent"), + ); + + if let Some(name) = info.name.as_ref() { + if let Ok(name) = std::str::from_utf8(&name) { + if let Ok(h) = + HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name)) + { + headers.insert("Content-Disposition", h); + } + } + } + Ok((headers, content)) } async fn torrent_playlist( From 55aeb079946d1887c17e0d8669a6be7889d02282 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 06:43:52 +0100 Subject: [PATCH 06/11] use byte::Bytes instead of ByteBufOwned to store torrent bytes --- crates/librqbit/src/http_api.rs | 6 +++--- crates/librqbit/src/session.rs | 23 ++++++++++------------- crates/librqbit/src/torrent_state/mod.rs | 7 ++++--- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 3392bbf99..bb696353e 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -205,13 +205,13 @@ impl HttpApi { let (info, content) = match added { crate::AddTorrentResponse::AlreadyManaged(_, handle) => ( handle.info().info.clone(), - handle.info().torrent_bytes.clone().0, + handle.info().torrent_bytes.clone(), ), crate::AddTorrentResponse::ListOnly(ListOnlyResponse { info, torrent_bytes, .. - }) => (info, torrent_bytes.0), + }) => (info, torrent_bytes), crate::AddTorrentResponse::Added(_, _) => { return Err(ApiError::new_from_text( StatusCode::INTERNAL_SERVER_ERROR, @@ -226,7 +226,7 @@ impl HttpApi { ); if let Some(name) = info.name.as_ref() { - if let Ok(name) = std::str::from_utf8(&name) { + if let Ok(name) = std::str::from_utf8(name) { if let Ok(h) = HeaderValue::from_str(&format!("attachment; filename=\"{}.torrent\"", name)) { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2ae66f51e..bc9d9400e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -29,6 +29,7 @@ use crate::{ use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufOwned, ByteBufT}; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; use futures::{ @@ -144,9 +145,9 @@ struct SerializedTorrent { #[serde( serialize_with = "serialize_torrent_bytes", deserialize_with = "deserialize_torrent_bytes", - default = "empty_bytes" + default )] - torrent_bytes: ByteBufOwned, + torrent_bytes: Bytes, trackers: HashSet, output_folder: PathBuf, only_files: Option>, @@ -182,16 +183,16 @@ where .map_err(D::Error::custom) } -fn serialize_torrent_bytes(t: &ByteBufOwned, serializer: S) -> Result +fn serialize_torrent_bytes(t: &Bytes, serializer: S) -> Result where S: Serializer, { use base64::{engine::general_purpose, Engine as _}; - let s = general_purpose::STANDARD_NO_PAD.encode(&t.0); + let s = general_purpose::STANDARD_NO_PAD.encode(t); s.serialize(serializer) } -fn deserialize_torrent_bytes<'de, D>(deserializer: D) -> Result +fn deserialize_torrent_bytes<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { @@ -204,10 +205,6 @@ where Ok(b.into()) } -fn empty_bytes() -> ByteBufOwned { - ByteBufOwned(Vec::new().into_boxed_slice()) -} - #[derive(Serialize, Deserialize)] struct SerializedSessionDatabase { torrents: HashMap, @@ -380,7 +377,7 @@ pub struct ListOnlyResponse { pub only_files: Option>, pub output_folder: PathBuf, pub seen_peers: Vec, - pub torrent_bytes: ByteBufOwned, + pub torrent_bytes: Bytes, } #[allow(clippy::large_enum_variant)] @@ -515,7 +512,7 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, info: TorrentMetaV1Info, - torrent_bytes: ByteBufOwned, + torrent_bytes: Bytes, trackers: Vec, peer_rx: Option, initial_peers: Vec, @@ -1000,7 +997,7 @@ impl Session { InternalAddResult { info_hash, info, - torrent_bytes: bytes, + torrent_bytes: Bytes::from(bytes.0), trackers: magnet.trackers.into_iter().unique().collect(), peer_rx: Some(rx), initial_peers: seen.into_iter().collect(), @@ -1064,7 +1061,7 @@ impl Session { InternalAddResult { info_hash: torrent.info_hash, info: torrent.info, - torrent_bytes: bytes, + torrent_bytes: Bytes::from(bytes.0), trackers, peer_rx, initial_peers: opts diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index f65331696..d4d935b66 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -14,6 +14,7 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteBufOwned; +use bytes::Bytes; use futures::future::BoxFuture; use futures::FutureExt; use librqbit_core::hash_id::Id20; @@ -99,7 +100,7 @@ pub(crate) struct ManagedTorrentOptions { pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, - pub torrent_bytes: ByteBufOwned, + pub torrent_bytes: Bytes, pub info_hash: Id20, pub(crate) spawner: BlockingSpawner, pub trackers: HashSet, @@ -502,7 +503,7 @@ pub(crate) struct ManagedTorrentBuilder { info: TorrentMetaV1Info, output_folder: PathBuf, info_hash: Id20, - torrent_bytes: ByteBufOwned, + torrent_bytes: Bytes, force_tracker_interval: Option, peer_connect_timeout: Option, peer_read_write_timeout: Option, @@ -520,7 +521,7 @@ impl ManagedTorrentBuilder { pub fn new( info: TorrentMetaV1Info, info_hash: Id20, - torrent_bytes: ByteBufOwned, + torrent_bytes: Bytes, output_folder: PathBuf, storage_factory: BoxStorageFactory, ) -> Self { From e7c7543228d5eeb178bec16b197624ebb734ef19 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 06:52:43 +0100 Subject: [PATCH 07/11] a tiny refactor --- crates/librqbit/src/session.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index bc9d9400e..253ef0255 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -940,9 +940,6 @@ impl Session { ) -> BoxFuture<'a, anyhow::Result> { async move { // Magnet links are different in that we first need to discover the metadata. - let span = error_span!("add_torrent"); - let _ = span.enter(); - let opts = opts.unwrap_or_default(); let paused = opts.list_only || opts.paused; @@ -1076,6 +1073,7 @@ impl Session { self.main_torrent_info(add_res, opts).await } + .instrument(error_span!("add_torrent")) .boxed() } From cd0f38f0fb10ae8ba38f54574e8e5cd828a57432 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 15:51:04 +0100 Subject: [PATCH 08/11] Stub for torrent_file_from_info_and_bytes --- crates/librqbit/src/dht_utils.rs | 4 ++-- crates/librqbit/src/peer_info_reader/mod.rs | 6 +++--- crates/librqbit/src/session.rs | 21 ++++++++++++++++++--- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index c01b5526c..d97a98d0d 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -16,7 +16,7 @@ use librqbit_core::hash_id::Id20; pub enum ReadMetainfoResult { Found { info: TorrentMetaV1Info, - bytes: ByteBufOwned, + info_bytes: ByteBufOwned, rx: Rx, seen: HashSet, }, @@ -81,7 +81,7 @@ pub async fn read_metainfo_from_peer_receiver + Unp }, done = unordered.next(), if !unordered.is_empty() => { match done { - Some(Ok(info)) => return ReadMetainfoResult::Found { info: info.0, bytes: info.1, seen, rx: addrs }, + Some(Ok((info, info_bytes))) => return ReadMetainfoResult::Found { info, info_bytes, seen, rx: addrs }, Some(Err(e)) => { debug!("{:#}", e); }, diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index a2b170d35..aff3e4419 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -32,7 +32,7 @@ pub(crate) async fn read_metainfo_from_peer( peer_connection_options: Option, spawner: BlockingSpawner, connector: Arc, -) -> anyhow::Result { +) -> anyhow::Result { let (result_tx, result_rx) = tokio::sync::oneshot::channel::< anyhow::Result<(TorrentMetaV1Info, ByteBufOwned)>, >(); @@ -136,13 +136,13 @@ impl HandlerLocked { } } -pub type TorrentAndBytes = (TorrentMetaV1Info, ByteBufOwned); +pub type TorrentAndInfoBytes = (TorrentMetaV1Info, ByteBufOwned); struct Handler { addr: SocketAddr, info_hash: Id20, writer_tx: UnboundedSender, - result_tx: Mutex>>>, + result_tx: Mutex>>>, locked: RwLock>, } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 253ef0255..4aa1b1a06 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -502,6 +502,15 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } +fn torrent_file_from_info_and_bytes( + info: &TorrentMetaV1Info, + info_hash: &Id20, + info_bytes: &[u8], + trackers: &[String], +) -> Bytes { + todo!() +} + pub(crate) struct CheckedIncomingConnection { pub addr: SocketAddr, pub stream: tokio::net::TcpStream, @@ -986,16 +995,22 @@ impl Session { { ReadMetainfoResult::Found { info, - bytes, + info_bytes, rx, seen, } => { debug!(?info, "received result from DHT"); + let trackers = magnet.trackers.into_iter().unique().collect_vec(); InternalAddResult { info_hash, + torrent_bytes: torrent_file_from_info_and_bytes( + &info, + &info_hash, + &info_bytes, + &trackers, + ), info, - torrent_bytes: Bytes::from(bytes.0), - trackers: magnet.trackers.into_iter().unique().collect(), + trackers, peer_rx: Some(rx), initial_peers: seen.into_iter().collect(), } From d54b67d2dc7e34298c8394961ad5c95697125241 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 19:06:17 +0100 Subject: [PATCH 09/11] Make the /resolve_magnet HTTP endpoint return an actual torrent file, not info --- crates/bencode/src/lib.rs | 1 + crates/bencode/src/raw_value.rs | 28 ++++++++ crates/bencode/src/serde_bencode_de.rs | 8 ++- crates/bencode/src/serde_bencode_ser.rs | 10 ++- crates/librqbit/src/session.rs | 73 +++++++++++++++++--- crates/librqbit_core/src/torrent_metainfo.rs | 35 +++++++--- 6 files changed, 132 insertions(+), 23 deletions(-) create mode 100644 crates/bencode/src/raw_value.rs diff --git a/crates/bencode/src/lib.rs b/crates/bencode/src/lib.rs index 2753a6a1b..1389395aa 100644 --- a/crates/bencode/src/lib.rs +++ b/crates/bencode/src/lib.rs @@ -1,4 +1,5 @@ mod bencode_value; +pub mod raw_value; mod serde_bencode_de; mod serde_bencode_ser; diff --git a/crates/bencode/src/raw_value.rs b/crates/bencode/src/raw_value.rs new file mode 100644 index 000000000..fbeceb8ea --- /dev/null +++ b/crates/bencode/src/raw_value.rs @@ -0,0 +1,28 @@ +use serde::Serialize; + +pub struct RawValue(pub T); + +pub(crate) const TAG: &str = "::librqbit_bencode::RawValue"; + +impl Serialize for RawValue +where + T: AsRef<[u8]>, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + struct Wrapper<'a>(&'a [u8]); + + impl<'a> Serialize for Wrapper<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_bytes(self.0) + } + } + + serializer.serialize_newtype_struct(TAG, &Wrapper(self.0.as_ref())) + } +} diff --git a/crates/bencode/src/serde_bencode_de.rs b/crates/bencode/src/serde_bencode_de.rs index 42993672b..b6f3a47e3 100644 --- a/crates/bencode/src/serde_bencode_de.rs +++ b/crates/bencode/src/serde_bencode_de.rs @@ -10,6 +10,7 @@ pub struct BencodeDeserializer<'de> { // This is a f**ing hack pub is_torrent_info: bool, pub torrent_info_digest: Option<[u8; 20]>, + pub torrent_info_bytes: Option<&'de [u8]>, } impl<'de> BencodeDeserializer<'de> { @@ -20,6 +21,7 @@ impl<'de> BencodeDeserializer<'de> { parsing_key: false, is_torrent_info: false, torrent_info_digest: None, + torrent_info_bytes: None, } } pub fn into_remaining(self) -> &'de [u8] { @@ -542,9 +544,11 @@ impl<'a, 'de> serde::de::MapAccess<'de> for MapAccess<'a, 'de> { if self.de.is_torrent_info && self.de.field_context.as_slice() == [ByteBuf(b"info")] { let len = self.de.buf.as_ptr() as usize - buf_before.as_ptr() as usize; let mut hash = Sha1::new(); - hash.update(&buf_before[..len]); + let torrent_info_bytes = &buf_before[..len]; + hash.update(torrent_info_bytes); let digest = hash.finish(); - self.de.torrent_info_digest = Some(digest) + self.de.torrent_info_digest = Some(digest); + self.de.torrent_info_bytes = Some(torrent_info_bytes); } self.de.field_context.pop(); Ok(value) diff --git a/crates/bencode/src/serde_bencode_ser.rs b/crates/bencode/src/serde_bencode_ser.rs index 78202ea31..0618b996e 100644 --- a/crates/bencode/src/serde_bencode_ser.rs +++ b/crates/bencode/src/serde_bencode_ser.rs @@ -328,12 +328,18 @@ impl<'ser, W: std::io::Write> Serializer for &'ser mut BencodeSerializer { fn serialize_newtype_struct( self, - _name: &'static str, - _value: &T, + name: &'static str, + value: &T, ) -> Result where T: ?Sized + serde::Serialize, { + if name == crate::raw_value::TAG { + self.hack_no_bytestring_prefix = true; + value.serialize(&mut *self)?; + self.hack_no_bytestring_prefix = false; + return Ok(()); + } Err(SerError::custom_with_ser( "bencode doesn't support newtype structs", self, diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 4aa1b1a06..cef29a2b2 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -502,13 +502,23 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } -fn torrent_file_from_info_and_bytes( - info: &TorrentMetaV1Info, - info_hash: &Id20, - info_bytes: &[u8], - trackers: &[String], -) -> Bytes { - todo!() +fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result { + #[derive(Serialize)] + struct Tmp<'a> { + announce: &'a str, + #[serde(rename = "announce-list")] + announce_list: &'a [&'a [String]], + info: bencode::raw_value::RawValue<&'a [u8]>, + } + + let mut w = Vec::new(); + let v = Tmp { + info: bencode::raw_value::RawValue(info_bytes), + announce: trackers.first().map(|s| s.as_str()).unwrap_or(""), + announce_list: &[trackers], + }; + bencode_serialize_to_writer(&v, &mut w)?; + Ok(w.into()) } pub(crate) struct CheckedIncomingConnection { @@ -1003,12 +1013,10 @@ impl Session { let trackers = magnet.trackers.into_iter().unique().collect_vec(); InternalAddResult { info_hash, - torrent_bytes: torrent_file_from_info_and_bytes( - &info, - &info_hash, + torrent_bytes: torrent_file_from_info_bytes( &info_bytes, &trackers, - ), + )?, info, trackers, peer_rx: Some(rx), @@ -1427,3 +1435,46 @@ impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo { } } } + +#[cfg(test)] +mod tests { + use std::io::Write; + + use buffers::ByteBuf; + use itertools::Itertools; + use librqbit_core::torrent_metainfo::{torrent_from_bytes_ext, TorrentMetaV1}; + + use super::torrent_file_from_info_bytes; + + #[test] + fn test_torrent_file_from_info_and_bytes() { + fn get_trackers(info: &TorrentMetaV1) -> Vec { + info.iter_announce() + .filter_map(|t| std::str::from_utf8(t.as_ref()).ok().map(|t| t.to_owned())) + .collect_vec() + } + + let orig_full_torrent = + include_bytes!("../resources/ubuntu-21.04-desktop-amd64.iso.torrent"); + let parsed = torrent_from_bytes_ext::(&orig_full_torrent[..]).unwrap(); + let parsed_trackers = get_trackers(&parsed.meta); + + let generated_torrent = + torrent_file_from_info_bytes(parsed.info_bytes.as_ref(), &parsed_trackers).unwrap(); + { + let mut f = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open("/tmp/generated") + .unwrap(); + f.write_all(&generated_torrent).unwrap(); + } + let generated_parsed = + torrent_from_bytes_ext::(generated_torrent.as_ref()).unwrap(); + assert_eq!(parsed.meta.info_hash, generated_parsed.meta.info_hash); + assert_eq!(parsed.meta.info, generated_parsed.meta.info); + assert_eq!(parsed.info_bytes, generated_parsed.info_bytes); + assert_eq!(parsed_trackers, get_trackers(&generated_parsed.meta)); + } +} diff --git a/crates/librqbit_core/src/torrent_metainfo.rs b/crates/librqbit_core/src/torrent_metainfo.rs index 9933f67dd..0ca37f678 100644 --- a/crates/librqbit_core/src/torrent_metainfo.rs +++ b/crates/librqbit_core/src/torrent_metainfo.rs @@ -12,18 +12,37 @@ use crate::{hash_id::Id20, lengths::Lengths}; pub type TorrentMetaV1Borrowed<'a> = TorrentMetaV1>; pub type TorrentMetaV1Owned = TorrentMetaV1; -/// Parse torrent metainfo from bytes. -pub fn torrent_from_bytes<'de, BufType: Deserialize<'de>>( +pub struct ParsedTorrent { + /// The parsed torrent. + pub meta: TorrentMetaV1, + + /// The raw bytes of the torrent's "info" dict. + pub info_bytes: BufType, +} + +/// Parse torrent metainfo from bytes (includes additional fields). +pub fn torrent_from_bytes_ext<'de, BufType: Deserialize<'de> + From<&'de [u8]>>( buf: &'de [u8], -) -> anyhow::Result> { +) -> anyhow::Result> { let mut de = BencodeDeserializer::new_from_buf(buf); de.is_torrent_info = true; let mut t = TorrentMetaV1::deserialize(&mut de)?; - t.info_hash = Id20::new( - de.torrent_info_digest - .ok_or_else(|| anyhow::anyhow!("programming error"))?, - ); - Ok(t) + let (digest, info_bytes) = match (de.torrent_info_digest, de.torrent_info_bytes) { + (Some(digest), Some(info_bytes)) => (digest, info_bytes), + _ => anyhow::bail!("programming error"), + }; + t.info_hash = Id20::new(digest); + Ok(ParsedTorrent { + meta: t, + info_bytes: BufType::from(info_bytes), + }) +} + +/// Parse torrent metainfo from bytes. +pub fn torrent_from_bytes<'de, BufType: Deserialize<'de> + From<&'de [u8]>>( + buf: &'de [u8], +) -> anyhow::Result> { + torrent_from_bytes_ext(buf).map(|r| r.meta) } /// A parsed .torrent file. From a0483e67f81a31b95879b632248c565d58697412 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 19:13:20 +0100 Subject: [PATCH 10/11] Remove debugging test that was failing on windows --- crates/librqbit/src/session.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index cef29a2b2..f111bde3b 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1461,15 +1461,6 @@ mod tests { let generated_torrent = torrent_file_from_info_bytes(parsed.info_bytes.as_ref(), &parsed_trackers).unwrap(); - { - let mut f = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open("/tmp/generated") - .unwrap(); - f.write_all(&generated_torrent).unwrap(); - } let generated_parsed = torrent_from_bytes_ext::(generated_torrent.as_ref()).unwrap(); assert_eq!(parsed.meta.info_hash, generated_parsed.meta.info_hash); From 1c1200cc2ae6f46a09b4eb3569ff47454b8cb2e0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 13 Aug 2024 19:14:41 +0100 Subject: [PATCH 11/11] clippy --- crates/librqbit/src/session.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f111bde3b..f6e27d07f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1438,8 +1438,6 @@ impl tracker_comms::TorrentStatsProvider for PeerRxTorrentInfo { #[cfg(test)] mod tests { - use std::io::Write; - use buffers::ByteBuf; use itertools::Itertools; use librqbit_core::torrent_metainfo::{torrent_from_bytes_ext, TorrentMetaV1};