Skip to content

Commit

Permalink
Merge pull request #2333 from grumbach/upload_performance
Browse files Browse the repository at this point in the history
feat: improved fs uploads performance
  • Loading branch information
grumbach authored Oct 29, 2024
2 parents b6e9218 + d273e2f commit 7203ffd
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 47 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ jobs:

- uses: Swatinem/rust-cache@v2

- name: Run autonomi tests
timeout-minutes: 25
run: cargo test --release --package autonomi --lib --features="full,fs"

- name: Run node tests
timeout-minutes: 25
run: cargo test --release --package sn_node --lib
Expand Down
58 changes: 42 additions & 16 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
// permissions and limitations relating to use of the SAFE Network Software.

use bytes::Bytes;
use futures::StreamExt as _;
use libp2p::kad::Quorum;
use tokio::task::JoinError;

use std::collections::HashSet;
use std::sync::LazyLock;
use xor_name::XorName;

use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};
use sn_evm::{Amount, AttoTokens};
Expand All @@ -23,6 +25,22 @@ use sn_protocol::{
NetworkAddress,
};

/// Number of chunks to upload in parallel.
/// Can be overridden by the `CHUNK_UPLOAD_BATCH_SIZE` environment variable.
pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
let batch_size = std::env::var("CHUNK_UPLOAD_BATCH_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
* 8,
);
info!("Chunk upload batch size: {}", batch_size);
batch_size
});

