feat(autonomi): retry failed puts
b-zee committed Nov 26, 2024
1 parent 199dc78 commit eda5837
Showing 3 changed files with 102 additions and 61 deletions.
116 changes: 86 additions & 30 deletions autonomi/src/client/data.rs
@@ -9,16 +9,16 @@
 use bytes::Bytes;
 use libp2p::kad::Quorum;

-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::LazyLock;
 use xor_name::XorName;

 use crate::client::payment::PaymentOption;
 use crate::client::utils::process_tasks_with_max_concurrency;
 use crate::client::{ClientEvent, UploadSummary};
 use crate::{self_encryption::encrypt, Client};
-use sn_evm::EvmWalletError;
 use sn_evm::{Amount, AttoTokens};
+use sn_evm::{EvmWalletError, ProofOfPayment};
 use sn_networking::{GetRecordCfg, NetworkError};
 use sn_protocol::{
     storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind},
@@ -41,6 +41,9 @@ pub static CHUNK_UPLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
     batch_size
 });

+/// Number of retries to upload chunks.
+pub const RETRY_ATTEMPTS: usize = 3;
+
 /// Number of chunks to download in parallel.
 /// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable.
 pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
@@ -164,35 +167,28 @@ impl Client {

         // Upload all the chunks in parallel including the data map chunk
         debug!("Uploading {} chunks", chunks.len());
-        let mut upload_tasks = vec![];
-        for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
-            let self_clone = self.clone();
-            let address = *chunk.address();
-            if let Some(proof) = receipt.get(chunk.name()) {
-                let proof_clone = proof.clone();
-                upload_tasks.push(async move {
-                    self_clone
-                        .chunk_upload_with_payment(chunk, proof_clone)
-                        .await
-                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
-                });
-            } else {
-                debug!("Chunk at {address:?} was already paid for so skipping");
-            }
-        }
-        let uploads =
-            process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;
-
-        // Check for errors
-        let total_uploads = uploads.len();
-        let ok_uploads = uploads
-            .iter()
-            .filter_map(|up| up.is_ok().then_some(()))
-            .count();
-        info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
-        let uploads: Result<Vec<_>, _> = uploads.into_iter().collect();
-        uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
-        let record_count = ok_uploads;
+        let mut failed_uploads = self
+            .upload_chunks_with_retries(
+                chunks
+                    .iter()
+                    .chain(std::iter::once(&data_map_chunk))
+                    .collect(),
+                &receipt,
+            )
+            .await;
+
+        // Return the last chunk upload error
+        if let Some(last_chunk_fail) = failed_uploads.pop() {
+            tracing::error!(
+                "Error uploading chunk ({:?}): {:?}",
+                last_chunk_fail.0.address(),
+                last_chunk_fail.1
+            );
+            return Err(last_chunk_fail.1);
+        }
+
+        let record_count = chunks.len() + 1;

         // Reporting
         if let Some(channel) = self.client_event_sender.as_ref() {
@@ -273,4 +269,64 @@ impl Client {
         );
         Ok(total_cost)
     }
+
+    // Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times.
+    pub(crate) async fn upload_chunks_with_retries<'a>(
+        &self,
+        mut chunks: Vec<&'a Chunk>,
+        receipt: &HashMap<XorName, ProofOfPayment>,
+    ) -> Vec<(&'a Chunk, PutError)> {
+        let mut current_attempt: usize = 1;
+
+        loop {
+            let mut upload_tasks = vec![];
+            for chunk in chunks {
+                let self_clone = self.clone();
+                let address = *chunk.address();
+
+                let Some(proof) = receipt.get(chunk.name()) else {
+                    debug!("Chunk at {address:?} was already paid for so skipping");
+                    continue;
+                };
+
+                upload_tasks.push(async move {
+                    self_clone
+                        .chunk_upload_with_payment(chunk, proof.clone())
+                        .await
+                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
+                        // Return chunk reference too, to re-use it next attempt/iteration
+                        .map_err(|err| (chunk, err))
+                });
+            }
+            let uploads =
+                process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;

+            // Check for errors.
+            let total_uploads = uploads.len();
+            let uploads_failed: Vec<_> = uploads.into_iter().filter_map(|up| up.err()).collect();
+            info!(
+                "Uploaded {} chunks out of {total_uploads}",
+                total_uploads - uploads_failed.len()
+            );
+
+            // All uploads succeeded.
+            if uploads_failed.is_empty() {
+                return vec![];
+            }
+
+            // Max retries reached.
+            if current_attempt > RETRY_ATTEMPTS {
+                return uploads_failed;
+            }
+
+            tracing::info!(
+                "Retrying putting {} failed chunks (attempt {current_attempt}/3)",
+                uploads_failed.len()
+            );
+
+            // Re-iterate over the failed chunks
+            chunks = uploads_failed.into_iter().map(|(chunk, _)| chunk).collect();
+            current_attempt += 1;
+        }
+    }
 }
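Stripped of networking and concurrency, the control flow of the new `upload_chunks_with_retries` helper is a retry loop that re-queues only the items that failed. A minimal synchronous sketch (the `retry_failed` name and closure-based API are illustrative; the real method is async and fans each round out through `process_tasks_with_max_concurrency`):

```rust
use std::collections::HashSet;

/// Try `try_put` on every item; re-queue the failures and retry them
/// until they succeed or the retry budget is spent. Returns the items
/// that still failed, each paired with its last error.
fn retry_failed<T, E>(
    mut items: Vec<T>,
    max_retries: usize,
    mut try_put: impl FnMut(&T) -> Result<(), E>,
) -> Vec<(T, E)> {
    let mut attempt = 1;
    loop {
        // One round: keep each failed item paired with its error so the
        // item can be fed into the next round.
        let failed: Vec<(T, E)> = items
            .into_iter()
            .filter_map(|item| try_put(&item).err().map(|err| (item, err)))
            .collect();

        // Done: everything succeeded, or the budget is exhausted.
        if failed.is_empty() || attempt > max_retries {
            return failed;
        }

        // Re-queue only the failures.
        items = failed.into_iter().map(|(item, _)| item).collect();
        attempt += 1;
    }
}

fn main() {
    // Toy run: every item fails its first try, then succeeds on the retry.
    let mut seen = HashSet::new();
    let leftover = retry_failed(vec!["a", "b"], 3, |item: &&str| {
        if seen.insert(*item) { Err("first try fails") } else { Ok(()) }
    });
    assert!(leftover.is_empty());
}
```

As in the helper, the budget is one initial attempt plus `max_retries` retries, and whatever still fails is handed back to the caller rather than swallowed.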
45 changes: 15 additions & 30 deletions autonomi/src/client/data_private.rs
@@ -13,10 +13,8 @@ use serde::{Deserialize, Serialize};
 use sn_evm::Amount;
 use sn_protocol::storage::Chunk;

-use super::data::CHUNK_UPLOAD_BATCH_SIZE;
 use super::data::{GetError, PutError};
 use crate::client::payment::PaymentOption;
-use crate::client::utils::process_tasks_with_max_concurrency;
 use crate::client::{ClientEvent, UploadSummary};
 use crate::{self_encryption::encrypt, Client};

@@ -81,35 +79,22 @@ impl Client {

         // Upload the chunks with the payments
         debug!("Uploading {} chunks", chunks.len());
-        let mut upload_tasks = vec![];
-        for chunk in chunks {
-            let self_clone = self.clone();
-            let address = *chunk.address();
-            if let Some(proof) = receipt.get(chunk.name()) {
-                let proof_clone = proof.clone();
-                upload_tasks.push(async move {
-                    self_clone
-                        .chunk_upload_with_payment(chunk, proof_clone)
-                        .await
-                        .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
-                });
-            } else {
-                debug!("Chunk at {address:?} was already paid for so skipping");
-            }
-        }
-        let uploads =
-            process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await;
-
-        // Check for errors
-        let total_uploads = uploads.len();
-        let ok_uploads = uploads
-            .iter()
-            .filter_map(|up| up.is_ok().then_some(()))
-            .count();
-        info!("Uploaded {} chunks out of {}", ok_uploads, total_uploads);
-        let uploads: Result<Vec<_>, _> = uploads.into_iter().collect();
-        uploads.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
-        let record_count = ok_uploads;
+        let mut failed_uploads = self
+            .upload_chunks_with_retries(chunks.iter().collect(), &receipt)
+            .await;
+
+        // Return the last chunk upload error
+        if let Some(last_chunk_fail) = failed_uploads.pop() {
+            tracing::error!(
+                "Error uploading chunk ({:?}): {:?}",
+                last_chunk_fail.0.address(),
+                last_chunk_fail.1
+            );
+            return Err(last_chunk_fail.1);
+        }
+
+        let record_count = chunks.len();

         // Reporting
         if let Some(channel) = self.client_event_sender.as_ref() {
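Both call sites now consume the helper's return value the same way: pop the last `(chunk, error)` pair, log it, and surface its error as the overall failure. Sketched generically below; `surface_last_failure` is a hypothetical name, not part of the crate:

```rust
use std::fmt::Debug;

// Generic stand-in for the caller-side pattern shown in both diffs above.
fn surface_last_failure<T, E: Debug>(mut failed: Vec<(T, E)>) -> Result<(), E> {
    if let Some((_chunk, err)) = failed.pop() {
        eprintln!("upload still failing after retries: {err:?}");
        return Err(err);
    }
    Ok(())
}

fn main() {
    // Nothing left failing: the overall put succeeds.
    assert!(surface_last_failure::<u8, &str>(vec![]).is_ok());
    // One chunk still failing: its error becomes the overall error.
    assert!(surface_last_failure(vec![(1u8, "timeout")]).is_err());
}
```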
2 changes: 1 addition & 1 deletion autonomi/src/client/utils.rs
@@ -100,7 +100,7 @@ impl Client {

     pub(crate) async fn chunk_upload_with_payment(
         &self,
-        chunk: Chunk,
+        chunk: &Chunk,
         payment: ProofOfPayment,
     ) -> Result<(), PutError> {
         let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");
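This one-line signature change is what makes the retry loop possible: a by-value `chunk: Chunk` is moved into the first upload attempt and cannot be offered again, while `&Chunk` lets every attempt borrow the same value. A toy sketch with stand-in types (illustrative only, not the crate's API):

```rust
// Stand-in for the real chunk type.
struct Chunk(Vec<u8>);

// Borrowing instead of taking ownership lets a retry loop reuse `chunk`.
fn upload(_chunk: &Chunk) -> Result<(), &'static str> {
    Err("network put failed") // pretend every put fails
}

fn main() {
    let chunk = Chunk(vec![1, 2, 3]);
    for attempt in 1..=3 {
        match upload(&chunk) {
            Ok(()) => break,
            Err(err) => eprintln!("attempt {attempt}: {err}"),
        }
    }
}
```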
