Skip to content

Commit

Permalink
feat(sn_networking): use wasm compatible retry
Browse files Browse the repository at this point in the history
Previously the `backoff` crate was used which is not compatible with wasm and futures (though the docs suggest there is compatibility, but that seems to be without futures).

The retry strategy is adjusted, but I have attempted to keep the end result similar if not the same.
  • Loading branch information
b-zee committed Nov 5, 2024
1 parent 29ba387 commit 75ab499
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 186 deletions.
26 changes: 11 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ libp2p = { version = "0.54.1", features = [
] }
async-trait = "0.1"
bytes = { version = "1.0.1", features = ["serde"] }
exponential-backoff = "2.0.0"
futures = "~0.3.13"
hex = "~0.4.3"
hyper = { version = "0.14", features = [
Expand Down Expand Up @@ -71,7 +72,6 @@ tokio = { version = "1.32.0", features = [
] }
tracing = { version = "~0.1.26" }
xor_name = "5.0.0"
backoff = { version = "0.4.0", features = ["tokio"] }
aes-gcm-siv = "0.11.1"
walkdir = "~2.5.0"
strum = { version = "0.26.2", features = ["derive"] }
Expand Down
234 changes: 96 additions & 138 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,9 @@ impl Network {
quorum: Quorum,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let mut total_attempts = 1;
total_attempts += retry_strategy
.map(|strategy| strategy.get_count())
.unwrap_or(0);
let total_attempts = retry_strategy
.map(|strategy| strategy.attempts())
.unwrap_or(1);

let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned();
let expected_n_verified = get_quorum_value(&quorum);
Expand Down Expand Up @@ -647,30 +646,6 @@ impl Network {
Ok(all_register_copies)
}

/// Get a record from the network
/// This differs from non-wasm32 builds as no retries are applied
#[cfg(target_arch = "wasm32")]
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let pretty_key = PrettyPrintRecordKey::from(&key);
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
NetworkError::InternalMsgChannelDropped
})?;

result.map_err(NetworkError::from)
}

/// Get the Record from the network
/// Carry out re-attempts if required
/// In case a target_record is provided, only return when fetched target.
Expand All @@ -679,93 +654,92 @@ impl Network {
/// It also handles the split record error for spends and registers.
/// For spends, it accumulates the spends and returns an error if more than one.
/// For registers, it merges the registers and returns the merged record.
#[cfg(not(target_arch = "wasm32"))]
pub async fn get_record_from_network(
&self,
key: RecordKey,
cfg: &GetRecordCfg,
) -> Result<Record> {
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
backoff::ExponentialBackoff {
// None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
// be disabled.
max_elapsed_time: retry_duration,
..Default::default()
},
|| async {
let pretty_key = PrettyPrintRecordKey::from(&key);
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = receiver.await.map_err(|e| {
error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}");
NetworkError::InternalMsgChannelDropped
}).map_err(|err| backoff::Error::Transient { err, retry_after: None })?;

// log the results
match &result {
Ok(_) => {
info!("Record returned: {pretty_key:?}.");
}
Err(GetRecordError::RecordDoesNotMatch(_)) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
Err(GetRecordError::NotEnoughCopiesInRange { expected, got, .. }) => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
// libp2p RecordNotFound does mean no holders answered.
// it does not actually mean the record does not exist.
// just that those asked did not have it
Err(GetRecordError::RecordNotFound) => {
warn!("No holder of record '{pretty_key:?}' found.");
}
// This is returned during SplitRecordError, we should not get this error here.
Err(GetRecordError::RecordKindMismatch) => {
error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
}
Err(GetRecordError::SplitRecord { result_map }) => {
error!("Encountered a split record for {pretty_key:?}.");
if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
info!("Merged the split record (register) for {pretty_key:?}, into a single record");
return Ok(record);
}
}
Err(GetRecordError::QueryTimeout) => {
error!("Encountered query timeout for {pretty_key:?}.");
}
};
let pretty_key = PrettyPrintRecordKey::from(&key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();

loop {
info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",);
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord {
key: key.clone(),
sender,
cfg: cfg.clone(),
});
let result = match receiver.await {
Ok(result) => result,
Err(err) => {
error!(
"When fetching record {pretty_key:?}, encountered a channel error {err:?}"
);
// Do not attempt retries.
return Err(NetworkError::InternalMsgChannelDropped);
}
};

// if we don't want to retry, throw permanent error
if cfg.retry_strategy.is_none() {
if let Err(e) = result {
return Err(backoff::Error::Permanent(NetworkError::from(e)));
let err = match result {
Ok(record) => {
info!("Record returned: {pretty_key:?}.");
return Ok(record);
}
Err(err) => err,
};

// log the results
match &err {
GetRecordError::RecordDoesNotMatch(_) => {
warn!("The returned record does not match target {pretty_key:?}.");
}
GetRecordError::NotEnoughCopiesInRange { expected, got, .. } => {
warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}.");
}
// libp2p RecordNotFound does mean no holders answered.
// it does not actually mean the record does not exist.
// just that those asked did not have it
GetRecordError::RecordNotFound => {
warn!("No holder of record '{pretty_key:?}' found.");
}
// This is returned during SplitRecordError, we should not get this error here.
GetRecordError::RecordKindMismatch => {
error!("Record kind mismatch for {pretty_key:?}. This error should not happen here.");
}
GetRecordError::SplitRecord { result_map } => {
error!("Encountered a split record for {pretty_key:?}.");
if let Some(record) = Self::handle_split_record_error(result_map, &key)? {
info!("Merged the split record (register) for {pretty_key:?}, into a single record");
return Ok(record);
}
}
if result.is_err() {
GetRecordError::QueryTimeout => {
error!("Encountered query timeout for {pretty_key:?}.");
}
}

match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
debug!("Getting record from network of {pretty_key:?} via backoff...");
}
result.map_err(|err| backoff::Error::Transient {
err: NetworkError::from(err),
retry_after: None,
})
},
)
.await
_ => break Err(err.into()),
}
}
}

