diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index 8ccf5f6d7d..db89c867be 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -110,6 +110,10 @@ jobs: - uses: Swatinem/rust-cache@v2 + - name: Run autonomi tests + timeout-minutes: 25 + run: cargo test --release --package autonomi --lib --features="full,fs" + - name: Run node tests timeout-minutes: 25 run: cargo test --release --package sn_node --lib diff --git a/autonomi/src/client/data.rs b/autonomi/src/client/data.rs index 164c85b6b0..06975919f5 100644 --- a/autonomi/src/client/data.rs +++ b/autonomi/src/client/data.rs @@ -7,12 +7,14 @@ // permissions and limitations relating to use of the SAFE Network Software. use bytes::Bytes; -use futures::StreamExt as _; use libp2p::kad::Quorum; +use tokio::task::JoinError; use std::collections::HashSet; +use std::sync::LazyLock; use xor_name::XorName; +use crate::client::utils::process_tasks_with_max_concurrency; use crate::client::{ClientEvent, UploadSummary}; use crate::{self_encryption::encrypt, Client}; use sn_evm::{Amount, AttoTokens}; @@ -23,6 +25,22 @@ use sn_protocol::{ NetworkAddress, }; +/// Number of chunks to upload in parallel. +/// Can be overridden by the `CHUNK_UPLOAD_BATCH_SIZE` environment variable. +pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { + let batch_size = std::env::var("CHUNK_UPLOAD_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or( + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + * 8, + ); + info!("Chunk upload batch size: {}", batch_size); + batch_size +}); + /// Raw Data Address (points to a DataMap) pub type DataAddr = XorName; /// Raw Chunk Address (points to a [`Chunk`]) @@ -41,6 +59,8 @@ pub enum PutError { PayError(#[from] PayError), #[error("Serialization error: {0}")] Serialization(String), + #[error("Join error uploading chunk.")] + JoinError(#[from] JoinError), #[error("A wallet error occurred.")] Wallet(#[from] sn_evm::EvmError), #[error("The vault owner key does not match the client's public key")] @@ -106,12 +126,9 @@ impl Client { pub async fn data_put(&self, data: Bytes, wallet: &EvmWallet) -> Result { let now = sn_networking::target_arch::Instant::now(); let (data_map_chunk, chunks) = encrypt(data)?; - info!( - "Uploading datamap chunk to the network at: {:?}", - data_map_chunk.address() - ); - + let data_map_addr = data_map_chunk.address(); debug!("Encryption took: {:.2?}", now.elapsed()); + info!("Uploading datamap chunk to the network at: {data_map_addr:?}"); let map_xor_name = *data_map_chunk.address().xorname(); let mut xor_names = vec![map_xor_name]; @@ -127,18 +144,15 @@ impl Client { .await .inspect_err(|err| error!("Error paying for data: {err:?}"))?; - let mut record_count = 0; - // Upload all the chunks in parallel including the data map chunk debug!("Uploading {} chunks", chunks.len()); - let mut tasks = futures::stream::FuturesUnordered::new(); - + let mut upload_tasks = vec![]; for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) { let self_clone = self.clone(); let address = *chunk.address(); if let Some(proof) = payment_proofs.get(chunk.name()) { let proof_clone = proof.clone(); - tasks.push(async move { + upload_tasks.push(async move { self_clone .chunk_upload_with_payment(chunk, proof_clone) .await @@ -148,11 +162,23 @@ impl Client { debug!("Chunk at {address:?} was already paid for so skipping"); } } - while let Some(result) = tasks.next().await { - result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; - record_count += 1; - } - + let uploads = process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE) + .await + .inspect_err(|err| error!("Join error uploading chunk: {err:?}")) + .map_err(PutError::JoinError)?; + + // Check for errors + let total_uploads = uploads.len(); + let ok_uploads = uploads + .iter() + .filter_map(|up| up.is_ok().then_some(())) + .count(); + info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads); + let uploads: Result, _> = uploads.into_iter().collect(); + uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; + let record_count = ok_uploads; + + // Reporting if let Some(channel) = self.client_event_sender.as_ref() { let tokens_spent = payment_proofs .values() diff --git a/autonomi/src/client/data_private.rs b/autonomi/src/client/data_private.rs index 35eb3e30d2..d464f4db4e 100644 --- a/autonomi/src/client/data_private.rs +++ b/autonomi/src/client/data_private.rs @@ -9,12 +9,13 @@ use std::hash::{DefaultHasher, Hash, Hasher}; use bytes::Bytes; -use futures::StreamExt as _; use serde::{Deserialize, Serialize}; use sn_evm::{Amount, EvmWallet}; use sn_protocol::storage::Chunk; +use super::data::CHUNK_UPLOAD_BATCH_SIZE; use super::data::{GetError, PutError}; +use crate::client::utils::process_tasks_with_max_concurrency; use crate::client::{ClientEvent, UploadSummary}; use crate::{self_encryption::encrypt, Client}; @@ -78,15 +79,14 @@ impl Client { .inspect_err(|err| error!("Error paying for data: {err:?}"))?; // Upload the chunks with the payments - let mut record_count = 0; debug!("Uploading {} chunks", chunks.len()); - let mut tasks = futures::stream::FuturesUnordered::new(); + let mut upload_tasks = vec![]; for chunk in chunks { let self_clone = self.clone(); let address = *chunk.address(); if let Some(proof) = payment_proofs.get(chunk.name()) { let proof_clone = proof.clone(); - tasks.push(async move { + upload_tasks.push(async move { self_clone .chunk_upload_with_payment(chunk, proof_clone) .await @@ -96,10 +96,21 @@ impl Client { debug!("Chunk at {address:?} was already paid for so skipping"); } } - while let Some(result) = tasks.next().await { - result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; - record_count += 1; - } + let uploads = process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE) + .await + .inspect_err(|err| error!("Join error uploading chunk: {err:?}")) + .map_err(PutError::JoinError)?; + + // Check for errors + let total_uploads = uploads.len(); + let ok_uploads = uploads + .iter() + .filter_map(|up| up.is_ok().then_some(())) + .count(); + info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads); + let uploads: Result, _> = uploads.into_iter().collect(); + uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?; + let record_count = ok_uploads; // Reporting if let Some(channel) = self.client_event_sender.as_ref() { diff --git a/autonomi/src/client/fs.rs b/autonomi/src/client/fs.rs index d7f243df68..c42c2d10bc 100644 --- a/autonomi/src/client/fs.rs +++ b/autonomi/src/client/fs.rs @@ -8,15 +8,34 @@ use crate::client::archive::Metadata; use crate::client::data::CostError; +use crate::client::utils::process_tasks_with_max_concurrency; use crate::client::Client; use bytes::Bytes; use sn_evm::EvmWallet; use sn_networking::target_arch::{Duration, SystemTime}; use std::path::PathBuf; +use std::sync::LazyLock; +use tokio::task::JoinError; use super::archive::{Archive, ArchiveAddr}; use super::data::{DataAddr, GetError, PutError}; +/// Number of files to upload in parallel. +/// Can be overridden by the `FILE_UPLOAD_BATCH_SIZE` environment variable. +pub static FILE_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { + let batch_size = std::env::var("FILE_UPLOAD_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or( + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + * 8, + ); + info!("File upload batch size: {}", batch_size); + batch_size +}); + /// Errors that can occur during the file upload operation. #[cfg(feature = "fs")] #[derive(Debug, thiserror::Error)] @@ -29,6 +48,8 @@ pub enum UploadError { PutError(#[from] PutError), #[error("Failed to fetch file")] GetError(#[from] GetError), + #[error("Error in parralel processing")] + JoinError(#[from] JoinError), #[error("Failed to serialize")] Serialization(#[from] rmp_serde::encode::Error), #[error("Failed to deserialize")] @@ -96,30 +117,51 @@ impl Client { dir_path: PathBuf, wallet: &EvmWallet, ) -> Result { - let mut archive = Archive::new(); + info!("Uploading directory: {dir_path:?}"); + let start = tokio::time::Instant::now(); + // start upload of files in parallel + let mut upload_tasks = Vec::new(); for entry in walkdir::WalkDir::new(dir_path) { let entry = entry?; - if !entry.file_type().is_file() { continue; } - let path = entry.path().to_path_buf(); - tracing::info!("Uploading file: {path:?}"); - #[cfg(feature = "loud")] - println!("Uploading file: {path:?}"); - let file = self.file_upload(path.clone(), wallet).await?; - let metadata = metadata_from_entry(&entry); + let path = entry.path().to_path_buf(); + upload_tasks.push(async move { + let file = self.file_upload(path.clone(), wallet).await; + (path, metadata, file) + }); + } - archive.add_file(path, file, metadata); + // wait for all files to be uploaded + let uploads = + process_tasks_with_max_concurrency(upload_tasks, *FILE_UPLOAD_BATCH_SIZE).await?; + info!( + "Upload of {} files completed in {:?}", + uploads.len(), + start.elapsed() + ); + let mut archive = Archive::new(); + for (path, metadata, maybe_file) in uploads.into_iter() { + match maybe_file { + Ok(file) => archive.add_file(path, file, metadata), + Err(err) => { + error!("Failed to upload file: {path:?}: {err:?}"); + return Err(err); + } + } } + // upload archive let archive_serialized = archive.into_bytes()?; - let arch_addr = self.data_put(archive_serialized, wallet).await?; + info!("Complete archive upload completed in {:?}", start.elapsed()); + #[cfg(feature = "loud")] + println!("Upload completed in {:?}", start.elapsed()); Ok(arch_addr) } @@ -130,6 +172,10 @@ impl Client { path: PathBuf, wallet: &EvmWallet, ) -> Result { + info!("Uploading file: {path:?}"); + #[cfg(feature = "loud")] + println!("Uploading file: {path:?}"); + let data = tokio::fs::read(path).await?; let data = Bytes::from(data); let addr = self.data_put(data, wallet).await?; diff --git a/autonomi/src/client/fs_private.rs b/autonomi/src/client/fs_private.rs index 0d9b819d70..b6ed2672f9 100644 --- a/autonomi/src/client/fs_private.rs +++ b/autonomi/src/client/fs_private.rs @@ -14,6 +14,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +use crate::client::utils::process_tasks_with_max_concurrency; use crate::client::Client; use bytes::Bytes; use sn_evm::EvmWallet; @@ -23,6 +24,8 @@ use super::archive_private::{PrivateArchive, PrivateArchiveAccess}; use super::data_private::PrivateDataAccess; use super::fs::{DownloadError, UploadError}; +use super::fs::FILE_UPLOAD_BATCH_SIZE; + impl Client { /// Download a private file from network to local file system pub async fn private_file_download( @@ -59,30 +62,54 @@ impl Client { dir_path: PathBuf, wallet: &EvmWallet, ) -> Result { - let mut archive = PrivateArchive::new(); + info!("Uploading directory as private: {dir_path:?}"); + let start = tokio::time::Instant::now(); + // start upload of file in parallel + let mut upload_tasks = Vec::new(); for entry in walkdir::WalkDir::new(dir_path) { let entry = entry?; - if !entry.file_type().is_file() { continue; } - let path = entry.path().to_path_buf(); - tracing::info!("Uploading file: {path:?}"); - #[cfg(feature = "loud")] - println!("Uploading file: {path:?}"); - let file = self.private_file_upload(path.clone(), wallet).await?; - let metadata = super::fs::metadata_from_entry(&entry); + let path = entry.path().to_path_buf(); + upload_tasks.push(async move { + let file = self.private_file_upload(path.clone(), wallet).await; + (path, metadata, file) + }); + } - archive.add_file(path, file, metadata); + // wait for all files to be uploaded + let uploads = + process_tasks_with_max_concurrency(upload_tasks, *FILE_UPLOAD_BATCH_SIZE).await?; + info!( + "Upload of {} files completed in {:?}", + uploads.len(), + start.elapsed() + ); + let mut archive = PrivateArchive::new(); + for (path, metadata, maybe_file) in uploads.into_iter() { + match maybe_file { + Ok(file) => archive.add_file(path, file, metadata), + Err(err) => { + error!("Failed to upload file: {path:?}: {err:?}"); + return Err(err); + } + } } + // upload archive let archive_serialized = archive.into_bytes()?; - let arch_addr = self.private_data_put(archive_serialized, wallet).await?; + info!( + "Complete private archive upload completed in {:?}", + start.elapsed() + ); + #[cfg(feature = "loud")] + println!("Upload completed in {:?}", start.elapsed()); Ok(arch_addr) } @@ -93,6 +120,10 @@ impl Client { path: PathBuf, wallet: &EvmWallet, ) -> Result { + info!("Uploading file: {path:?}"); + #[cfg(feature = "loud")] + println!("Uploading file: {path:?}"); + let data = tokio::fs::read(path).await?; let data = Bytes::from(data); let addr = self.private_data_put(data, wallet).await?; diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index 68ae70f2f7..be26e35fd2 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -6,9 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use std::{collections::HashMap, num::NonZero}; - use bytes::Bytes; +use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::kad::{Quorum, Record}; use rand::{thread_rng, Rng}; use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk}; @@ -21,6 +20,8 @@ use sn_protocol::{ storage::{try_serialize_record, Chunk, ChunkAddress, RecordKind, RetryStrategy}, NetworkAddress, }; +use std::{collections::HashMap, future::Future, num::NonZero}; +use tokio::task::JoinError; use xor_name::XorName; use super::{ @@ -158,6 +159,11 @@ impl Client { let (quote_payments, skipped_chunks) = extract_quote_payments(&cost_map); + // Make sure nobody else can use the wallet while we are paying + debug!("Waiting for wallet lock"); + let lock_guard = wallet.lock().await; + debug!("Locked wallet"); + // TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying. // TODO: retry when it fails? // Execute chunk payments @@ -166,6 +172,10 @@ impl Client { .await .map_err(|err| PayError::from(err.0))?; + // payment is done, unlock the wallet for other threads + drop(lock_guard); + debug!("Unlocked wallet"); + let proofs = payment_proof_from_quotes_and_payments(&cost_map, &payments); trace!( @@ -248,3 +258,33 @@ pub(crate) fn extract_quote_payments( (to_be_paid, already_paid) } + +pub(crate) async fn process_tasks_with_max_concurrency( + tasks: I, + batch_size: usize, +) -> Result, JoinError> +where + I: IntoIterator, + I::Item: Future + Send, + R: Send, +{ + let mut futures = FuturesUnordered::new(); + let mut results = Vec::new(); + + for task in tasks.into_iter() { + futures.push(task); + + if futures.len() >= batch_size { + if let Some(result) = futures.next().await { + results.push(result); + } + } + } + + // Process remaining tasks + while let Some(result) = futures.next().await { + results.push(result); + } + + Ok(results) +} diff --git a/evmlib/src/wallet.rs b/evmlib/src/wallet.rs index b9504f69a1..b6719be336 100644 --- a/evmlib/src/wallet.rs +++ b/evmlib/src/wallet.rs @@ -22,6 +22,7 @@ use alloy::signers::local::{LocalSigner, PrivateKeySigner}; use alloy::transports::http::{reqwest, Client, Http}; use alloy::transports::{RpcError, TransportErrorKind}; use std::collections::BTreeMap; +use std::sync::Arc; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -39,12 +40,17 @@ pub enum Error { pub struct Wallet { wallet: EthereumWallet, network: Network, + lock: Arc>, } impl Wallet { /// Creates a new Wallet object with the specific Network and EthereumWallet. pub fn new(network: Network, wallet: EthereumWallet) -> Self { - Self { wallet, network } + Self { + wallet, + network, + lock: Arc::new(tokio::sync::Mutex::new(())), + } } /// Convenience function that creates a new Wallet with a random EthereumWallet. @@ -136,6 +142,12 @@ impl Wallet { pub fn to_provider(&self) -> ProviderWithWallet { http_provider_with_wallet(self.network.rpc_url().clone(), self.wallet.clone()) } + + /// Lock the wallet to prevent concurrent use. + /// Drop the guard to unlock the wallet. + pub async fn lock(&self) -> tokio::sync::MutexGuard<()> { + self.lock.lock().await + } } /// Generate an EthereumWallet with a random private key.