Skip to content

Commit

Permalink
Accelerate piece cache sync by starting downloading of the next b…
Browse files Browse the repository at this point in the history
…atch after partial completion of the previous batches
  • Loading branch information
nazar-pc committed Nov 17, 2024
1 parent 0943923 commit d303125
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pin-project = "1.1.5"
prometheus-client = "0.22.3"
rand = "0.8.5"
rayon = "1.10.0"
schnellru = "0.2.3"
schnorrkel = "0.11.4"
serde = { version = "1.0.110", features = ["derive"] }
serde_json = "1.0.128"
Expand All @@ -67,7 +66,7 @@ supports-color = { version = "3.0.1", optional = true }
tempfile = "3.13.0"
thiserror = "2.0.0"
thread-priority = "1.1.0"
tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.16", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
Expand Down
14 changes: 13 additions & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::Semaphore;
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

Expand Down Expand Up @@ -539,12 +540,18 @@ where
self.handlers.progress.call_simple(&0.0);
let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();

let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);

let downloading_pieces_stream =
stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
let downloaded_pieces_count = &downloaded_pieces_count;
let caches = &caches;

async move {
let mut permit = downloading_semaphore
.acquire_many(SYNC_BATCH_SIZE as u32)
.await
.expect("Semaphore is never closed; qed");
debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces");

let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
Expand Down Expand Up @@ -580,6 +587,8 @@ where
continue;
}
};
// Release slot for future batches
permit.split(1);

let (offset, maybe_backend) = {
let mut caches = caches.lock();
Expand Down Expand Up @@ -639,10 +648,13 @@ where
}
}
}));

// Download several batches concurrently to make sure the slow tail of one batch is
// compensated by another
downloading_pieces_stream
.buffer_unordered(SYNC_CONCURRENT_BATCHES)
// This allows scheduling a new batch while previous batches are only partially
// complete, yet avoids the excessive memory usage that would result from
// creating all futures upfront
.buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)
// Simply drain everything
.for_each(|()| async {})
.await;
Expand Down

0 comments on commit d303125

Please sign in to comment.