/// Handle the split record error.
/// Spend: Accumulate spends and return error if more than one.
/// Register: Merge registers and return the merged record.
#[cfg(not(target_arch = "wasm32"))]
fn handle_split_record_error(
result_map: &HashMap<XorName, (Record, HashSet<PeerId>)>,
key: &RecordKey,
) -> std::result::Result<Option<Record>, backoff::Error<NetworkError>> {
) -> std::result::Result<Option<Record>, NetworkError> {
let pretty_key = PrettyPrintRecordKey::from(key);

// attempt to deserialise and accumulate any spends or registers
Expand All @@ -783,9 +757,9 @@ impl Network {
let kind = record_kind.get_or_insert(header.kind);
if *kind != header.kind {
error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}",header.kind);
return Err(backoff::Error::Permanent(NetworkError::GetRecordError(
return Err(NetworkError::GetRecordError(
GetRecordError::RecordKindMismatch,
)));
));
}

// Accumulate the spends
Expand Down Expand Up @@ -832,9 +806,7 @@ impl Network {
info!("For record {pretty_key:?} task found split record for a spend, accumulated and sending them as a single record");
let accumulated_spends = accumulated_spends.into_iter().collect::<Vec<SignedSpend>>();

return Err(backoff::Error::Permanent(NetworkError::DoubleSpendAttempt(
accumulated_spends,
)));
return Err(NetworkError::DoubleSpendAttempt(accumulated_spends));
} else if !collected_registers.is_empty() {
info!("For record {pretty_key:?} task found multiple registers, merging them.");
let signed_register = collected_registers.iter().fold(collected_registers[0].clone(), |mut acc, x| {
Expand All @@ -849,7 +821,7 @@ impl Network {
error!(
"Error while serializing the merged register for {pretty_key:?}: {err:?}"
);
backoff::Error::Permanent(NetworkError::from(err))
NetworkError::from(err)
})?
.to_vec();

Expand Down Expand Up @@ -907,49 +879,35 @@ impl Network {

/// Put `Record` to network
/// Optionally verify the record is stored after putting it to network
/// If verify is on, retry multiple times within MAX_PUT_RETRY_DURATION duration.
#[cfg(target_arch = "wasm32")]
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);

info!("Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}");
self.put_record_once(record.clone(), cfg).await
}

/// Put `Record` to network
/// Optionally verify the record is stored after putting it to network
/// If verify is on, retry multiple times within MAX_PUT_RETRY_DURATION duration.
#[cfg(not(target_arch = "wasm32"))]
/// If verify is on, we retry.
pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
let mut backoff = cfg
.retry_strategy
.unwrap_or(RetryStrategy::None)
.backoff()
.into_iter();

// Here we only retry after a failed validation.
// So a long validation time will limit the number of PUT retries we attempt here.
let retry_duration = cfg.retry_strategy.map(|strategy| strategy.get_duration());
backoff::future::retry(
backoff::ExponentialBackoff {
// None sets a random duration, but we'll be terminating with a BackoffError::Permanent, so retry will
// be disabled.
max_elapsed_time: retry_duration,
..Default::default()
}, || async {

loop {
info!(
"Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..."
);
self.put_record_once(record.clone(), cfg).await.map_err(|err|
{
// FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

if cfg.retry_strategy.is_some() {
backoff::Error::Transient { err, retry_after: None }
} else {
backoff::Error::Permanent(err)
}
let err = match self.put_record_once(record.clone(), cfg).await {
Ok(_) => break Ok(()),
Err(err) => err,
};

})
}).await
// FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}");

match backoff.next() {
Some(Some(duration)) => {
crate::target_arch::sleep(duration).await;
}
_ => break Err(err),
}
}
}

async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions sn_protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tracing = { version = "~0.1.26" }
prost = { version = "0.9" , optional=true }
tonic = { version = "0.6.2", optional=true, default-features = false, features = ["prost", "tls", "codegen"]}
xor_name = "5.0.0"
exponential-backoff = "2.0.0"


[build-dependencies]
Expand Down
Loading

0 comments on commit 75ab499

Please sign in to comment.