Skip to content

Commit

Permalink
feat(autonomi): download chunks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Oct 29, 2024
1 parent 7ca5531 commit 75991b9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
16 changes: 16 additions & 0 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
batch_size
});

/// Number of chunks to download in parallel.
/// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable.
pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    // Default: eight concurrent fetches per available core. If the OS
    // cannot report the core count, assume a single core.
    let default_batch_size = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        * 8;

    // An env-var override wins, but only if it parses as a usize;
    // a missing or malformed value falls back to the computed default.
    let batch_size = match std::env::var("CHUNK_DOWNLOAD_BATCH_SIZE") {
        Ok(raw) => raw.parse().unwrap_or(default_batch_size),
        Err(_) => default_batch_size,
    };

    info!("Chunk download batch size: {}", batch_size);
    batch_size
});

/// Raw Data Address (points to a DataMap)
pub type DataAddr = XorName;
/// Raw Chunk Address (points to a [`Chunk`])
Expand Down
31 changes: 20 additions & 11 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{collections::HashMap, future::Future, num::NonZero};
use xor_name::XorName;

use super::{
data::{CostError, GetError, PayError, PutError},
data::{CostError, GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE},
Client,
};
use crate::self_encryption::DataMapLevel;
Expand All @@ -35,16 +35,25 @@ impl Client {
pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
let mut encrypted_chunks = vec![];

for info in data_map.infos() {
let chunk = self
.chunk_get(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))?;
let chunk = EncryptedChunk {
index: info.index,
content: chunk.value,
};
encrypted_chunks.push(chunk);
let mut stream = futures::stream::iter(data_map.infos().into_iter())
.map(|info| {
let dst_hash = info.dst_hash;
async move {
self.chunk_get(dst_hash)
.await
.inspect_err(move |err| {
error!("Error fetching chunk {:?}: {err:?}", dst_hash)
})
.map(|chunk| EncryptedChunk {
index: info.index,
content: chunk.value,
})
}
})
.buffered(*CHUNK_DOWNLOAD_BATCH_SIZE);

while let Some(encrypted_chunk) = stream.next().await {
encrypted_chunks.push(encrypted_chunk?);
}

let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {
Expand Down

0 comments on commit 75991b9

Please sign in to comment.