diff --git a/Cargo.lock b/Cargo.lock index 3be6778b7d..96499371b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12609,7 +12609,6 @@ dependencies = [ "prometheus-client 0.22.3", "rand", "rayon", - "schnellru", "schnorrkel", "serde", "serde_json", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 4d83aa4f26..766d38b23b 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -46,7 +46,6 @@ pin-project = "1.1.5" prometheus-client = "0.22.3" rand = "0.8.5" rayon = "1.10.0" -schnellru = "0.2.3" schnorrkel = "0.11.4" serde = { version = "1.0.110", features = ["derive"] } serde_json = "1.0.128" @@ -67,7 +66,7 @@ supports-color = { version = "3.0.1", optional = true } tempfile = "3.13.0" thiserror = "2.0.0" thread-priority = "1.1.0" -tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] } +tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tokio-stream = { version = "0.1.16", features = ["sync"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true } diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 4c749f34cb..07d1fd686d 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -38,6 +38,7 @@ use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::{KeyWithDistance, LocalRecordProvider}; use tokio::runtime::Handle; +use tokio::sync::Semaphore; use tokio::task::{block_in_place, yield_now}; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; @@ -539,12 +540,18 @@ where self.handlers.progress.call_simple(&0.0); let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate(); + let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES); + let downloading_pieces_stream = stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| { let downloaded_pieces_count = &downloaded_pieces_count; let caches = &caches; async move { + let mut permit = downloading_semaphore + .acquire_many(SYNC_BATCH_SIZE as u32) + .await + .expect("Semaphore is never closed; qed"); debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces"); let pieces_stream = match piece_getter.get_pieces(piece_indices).await { @@ -580,6 +587,8 @@ where continue; } }; + // Release slot for future batches + permit.split(1); let (offset, maybe_backend) = { let mut caches = caches.lock(); @@ -639,10 +648,13 @@ where } } })); + // Download several batches concurrently to make sure slow tail of one is compensated by // another downloading_pieces_stream - .buffer_unordered(SYNC_CONCURRENT_BATCHES) + // This allows to schedule new batch while previous batches partially completed, but + // avoids excessive memory usage like when all futures are created upfront + .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2) // Simply drain everything .for_each(|()| async {}) .await;