Skip to content

Commit

Permalink
Merge pull request #2351 from RolandSherwin/parallel_download
Browse files Browse the repository at this point in the history
feat(autonomi): download chunks in parallel
  • Loading branch information
grumbach authored Oct 30, 2024
2 parents 0f98b24 + b671669 commit a2cc3e1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
16 changes: 16 additions & 0 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
batch_size
});

/// Number of chunks to download in parallel.
///
/// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable. When the variable
/// is unset or its value does not parse as a `usize`, the default is eight times the parallelism
/// reported by [`std::thread::available_parallelism`] (treated as 1 if it cannot be determined).
///
/// The value is computed once, on first access, and logged at info level.
pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    // NOTE(review): an override of `0` parses successfully and is returned as-is; confirm that
    // the code consuming this batch size tolerates a zero concurrency limit.
    let batch_size = std::env::var("CHUNK_DOWNLOAD_BATCH_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        // Compute the default lazily so the system is only queried for its parallelism when
        // no valid override was supplied.
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                * 8
        });
    info!("Chunk download batch size: {}", batch_size);
    batch_size
});

/// Raw Data Address (points to a DataMap)
pub type DataAddr = XorName;
/// Raw Chunk Address (points to a [`Chunk`])
Expand Down
36 changes: 24 additions & 12 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{collections::HashMap, future::Future, num::NonZero};
use xor_name::XorName;

use super::{
data::{CostError, GetError, PayError, PutError},
data::{CostError, GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE},
Client,
};
use crate::self_encryption::DataMapLevel;
Expand All @@ -33,20 +33,32 @@ use crate::utils::payment_proof_from_quotes_and_payments;
impl Client {
/// Fetch and decrypt all chunks in the data map.
pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result<Bytes, GetError> {
let mut encrypted_chunks = vec![];

let mut download_tasks = vec![];
for info in data_map.infos() {
let chunk = self
.chunk_get(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))?;
let chunk = EncryptedChunk {
index: info.index,
content: chunk.value,
};
encrypted_chunks.push(chunk);
download_tasks.push(async move {
match self
.chunk_get(info.dst_hash)
.await
.inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash))
{
Ok(chunk) => Ok(EncryptedChunk {
index: info.index,
content: chunk.value,
}),
Err(err) => {
error!("Error fetching chunk {:?}: {err:?}", info.dst_hash);
Err(err)
}
}
});
}

let encrypted_chunks =
process_tasks_with_max_concurrency(download_tasks, *CHUNK_DOWNLOAD_BATCH_SIZE)
.await
.into_iter()
.collect::<Result<Vec<EncryptedChunk>, GetError>>()?;

let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| {
error!("Error decrypting encrypted_chunks: {e:?}");
GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e))
Expand Down

0 comments on commit a2cc3e1

Please sign in to comment.