From ace4bed0c664e8827cd7fd28d314ce4e6c070dab Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 21 Jun 2024 13:18:30 +0100 Subject: [PATCH 1/3] Folders and files are now deleted more comprehensively --- Makefile | 2 +- crates/librqbit/examples/custom_storage.rs | 10 +- crates/librqbit/src/session.rs | 87 +++++++++++++---- .../librqbit/src/storage/examples/inmemory.rs | 10 +- crates/librqbit/src/storage/examples/mmap.rs | 10 +- crates/librqbit/src/storage/filesystem/fs.rs | 95 +++++++++++-------- .../librqbit/src/storage/filesystem/mmap.rs | 33 ++++--- .../librqbit/src/storage/middleware/slow.rs | 17 +++- .../librqbit/src/storage/middleware/timing.rs | 17 +++- .../storage/middleware/write_through_cache.rs | 14 ++- crates/librqbit/src/storage/mod.rs | 29 +++++- .../src/torrent_state/initializing.rs | 26 +++-- crates/librqbit/src/torrent_state/mod.rs | 4 +- 13 files changed, 252 insertions(+), 102 deletions(-) diff --git a/Makefile b/Makefile index c22b97e5b..373536f52 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ webui-build: webui-deps @PHONY: devserver devserver: - echo -n '' > /tmp/rqbit-log && cargo run --release -- \ + echo -n '' > /tmp/rqbit-log && cargo run -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ server start /tmp/scratch/ diff --git a/crates/librqbit/examples/custom_storage.rs b/crates/librqbit/examples/custom_storage.rs index 3463e47b7..74e394986 100644 --- a/crates/librqbit/examples/custom_storage.rs +++ b/crates/librqbit/examples/custom_storage.rs @@ -19,7 +19,7 @@ struct CustomStorage { impl StorageFactory for CustomStorageFactory { type Storage = CustomStorage; - fn init_storage(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, _info: &librqbit::ManagedTorrentInfo) -> anyhow::Result { Ok(CustomStorage::default()) } @@ -48,6 +48,14 @@ impl TorrentStorage for CustomStorage { fn take(&self) -> anyhow::Result> { anyhow::bail!("not implemented") } + + fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("not implemented") + } + + fn init(&mut self, _meta: &librqbit::ManagedTorrentInfo) -> anyhow::Result<()> { + anyhow::bail!("not implemented") + } } #[tokio::main] diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 242c51390..0d9800438 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -4,7 +4,7 @@ use std::{ collections::{HashMap, HashSet}, io::{BufReader, BufWriter, Read}, net::SocketAddr, - path::PathBuf, + path::{Path, PathBuf}, str::FromStr, sync::Arc, time::Duration, @@ -16,11 +16,14 @@ use crate::{ peer_connection::PeerConnectionOptions, read_buf::ReadBuf, spawn_utils::BlockingSpawner, - storage::{filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt}, + storage::{ + filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, + }, torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, type_aliases::{DiskWorkQueueSender, PeerStream}, + ManagedTorrentInfo, }; use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; @@ -1136,31 +1139,44 @@ impl Session { .remove(&id) .with_context(|| format!("torrent with id {} did not exist", id))?; - let paused = removed - .with_state_mut(|s| { - let paused = match s.take() { - ManagedTorrentState::Paused(p) => p, - ManagedTorrentState::Live(l) => l.pause()?, - _ => return Ok(None), - }; - Ok::<_, anyhow::Error>(Some(paused)) + if let Err(e) = removed.pause() { + debug!("error pausing torrent before deletion: {e:?}") + } + + let storage = removed + .with_state_mut(|s| match s.take() { + ManagedTorrentState::Initializing(p) => p.files.take().ok(), + ManagedTorrentState::Paused(p) => Some(p.files), + ManagedTorrentState::Live(l) => l + .pause() + .inspect_err(|e| warn!("error pausing torrent: {e:?}")) + .ok() + .map(|p| p.files), + _ => None, }) - .context("error pausing torrent"); + .map(Ok) + .unwrap_or_else(|| removed.storage_factory.create(removed.info())); - match (paused, delete_files) { + match (storage, delete_files) { (Err(e), true) => return Err(e).context("torrent deleted, but could not delete files"), - (Err(e), false) => { - warn!(error=?e, "error deleting torrent cleanly"); - } - (Ok(Some(paused)), true) => { - for (id, fi) in removed.info().file_infos.iter().enumerate() { - if let Err(e) = paused.files.remove_file(id, &fi.relative_filename) { - warn!(?fi.relative_filename, error=?e, "could not delete file"); + (Ok(storage), true) => { + debug!("will delete files"); + remove_files_and_dirs(removed.info(), &storage); + if removed.info().options.output_folder != self.output_folder { + if let Err(e) = storage.remove_directory_if_empty(Path::new("")) { + warn!( + "error removing {:?}: {e:?}", + removed.info().options.output_folder + ) } } } - _ => {} + (_, false) => { + debug!("not deleting files") + } }; + + info!(id, "deleted torrent"); Ok(()) } @@ -1220,6 +1236,37 @@ impl Session { } } +fn remove_files_and_dirs(info: &ManagedTorrentInfo, files: &dyn TorrentStorage) { + let mut all_dirs = HashSet::new(); + for (id, fi) in info.file_infos.iter().enumerate() { + let mut fname = &*fi.relative_filename; + if let Err(e) = files.remove_file(id, fname) { + warn!(?fi.relative_filename, error=?e, "could not delete file"); + } else { + debug!(?fi.relative_filename, "deleted the file") + } + while let Some(parent) = fname.parent() { + if parent != Path::new("") { + all_dirs.insert(parent); + } + fname = parent; + } + } + + let all_dirs = { + let mut v = all_dirs.into_iter().collect::>(); + v.sort_unstable_by_key(|p| std::cmp::Reverse(p.as_os_str().len())); + v + }; + for dir in all_dirs { + if let Err(e) = files.remove_directory_if_empty(dir) { + warn!("error removing {dir:?}: {e:?}"); + } else { + debug!("removed {dir:?}") + } + } +} + // Ad adapter for converting stats into the format that tracker_comms accepts. struct PeerRxTorrentInfo { info_hash: Id20, diff --git a/crates/librqbit/src/storage/examples/inmemory.rs b/crates/librqbit/src/storage/examples/inmemory.rs index d7259951b..3bd2a389c 100644 --- a/crates/librqbit/src/storage/examples/inmemory.rs +++ b/crates/librqbit/src/storage/examples/inmemory.rs @@ -25,7 +25,7 @@ pub struct InMemoryExampleStorageFactory {} impl StorageFactory for InMemoryExampleStorageFactory { type Storage = InMemoryExampleStorage; - fn init_storage( + fn create( &self, info: &crate::torrent_state::ManagedTorrentInfo, ) -> anyhow::Result { @@ -110,4 +110,12 @@ impl TorrentStorage for InMemoryExampleStorage { file_infos: self.file_infos.clone(), })) } + + fn init(&mut self, _meta: &crate::ManagedTorrentInfo) -> anyhow::Result<()> { + Ok(()) + } + + fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/crates/librqbit/src/storage/examples/mmap.rs b/crates/librqbit/src/storage/examples/mmap.rs index 5f40a1cab..e448417ad 100644 --- a/crates/librqbit/src/storage/examples/mmap.rs +++ b/crates/librqbit/src/storage/examples/mmap.rs @@ -18,7 +18,7 @@ pub struct MmapStorage { impl StorageFactory for MmapStorageFactory { type Storage = MmapStorage; - fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { Ok(MmapStorage { mmap: RwLock::new( MmapOptions::new() @@ -62,4 +62,12 @@ impl TorrentStorage for MmapStorage { fn take(&self) -> anyhow::Result> { anyhow::bail!("not implemented") } + + fn init(&mut self, _meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + Ok(()) + } + + fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/crates/librqbit/src/storage/filesystem/fs.rs b/crates/librqbit/src/storage/filesystem/fs.rs index a7624ed7c..4831a7699 100644 --- a/crates/librqbit/src/storage/filesystem/fs.rs +++ b/crates/librqbit/src/storage/filesystem/fs.rs @@ -4,6 +4,7 @@ use std::{ }; use anyhow::Context; +use tracing::warn; use crate::{storage::StorageFactoryExt, torrent_state::ManagedTorrentInfo}; @@ -17,45 +18,10 @@ pub struct FilesystemStorageFactory {} impl StorageFactory for FilesystemStorageFactory { type Storage = FilesystemStorage; - fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { - let mut files = Vec::::new(); - let output_folder = &meta.options.output_folder; - for file_details in meta.info.iter_file_details(&meta.lengths)? { - let mut full_path = output_folder.clone(); - let relative_path = file_details - .filename - .to_pathbuf() - .context("error converting file to path")?; - full_path.push(relative_path); - - std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let file = if meta.options.allow_overwrite { - OpenOptions::new() - .create(true) - .truncate(false) - .read(true) - .write(true) - .open(&full_path) - .with_context(|| format!("error opening {full_path:?} in read/write mode"))? - } else { - // create_new does not seem to work with read(true), so calling this twice. - OpenOptions::new() - .create_new(true) - .write(true) - .open(&full_path) - .with_context(|| { - format!( - "error creating a new file (because allow_overwrite = false) {:?}", - &full_path - ) - })?; - OpenOptions::new().read(true).write(true).open(&full_path)? - }; - files.push(OpenedFile::new(file)); - } + fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { Ok(FilesystemStorage { - output_folder: output_folder.clone(), - opened_files: files, + output_folder: meta.options.output_folder.clone(), + opened_files: Default::default(), }) } @@ -142,4 +108,57 @@ impl TorrentStorage for FilesystemStorage { output_folder: self.output_folder.clone(), })) } + + fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + let path = self.output_folder.join(path); + if !path.is_dir() { + anyhow::bail!("cannot remove dir: {path:?} is not a directory") + } + if std::fs::read_dir(&path)?.count() == 0 { + std::fs::remove_dir(&path).with_context(|| format!("error removing {path:?}")) + } else { + warn!("did not remove {path:?} as it was not empty"); + Ok(()) + } + } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + let mut files = Vec::::new(); + for file_details in meta.info.iter_file_details(&meta.lengths)? { + let mut full_path = self.output_folder.clone(); + let relative_path = file_details + .filename + .to_pathbuf() + .context("error converting file to path")?; + full_path.push(relative_path); + + std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; + let file = if meta.options.allow_overwrite { + OpenOptions::new() + .create(true) + .truncate(false) + .read(true) + .write(true) + .open(&full_path) + .with_context(|| format!("error opening {full_path:?} in read/write mode"))? + } else { + // create_new does not seem to work with read(true), so calling this twice. + OpenOptions::new() + .create_new(true) + .write(true) + .open(&full_path) + .with_context(|| { + format!( + "error creating a new file (because allow_overwrite = false) {:?}", + &full_path + ) + })?; + OpenOptions::new().read(true).write(true).open(&full_path)? + }; + files.push(OpenedFile::new(file)); + } + + self.opened_files = files; + Ok(()) + } } diff --git a/crates/librqbit/src/storage/filesystem/mmap.rs b/crates/librqbit/src/storage/filesystem/mmap.rs index 87e415776..07efe1ebf 100644 --- a/crates/librqbit/src/storage/filesystem/mmap.rs +++ b/crates/librqbit/src/storage/filesystem/mmap.rs @@ -22,19 +22,11 @@ fn dummy_mmap() -> anyhow::Result { impl StorageFactory for MmapFilesystemStorageFactory { type Storage = MmapFilesystemStorage; - fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { - let fs_storage = FilesystemStorageFactory::default().init_storage(meta)?; - let mut mmaps = Vec::new(); - for (idx, file) in fs_storage.opened_files.iter().enumerate() { - let fg = file.file.write(); - fg.set_len(meta.file_infos[idx].len) - .context("mmap storage: error setting length")?; - let mmap = unsafe { MmapOptions::new().map_mut(&*fg) }.context("error mapping file")?; - mmaps.push(RwLock::new(mmap)); - } + fn create(&self, meta: &ManagedTorrentInfo) -> anyhow::Result { + let fs_storage = FilesystemStorageFactory::default().create(meta)?; Ok(MmapFilesystemStorage { - opened_mmaps: mmaps, + opened_mmaps: Vec::new(), fs: fs_storage, }) } @@ -82,6 +74,10 @@ impl TorrentStorage for MmapFilesystemStorage { self.fs.remove_file(file_id, filename) } + fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + self.fs.remove_directory_if_empty(path) + } + fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { self.fs.ensure_file_length(file_id, len) } @@ -100,4 +96,19 @@ impl TorrentStorage for MmapFilesystemStorage { fs: self.fs.take_fs()?, })) } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + self.fs.init(meta)?; + let mut mmaps = Vec::new(); + for (idx, file) in self.fs.opened_files.iter().enumerate() { + let fg = file.file.write(); + fg.set_len(meta.file_infos[idx].len) + .context("mmap storage: error setting length")?; + let mmap = unsafe { MmapOptions::new().map_mut(&*fg) }.context("error mapping file")?; + mmaps.push(RwLock::new(mmap)); + } + + self.opened_mmaps = mmaps; + Ok(()) + } } diff --git a/crates/librqbit/src/storage/middleware/slow.rs b/crates/librqbit/src/storage/middleware/slow.rs index ad01e6922..fba516045 100644 --- a/crates/librqbit/src/storage/middleware/slow.rs +++ b/crates/librqbit/src/storage/middleware/slow.rs @@ -14,7 +14,10 @@ use std::{ use parking_lot::Mutex; -use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; +use crate::{ + storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + ManagedTorrentInfo, +}; #[derive(Clone)] pub struct SlowStorageFactory { @@ -32,9 +35,9 @@ impl SlowStorageFactory { impl StorageFactory for SlowStorageFactory { type Storage = SlowStorage; - fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { Ok(SlowStorage { - underlying: self.underlying_factory.init_storage(info)?, + underlying: self.underlying_factory.create(info)?, pwrite_all_bufread: Mutex::new(Box::new( BufReader::new( File::open( @@ -108,4 +111,12 @@ impl TorrentStorage for SlowStorage { fn take(&self) -> anyhow::Result> { anyhow::bail!("not implemented") } + + fn remove_directory_if_empty(&self, path: &std::path::Path) -> anyhow::Result<()> { + self.underlying.remove_directory_if_empty(path) + } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + self.underlying.init(meta) + } } diff --git a/crates/librqbit/src/storage/middleware/timing.rs b/crates/librqbit/src/storage/middleware/timing.rs index 8419694b7..7079db2d5 100644 --- a/crates/librqbit/src/storage/middleware/timing.rs +++ b/crates/librqbit/src/storage/middleware/timing.rs @@ -2,7 +2,10 @@ A storage middleware that logs the time underlying storage operations took. */ -use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; +use crate::{ + storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, + ManagedTorrentInfo, +}; #[derive(Clone)] pub struct TimingStorageFactory { @@ -22,10 +25,10 @@ impl TimingStorageFactory { impl StorageFactory for TimingStorageFactory { type Storage = TimingStorage; - fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { Ok(TimingStorage { name: self.name.clone(), - underlying: self.underlying_factory.init_storage(info)?, + underlying: self.underlying_factory.create(info)?, }) } @@ -96,4 +99,12 @@ impl TorrentStorage for TimingStorage { name: self.name.clone(), })) } + + fn remove_directory_if_empty(&self, path: &std::path::Path) -> anyhow::Result<()> { + self.underlying.remove_directory_if_empty(path) + } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + self.underlying.init(meta) + } } diff --git a/crates/librqbit/src/storage/middleware/write_through_cache.rs b/crates/librqbit/src/storage/middleware/write_through_cache.rs index 0fa3a3791..7b979a6b0 100644 --- a/crates/librqbit/src/storage/middleware/write_through_cache.rs +++ b/crates/librqbit/src/storage/middleware/write_through_cache.rs @@ -14,7 +14,7 @@ use parking_lot::RwLock; use crate::{ storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, - FileInfos, + FileInfos, ManagedTorrentInfo, }; #[derive(Clone, Copy)] @@ -35,7 +35,7 @@ impl WriteThroughCacheStorageFactory { impl StorageFactory for WriteThroughCacheStorageFactory { type Storage = WriteThroughCacheStorage; - fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { + fn create(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result { let pieces = self .max_cache_bytes .div_ceil(info.lengths.default_piece_length() as u64) @@ -44,7 +44,7 @@ impl StorageFactory for WriteThroughCacheStorageFacto let lru = RwLock::new(LruCache::new(pieces)); Ok(WriteThroughCacheStorage { lru, - underlying: self.underlying.init_storage(info)?, + underlying: self.underlying.create(info)?, lengths: info.lengths, file_infos: info.file_infos.clone(), }) @@ -116,4 +116,12 @@ impl TorrentStorage for WriteThroughCacheStorage { file_infos: self.file_infos.clone(), })) } + + fn remove_directory_if_empty(&self, path: &std::path::Path) -> anyhow::Result<()> { + self.underlying.remove_directory_if_empty(path) + } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + self.underlying.init(meta) + } } diff --git a/crates/librqbit/src/storage/mod.rs b/crates/librqbit/src/storage/mod.rs index d84b500bb..14632cd31 100644 --- a/crates/librqbit/src/storage/mod.rs +++ b/crates/librqbit/src/storage/mod.rs @@ -16,7 +16,13 @@ use crate::torrent_state::ManagedTorrentInfo; pub trait StorageFactory: Send + Sync + Any { type Storage: TorrentStorage; - fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result; + fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result; + fn create_and_init(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + let mut storage = self.create(info)?; + storage.init(info)?; + Ok(storage) + } + fn is_type_id(&self, type_id: TypeId) -> bool { Self::type_id(self) == type_id } @@ -38,8 +44,8 @@ impl StorageFactoryExt for SF { impl StorageFactory for Wrapper { type Storage = Box; - fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result { - let s = self.sf.init_storage(info)?; + fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + let s = self.sf.create(info)?; Ok(Box::new(s)) } @@ -59,8 +65,8 @@ impl StorageFactoryExt for SF { impl StorageFactory for Box { type Storage = U::Storage; - fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result { - (**self).init_storage(info) + fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result { + (**self).create(info) } fn clone_box(&self) -> BoxStorageFactory { @@ -69,6 +75,9 @@ impl StorageFactory for Box { } pub trait TorrentStorage: Send + Sync { + // Create/open files etc. + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()>; + /// Given a file_id (which you can get more info from in init_storage() through torrent info) /// read buf.len() bytes into buf at offset. fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; @@ -80,6 +89,8 @@ pub trait TorrentStorage: Send + Sync { /// Remove a file from the storage. If not supported, or it doesn't matter, just return Ok(()) fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; + fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()>; + /// E.g. for filesystem backend ensure that the file has a certain length, and grow/shrink as needed. fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; @@ -108,4 +119,12 @@ impl TorrentStorage for Box { fn take(&self) -> anyhow::Result> { (**self).take() } + + fn remove_directory_if_empty(&self, path: &Path) -> anyhow::Result<()> { + (**self).remove_directory_if_empty(path) + } + + fn init(&mut self, meta: &ManagedTorrentInfo) -> anyhow::Result<()> { + (**self).init(meta) + } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 684d1305d..953bfd8ac 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -8,25 +8,27 @@ use anyhow::Context; use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; -use crate::{ - chunk_tracker::ChunkTracker, - file_ops::FileOps, - storage::{BoxStorageFactory, StorageFactory}, -}; +use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; pub struct TorrentStateInitializing { + pub(crate) files: FileStorage, pub(crate) meta: Arc, pub(crate) only_files: Option>, pub(crate) checked_bytes: AtomicU64, } impl TorrentStateInitializing { - pub fn new(meta: Arc, only_files: Option>) -> Self { + pub fn new( + meta: Arc, + only_files: Option>, + files: FileStorage, + ) -> Self { Self { meta, only_files, + files, checked_bytes: AtomicU64::new(0), } } @@ -36,16 +38,12 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } - pub async fn check( - &self, - storage_factory: &BoxStorageFactory, - ) -> anyhow::Result { - let files = storage_factory.init_storage(&self.meta)?; + pub async fn check(&self) -> anyhow::Result { info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { FileOps::new( &self.meta.info, - &*files, + &self.files, &self.meta.file_infos, &self.meta.lengths, ) @@ -69,7 +67,7 @@ impl TorrentStateInitializing { .unwrap_or(true) { let now = Instant::now(); - if let Err(err) = files.ensure_file_length(idx, fi.len) { + if let Err(err) = self.files.ensure_file_length(idx, fi.len) { warn!( "Error setting length for file {:?} to {}: {:#?}", fi.relative_filename, fi.len, err @@ -97,7 +95,7 @@ impl TorrentStateInitializing { let paused = TorrentStatePaused { info: self.meta.clone(), - files, + files: self.files.take()?, chunk_tracker, streams: Arc::new(Default::default()), }; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index b3438f258..08a243582 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -275,7 +275,7 @@ impl ManagedTorrent { error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), async move { - match init.check(&t.storage_factory).await { + match init.check().await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -325,6 +325,7 @@ impl ManagedTorrent { let initializing = Arc::new(TorrentStateInitializing::new( self.info.clone(), g.only_files.clone(), + self.storage_factory.create_and_init(self.info())?, )); g.state = ManagedTorrentState::Initializing(initializing.clone()); self.state_change_notify.notify_waiters(); @@ -616,6 +617,7 @@ impl ManagedTorrentBuilder { let initializing = Arc::new(TorrentStateInitializing::new( info.clone(), self.only_files.clone(), + self.storage_factory.create_and_init(&info)?, )); Ok(Arc::new(ManagedTorrent { locked: RwLock::new(ManagedTorrentLocked { From 36359150a7f2b65c4eafcc771c5f9f98997ca6c3 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 21 Jun 2024 15:12:51 +0100 Subject: [PATCH 2/3] Remove annoying error message when task is cancelled --- crates/librqbit_core/src/spawn_utils.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/librqbit_core/src/spawn_utils.rs b/crates/librqbit_core/src/spawn_utils.rs index 7479ca350..c20793bb4 100644 --- a/crates/librqbit_core/src/spawn_utils.rs +++ b/crates/librqbit_core/src/spawn_utils.rs @@ -1,6 +1,15 @@ use anyhow::bail; use tokio_util::sync::CancellationToken; -use tracing::{error, trace, Instrument}; +use tracing::{debug, error, trace, Instrument}; + +#[derive(Debug)] +struct CancelledError {} +impl std::error::Error for CancelledError {} +impl std::fmt::Display for CancelledError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("cancelled") + } +} /// Spawns a future with tracing instrumentation. pub fn spawn( @@ -23,7 +32,12 @@ pub fn spawn( trace!("finished"); } Err(e) => { - error!("finished with error: {:#}", e) + if e.is::() { + debug!("task cancelled") + } else { + error!("finished with error: {:#}", e) + } + } } return; @@ -43,7 +57,7 @@ pub fn spawn_with_cancel( spawn(span, async move { tokio::select! { _ = cancellation_token.cancelled() => { - bail!("cancelled"); + bail!(CancelledError{}) }, r = fut => r } From 151933b40346331e926e9184c37b5cb84f533984 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 21 Jun 2024 15:16:15 +0100 Subject: [PATCH 3/3] remove .inspect_err() --- crates/librqbit/src/session.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 0d9800438..0f8ccae1d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1149,7 +1149,11 @@ impl Session { ManagedTorrentState::Paused(p) => Some(p.files), ManagedTorrentState::Live(l) => l .pause() - .inspect_err(|e| warn!("error pausing torrent: {e:?}")) + // inspect_err not available in 1.75 + .map_err(|e| { + warn!("error pausing torrent: {e:?}"); + e + }) .ok() .map(|p| p.files), _ => None,