/// Raw Data Address (points to a DataMap)
pub type DataAddr = XorName;
/// Raw Chunk Address (points to a [`Chunk`])
Expand All @@ -41,6 +59,8 @@ pub enum PutError {
PayError(#[from] PayError),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Join error uploading chunk.")]
JoinError(#[from] JoinError),
#[error("A wallet error occurred.")]
Wallet(#[from] sn_evm::EvmError),
#[error("The vault owner key does not match the client's public key")]
Expand Down Expand Up @@ -106,12 +126,9 @@ impl Client {
pub async fn data_put(&self, data: Bytes, wallet: &EvmWallet) -> Result<DataAddr, PutError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
info!(
"Uploading datamap chunk to the network at: {:?}",
data_map_chunk.address()
);

let data_map_addr = data_map_chunk.address();
debug!("Encryption took: {:.2?}", now.elapsed());
info!("Uploading datamap chunk to the network at: {data_map_addr:?}");

let map_xor_name = *data_map_chunk.address().xorname();
let mut xor_names = vec![map_xor_name];
Expand All @@ -127,18 +144,15 @@ impl Client {
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

let mut record_count = 0;

// Upload all the chunks in parallel including the data map chunk
debug!("Uploading {} chunks", chunks.len());
let mut tasks = futures::stream::FuturesUnordered::new();

let mut upload_tasks = vec![];
for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.push(async move {
upload_tasks.push(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
Expand All @@ -148,11 +162,23 @@ impl Client {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.next().await {
result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}

let uploads = process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE)
.await
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?;

// Check for errors
let total_uploads = uploads.len();
let ok_uploads = uploads
.iter()
.filter_map(|up| up.is_ok().then_some(()))
.count();
info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
let uploads: Result<Vec<_>, _> = uploads.into_iter().collect();
uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
let record_count = ok_uploads;

// Reporting
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = payment_proofs
.values()
Expand Down
27 changes: 19 additions & 8 deletions autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use bytes::Bytes;
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use sn_evm::{Amount, EvmWallet};
use sn_protocol::storage::Chunk;

use super::data::CHUNK_UPLOAD_BATCH_SIZE;
use super::data::{GetError, PutError};
use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};

Expand Down Expand Up @@ -78,15 +79,14 @@ impl Client {
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

// Upload the chunks with the payments
let mut record_count = 0;
debug!("Uploading {} chunks", chunks.len());
let mut tasks = futures::stream::FuturesUnordered::new();
let mut upload_tasks = vec![];
for chunk in chunks {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.push(async move {
upload_tasks.push(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
Expand All @@ -96,10 +96,21 @@ impl Client {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.next().await {
result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}
let uploads = process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE)
.await
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?;

// Check for errors
let total_uploads = uploads.len();
let ok_uploads = uploads
.iter()
.filter_map(|up| up.is_ok().then_some(()))
.count();
info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
let uploads: Result<Vec<_>, _> = uploads.into_iter().collect();
uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
let record_count = ok_uploads;

// Reporting
if let Some(channel) = self.client_event_sender.as_ref() {
Expand Down
66 changes: 56 additions & 10 deletions autonomi/src/client/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,34 @@

use crate::client::archive::Metadata;
use crate::client::data::CostError;
use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::Client;
use bytes::Bytes;
use sn_evm::EvmWallet;
use sn_networking::target_arch::{Duration, SystemTime};
use std::path::PathBuf;
use std::sync::LazyLock;
use tokio::task::JoinError;

use super::archive::{Archive, ArchiveAddr};
use super::data::{DataAddr, GetError, PutError};

/// Number of files to upload in parallel.
/// Can be overridden by the `FILE_UPLOAD_BATCH_SIZE` environment variable.
pub static FILE_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
let batch_size = std::env::var("FILE_UPLOAD_BATCH_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
* 8,
);
info!("File upload batch size: {}", batch_size);
batch_size
});

/// Errors that can occur during the file upload operation.
#[cfg(feature = "fs")]
#[derive(Debug, thiserror::Error)]
Expand All @@ -29,6 +48,8 @@ pub enum UploadError {
PutError(#[from] PutError),
#[error("Failed to fetch file")]
GetError(#[from] GetError),
#[error("Error in parralel processing")]
JoinError(#[from] JoinError),
#[error("Failed to serialize")]
Serialization(#[from] rmp_serde::encode::Error),
#[error("Failed to deserialize")]
Expand Down Expand Up @@ -96,30 +117,51 @@ impl Client {
dir_path: PathBuf,
wallet: &EvmWallet,
) -> Result<ArchiveAddr, UploadError> {
let mut archive = Archive::new();
info!("Uploading directory: {dir_path:?}");
let start = tokio::time::Instant::now();

// start upload of files in parallel
let mut upload_tasks = Vec::new();
for entry in walkdir::WalkDir::new(dir_path) {
let entry = entry?;

if !entry.file_type().is_file() {
continue;
}

let path = entry.path().to_path_buf();
tracing::info!("Uploading file: {path:?}");
#[cfg(feature = "loud")]
println!("Uploading file: {path:?}");
let file = self.file_upload(path.clone(), wallet).await?;

let metadata = metadata_from_entry(&entry);
let path = entry.path().to_path_buf();
upload_tasks.push(async move {
let file = self.file_upload(path.clone(), wallet).await;
(path, metadata, file)
});
}

archive.add_file(path, file, metadata);
// wait for all files to be uploaded
let uploads =
process_tasks_with_max_concurrency(upload_tasks, *FILE_UPLOAD_BATCH_SIZE).await?;
info!(
"Upload of {} files completed in {:?}",
uploads.len(),
start.elapsed()
);
let mut archive = Archive::new();
for (path, metadata, maybe_file) in uploads.into_iter() {
match maybe_file {
Ok(file) => archive.add_file(path, file, metadata),
Err(err) => {
error!("Failed to upload file: {path:?}: {err:?}");
return Err(err);
}
}
}

// upload archive
let archive_serialized = archive.into_bytes()?;

let arch_addr = self.data_put(archive_serialized, wallet).await?;

info!("Complete archive upload completed in {:?}", start.elapsed());
#[cfg(feature = "loud")]
println!("Upload completed in {:?}", start.elapsed());
Ok(arch_addr)
}

Expand All @@ -130,6 +172,10 @@ impl Client {
path: PathBuf,
wallet: &EvmWallet,
) -> Result<DataAddr, UploadError> {
info!("Uploading file: {path:?}");
#[cfg(feature = "loud")]
println!("Uploading file: {path:?}");

let data = tokio::fs::read(path).await?;
let data = Bytes::from(data);
let addr = self.data_put(data, wallet).await?;
Expand Down
51 changes: 41 additions & 10 deletions autonomi/src/client/fs_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::Client;
use bytes::Bytes;
use sn_evm::EvmWallet;
Expand All @@ -23,6 +24,8 @@ use super::archive_private::{PrivateArchive, PrivateArchiveAccess};
use super::data_private::PrivateDataAccess;
use super::fs::{DownloadError, UploadError};

use super::fs::FILE_UPLOAD_BATCH_SIZE;

impl Client {
/// Download a private file from network to local file system
pub async fn private_file_download(
Expand Down Expand Up @@ -59,30 +62,54 @@ impl Client {
dir_path: PathBuf,
wallet: &EvmWallet,
) -> Result<PrivateArchiveAccess, UploadError> {
let mut archive = PrivateArchive::new();
info!("Uploading directory as private: {dir_path:?}");
let start = tokio::time::Instant::now();

// start upload of file in parallel
let mut upload_tasks = Vec::new();
for entry in walkdir::WalkDir::new(dir_path) {
let entry = entry?;

if !entry.file_type().is_file() {
continue;
}

let path = entry.path().to_path_buf();
tracing::info!("Uploading file: {path:?}");
#[cfg(feature = "loud")]
println!("Uploading file: {path:?}");
let file = self.private_file_upload(path.clone(), wallet).await?;

let metadata = super::fs::metadata_from_entry(&entry);
let path = entry.path().to_path_buf();
upload_tasks.push(async move {
let file = self.private_file_upload(path.clone(), wallet).await;
(path, metadata, file)
});
}

archive.add_file(path, file, metadata);
// wait for all files to be uploaded
let uploads =
process_tasks_with_max_concurrency(upload_tasks, *FILE_UPLOAD_BATCH_SIZE).await?;
info!(
"Upload of {} files completed in {:?}",
uploads.len(),
start.elapsed()
);
let mut archive = PrivateArchive::new();
for (path, metadata, maybe_file) in uploads.into_iter() {
match maybe_file {
Ok(file) => archive.add_file(path, file, metadata),
Err(err) => {
error!("Failed to upload file: {path:?}: {err:?}");
return Err(err);
}
}
}

// upload archive
let archive_serialized = archive.into_bytes()?;

let arch_addr = self.private_data_put(archive_serialized, wallet).await?;

info!(
"Complete private archive upload completed in {:?}",
start.elapsed()
);
#[cfg(feature = "loud")]
println!("Upload completed in {:?}", start.elapsed());
Ok(arch_addr)
}

Expand All @@ -93,6 +120,10 @@ impl Client {
path: PathBuf,
wallet: &EvmWallet,
) -> Result<PrivateDataAccess, UploadError> {
info!("Uploading file: {path:?}");
#[cfg(feature = "loud")]
println!("Uploading file: {path:?}");

let data = tokio::fs::read(path).await?;
let data = Bytes::from(data);
let addr = self.private_data_put(data, wallet).await?;
Expand Down
Loading

0 comments on commit 7203ffd

Please sign in to comment.