Skip to content

Commit

Permalink
chore(networking): remove REPLICATION_PEER_COUNT
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Oct 16, 2024
1 parent 2499b46 commit adae8c5
Show file tree
Hide file tree
Showing 24 changed files with 1,048 additions and 792 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,10 @@ jobs:
echo "EVM_NETWORK has been set to $EVM_NETWORK"
fi
- name: Wait from network to stabilise
shell: bash
run: sleep 30

- name: Verify the routing tables of the nodes
run: cargo test --release -p sn_node --features="local" --test verify_routing_table -- --nocapture
env:
Expand All @@ -768,7 +772,7 @@ jobs:
uses: maidsafe/sn-local-testnet-action@main
with:
action: stop
log_file_prefix: safe_test_logs_data_location
log_file_prefix: safe_test_logs_data_location_routing_table
platform: ${{ matrix.os }}

- name: Verify restart of nodes using rg
Expand Down
29 changes: 20 additions & 9 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
use bls::{PublicKey, SecretKey, Signature};
use libp2p::{
identity::Keypair,
kad::{Quorum, Record},
kad::{KBucketDistance, Quorum, Record},
Multiaddr, PeerId,
};
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -305,6 +305,11 @@ impl Client {
self.events_broadcaster.subscribe()
}

/// Return the underlying network GetRange
pub async fn get_range(&self) -> Result<KBucketDistance> {
self.network.get_range().await.map_err(Error::from)
}

/// Sign the given data.
///
/// # Arguments
Expand Down Expand Up @@ -823,18 +828,26 @@ impl Client {

// When there is retry on Put side, no need to have a retry on Get
let verification_cfg = GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: None,
target_record: record_to_verify,
expected_holders,
is_register: false,
};

let verification = if verify_store {
Some((VerificationKind::Network, verification_cfg))
} else {
None
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::Majority,
put_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Persistent),
use_put_record_to: None,
verification: Some((VerificationKind::Network, verification_cfg)),
verification,
};

Ok(self.network.put_record(record, &put_cfg).await?)
}

Expand Down Expand Up @@ -871,7 +884,7 @@ impl Client {
self.try_fetch_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Balanced),
target_record: None,
expected_holders: Default::default(),
Expand Down Expand Up @@ -904,7 +917,7 @@ impl Client {
self.try_fetch_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
Expand Down Expand Up @@ -961,9 +974,7 @@ impl Client {
}
Err(err) => {
warn!("Invalid signed spend got from network for {address:?}: {err:?}.");
Err(Error::CouldNotVerifyTransfer(format!(
"Verification failed for spent at {address:?} with error {err:?}"
)))
Err(Error::from(err))
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::UploadSummary;
use super::ClientEvent;
use sn_protocol::NetworkAddress;
use sn_registers::{Entry, EntryHash};
use sn_transfers::SpendAddress;
use std::collections::BTreeSet;
use thiserror::Error;
use tokio::time::Duration;
Expand Down Expand Up @@ -45,6 +46,9 @@ pub enum Error {
#[error("Chunks error {0}.")]
Chunks(#[from] super::chunks::Error),

#[error("No cashnote found at {0:?}.")]
NoCashNoteFound(SpendAddress),

#[error("Decrypting a Folder's item failed: {0}")]
FolderEntryDecryption(EntryHash),

Expand All @@ -63,9 +67,6 @@ pub enum Error {
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),

/// A general error when verifying a transfer validity in the network.
#[error("Failed to verify transfer validity in the network {0}")]
CouldNotVerifyTransfer(String),
#[error("Invalid DAG")]
InvalidDag,
#[error("Serialization error: {0:?}")]
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{info, warn};
pub const AMOUNT_TO_FUND_WALLETS: u64 = 100 * 1_000_000_000;

// The number of times to try to load the faucet wallet
const LOAD_FAUCET_WALLET_RETRIES: usize = 6;
const LOAD_FAUCET_WALLET_RETRIES: usize = 10;

// mutex to restrict access to faucet wallet from concurrent tests
static FAUCET_WALLET_MUTEX: Mutex<()> = Mutex::const_new(());
Expand Down
38 changes: 28 additions & 10 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,9 +1009,7 @@ impl Client {
}

if cash_notes.is_empty() {
return Err(WalletError::CouldNotVerifyTransfer(
"All the redeemed CashNotes are already spent".to_string(),
));
return Err(WalletError::AllRedeemedCashnotesSpent);
}

Ok(cash_notes)
Expand Down Expand Up @@ -1049,23 +1047,40 @@ impl Client {
/// # }
/// ```
pub async fn verify_cashnote(&self, cash_note: &CashNote) -> WalletResult<()> {
let address = SpendAddress::from_unique_pubkey(&cash_note.unique_pubkey());

// We need to get all the spends in the cash_note from the network,
// and compare them to the spends in the cash_note, to know if the
// transfer is considered valid in the network.
let mut tasks = Vec::new();

info!(
"parent spends for cn; {address:?}: {:?}",
&cash_note.parent_spends.len()
);

for spend in &cash_note.parent_spends {
let address = SpendAddress::from_unique_pubkey(spend.unique_pubkey());
debug!(
"Getting spend for pubkey {:?} from network at {address:?}",
warn!(
"Getting parent spend for cn {address:?} pubkey {:?} from network at {address:?}",
spend.unique_pubkey()
);
tasks.push(self.get_spend_from_network(address));
}

let mut received_spends = std::collections::BTreeSet::new();
for result in join_all(tasks).await {
let network_valid_spend =
result.map_err(|err| WalletError::CouldNotVerifyTransfer(err.to_string()))?;
let network_valid_spend = match result {
Ok(spend) => Ok(spend),
Err(error) => match error {
Error::Network(sn_networking::NetworkError::DoubleSpendAttempt(spends)) => {
warn!("DoubleSpentAttempt found with {spends:?}");
Err(WalletError::BurntSpend)
}
err => Err(WalletError::CouldNotVerifyTransfer(format!("{err:?}"))),
},
}?;

let _ = received_spends.insert(network_valid_spend);
}

Expand All @@ -1074,9 +1089,12 @@ impl Client {
if received_spends == cash_note.parent_spends {
return Ok(());
}
Err(WalletError::CouldNotVerifyTransfer(
"The spends in network were not the same as the ones in the CashNote. The parents of this CashNote are probably double spends.".into(),
))

warn!(
"Unexpected parent spends found in CashNote verification at {:?}: {received_spends:?}.",
address
);
Err(WalletError::UnexpectedParentSpends(address))
}
}

Expand Down
120 changes: 11 additions & 109 deletions sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,19 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{driver::PendingGetClosestType, SwarmDriver};
use rand::{rngs::OsRng, Rng};
use tokio::time::Duration;

use crate::target_arch::{interval, Instant, Interval};
use crate::target_arch::Instant;

/// The default interval at which NetworkDiscovery is triggered. The interval is increased as more peers are added to the
/// routing table.
pub(crate) const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(10);

/// Every BOOTSTRAP_CONNECTED_PEERS_STEP connected peer, we step up the BOOTSTRAP_INTERVAL to slow down bootstrapping
/// process
const BOOTSTRAP_CONNECTED_PEERS_STEP: u32 = 5;

/// If the previously added peer has been before LAST_PEER_ADDED_TIME_LIMIT, then we should slowdown the bootstrapping
/// process. This is to make sure we don't flood the network with `FindNode` msgs.
const LAST_PEER_ADDED_TIME_LIMIT: Duration = Duration::from_secs(180);

/// A minimum interval to prevent bootstrap got triggered too often
const LAST_BOOTSTRAP_TRIGGERED_TIME_LIMIT: Duration = Duration::from_secs(30);

/// The bootstrap interval to use if we haven't added any new peers in a while.
const NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S: u64 = 600;
pub(crate) const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(30);

impl SwarmDriver {
/// This functions triggers network discovery based on when the last peer was added to the RT and the number of
/// peers in RT. The function also returns a new bootstrap interval that is proportional to the number of
/// peers in RT, so more peers in RT, the longer the interval.
pub(crate) async fn run_bootstrap_continuously(
&mut self,
current_bootstrap_interval: Duration,
) -> Option<Interval> {
let (should_bootstrap, new_interval) = self
.bootstrap
.should_we_bootstrap(self.peers_in_rt as u32, current_bootstrap_interval)
.await;
if should_bootstrap {
self.trigger_network_discovery();
}
new_interval
/// peers in RT.
pub(crate) fn run_bootstrap_continuously(&mut self) {
self.trigger_network_discovery();
}

pub(crate) fn trigger_network_discovery(&mut self) {
Expand All @@ -61,27 +35,27 @@ impl SwarmDriver {
.get_closest_peers(addr.as_bytes());
let _ = self.pending_get_closest_peers.insert(
query_id,
(PendingGetClosestType::NetworkDiscovery, Default::default()),
(
addr,
PendingGetClosestType::NetworkDiscovery,
Default::default(),
),
);
}

self.bootstrap.initiated();
debug!("Trigger network discovery took {:?}", now.elapsed());
info!("Trigger network discovery took {:?}", now.elapsed());
}
}

/// Tracks and helps with the continuous kad::bootstrapping process
pub(crate) struct ContinuousBootstrap {
initial_bootstrap_done: bool,
last_peer_added_instant: Instant,
last_bootstrap_triggered: Option<Instant>,
}

impl ContinuousBootstrap {
pub(crate) fn new() -> Self {
Self {
initial_bootstrap_done: false,
last_peer_added_instant: Instant::now(),
last_bootstrap_triggered: None,
}
}
Expand All @@ -90,76 +64,4 @@ impl ContinuousBootstrap {
pub(crate) fn initiated(&mut self) {
self.last_bootstrap_triggered = Some(Instant::now());
}

/// Notify about a newly added peer to the RT. This will help with slowing down the bootstrap process.
/// Returns `true` if we have to perform the initial bootstrapping.
pub(crate) fn notify_new_peer(&mut self) -> bool {
self.last_peer_added_instant = Instant::now();
// true to kick off the initial bootstrapping. `run_bootstrap_continuously` might kick of so soon that we might
// not have a single peer in the RT and we'd not perform any bootstrapping for a while.
if !self.initial_bootstrap_done {
self.initial_bootstrap_done = true;
true
} else {
false
}
}

/// Returns `true` if we should carry out the Kademlia Bootstrap process immediately.
/// Also optionally returns the new interval to re-bootstrap.
pub(crate) async fn should_we_bootstrap(
&self,
peers_in_rt: u32,
current_interval: Duration,
) -> (bool, Option<Interval>) {
let is_ongoing = if let Some(last_bootstrap_triggered) = self.last_bootstrap_triggered {
last_bootstrap_triggered.elapsed() < LAST_BOOTSTRAP_TRIGGERED_TIME_LIMIT
} else {
false
};
let should_bootstrap = !is_ongoing && peers_in_rt >= 1;

// if it has been a while (LAST_PEER_ADDED_TIME_LIMIT) since we have added a new peer to our RT, then, slowdown
// the bootstrapping process.
// Don't slow down if we haven't even added one peer to our RT.
if self.last_peer_added_instant.elapsed() > LAST_PEER_ADDED_TIME_LIMIT && peers_in_rt != 0 {
// To avoid a heart beat like cpu usage due to the 1K candidates generation,
// randomize the interval within certain range
let no_peer_added_slowdown_interval: u64 = OsRng.gen_range(
NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S / 2..NO_PEER_ADDED_SLOWDOWN_INTERVAL_MAX_S,
);
let no_peer_added_slowdown_interval_duration =
Duration::from_secs(no_peer_added_slowdown_interval);
info!(
"It has been {LAST_PEER_ADDED_TIME_LIMIT:?} since we last added a peer to RT. Slowing down the continuous bootstrapping process. Old interval: {current_interval:?}, New interval: {no_peer_added_slowdown_interval_duration:?}"
);

// `Interval` ticks immediately for Tokio, but not for `wasmtimer`, which is used for wasm32.
#[cfg_attr(target_arch = "wasm32", allow(unused_mut))]
let mut new_interval = interval(no_peer_added_slowdown_interval_duration);
#[cfg(not(target_arch = "wasm32"))]
new_interval.tick().await;

return (should_bootstrap, Some(new_interval));
}

// increment bootstrap_interval in steps of BOOTSTRAP_INTERVAL every BOOTSTRAP_CONNECTED_PEERS_STEP
let step = peers_in_rt / BOOTSTRAP_CONNECTED_PEERS_STEP;
let step = std::cmp::max(1, step);
let new_interval = BOOTSTRAP_INTERVAL * step;
let new_interval = if new_interval > current_interval {
info!("More peers have been added to our RT!. Slowing down the continuous bootstrapping process. Old interval: {current_interval:?}, New interval: {new_interval:?}");

// `Interval` ticks immediately for Tokio, but not for `wasmtimer`, which is used for wasm32.
#[cfg_attr(target_arch = "wasm32", allow(unused_mut))]
let mut interval = interval(new_interval);
#[cfg(not(target_arch = "wasm32"))]
interval.tick().await;

Some(interval)
} else {
None
};
(should_bootstrap, new_interval)
}
}
Loading

0 comments on commit adae8c5

Please sign in to comment.