diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index da6914f65b..fa94260975 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -627,6 +627,10 @@ jobs: CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} timeout-minutes: 30 + # Sleep for a while so that restarted nodes can be detected by others + - name: Sleep a while + run: sleep 300 + - name: Stop the local network and upload logs if: always() uses: maidsafe/sn-local-testnet-action@main @@ -653,6 +657,10 @@ jobs: rg "(\d+) matches" | rg "\d+" -o) echo "Restarted $restart_count nodes" + # `PeerRemovedFromRoutingTable` now only happens when a peer is reported as `BadNode`. + # Otherwise kad will remove a `dropped out node` directly from the RT. + # So, now that connection_issue tracking has been removed, explicitly detecting + # the removal is much less likely. - name: Get peers removed from nodes using rg shell: bash timeout-minutes: 1 @@ -665,24 +673,6 @@ jobs: fi echo "PeerRemovedFromRoutingTable $peer_removed times" - - name: Verify peers removed exceed restarted node counts - shell: bash - timeout-minutes: 1 - # get the counts, then the specific line, and then the digit count only - # then check we have an expected level of restarts - # TODO: make this use an env var, or relate to testnet size - run: | - restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \ - rg "(\d+) matches" | rg "\d+" -o) - echo "Restart $restart_count nodes" - peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \ - rg "(\d+) matches" | rg "\d+" -o) - echo "PeerRemovedFromRoutingTable $peer_removed times" - if [ $peer_removed -lt $restart_count ]; then - echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count" - exit 1 - fi - # TODO: reenable this once the testnet dir creation is tidied up to avoid a large count here # if [ $restart_count -lt $node_count ]; then # echo "Restart count of: $restart_count is less than the node count of: $node_count" @@ -795,6 +785,10 @@ jobs: CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }} timeout-minutes: 5 + # Sleep for a while so that restarted nodes can be detected by others + - name: Sleep a while + run: sleep 300 + - name: Stop the local network and upload logs if: always() uses: maidsafe/sn-local-testnet-action@main @@ -808,7 +802,11 @@ jobs: timeout-minutes: 1 # get the counts, then the specific line, and then the digit count only # then check we have an expected level of restarts - # TODO: make this use an env var, or relate to testnet size + # + # `PeerRemovedFromRoutingTable` now only happens when a peer is reported as `BadNode`. + # Otherwise kad will remove a `dropped out node` directly from the RT. + # So, now that connection_issue tracking has been removed, explicitly detecting + # the removal is much less likely.
run: | restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \ rg "(\d+) matches" | rg "\d+" -o) @@ -816,8 +814,8 @@ jobs: peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \ rg "(\d+) matches" | rg "\d+" -o) echo "PeerRemovedFromRoutingTable $peer_removed times" - if [ $peer_removed -lt $restart_count ]; then - echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count" + if [ -z "$peer_removed" ]; then + echo "No peer removal count found" exit 1 fi node_count=$(ls "${{ matrix.node_data_path }}" | wc -l) diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 1ad9c3e7a9..af80223a84 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -65,6 +65,7 @@ use std::{ fs, io::{Read, Write}, net::SocketAddr, + num::NonZeroUsize, path::PathBuf, }; use tokio::sync::{mpsc, oneshot}; @@ -130,6 +131,13 @@ const NETWORKING_CHANNEL_SIZE: usize = 10_000; /// Time before a Kad query times out if no response is received const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10); +// Initialised at compile time, instead of a runtime error that should never happen +// Option::expect will be stabilised as const in the future (https://github.com/rust-lang/rust/issues/67441) +const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) { + Some(v) => v, + None => panic!("CLOSE_GROUP_SIZE should not be zero"), +}; + /// The various settings to apply to when fetching a record from network #[derive(Clone)] pub struct GetRecordCfg { @@ -354,6 +362,7 @@ impl NetworkBuilder { .disjoint_query_paths(true) // Records never expire .set_record_ttl(None) + .set_replication_factor(REPLICATION_FACTOR) // Emit PUT events for validation prior to insertion into the RecordStore. // This is no longer needed as the record_storage::put now can carry out validation. // .set_record_filtering(KademliaStoreInserts::FilterBoth) @@ -437,6 +446,7 @@ impl NetworkBuilder { let _ = kad_cfg .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual) .set_max_packet_size(MAX_PACKET_SIZE) + .set_replication_factor(REPLICATION_FACTOR) // Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes. .disjoint_query_paths(true); @@ -912,7 +922,7 @@ impl SwarmDriver { let farthest_peer_to_check = self .get_all_local_peers_excluding_self() .len() - .checked_div(3 * CLOSE_GROUP_SIZE) + .checked_div(5 * CLOSE_GROUP_SIZE) .unwrap_or(1); info!("Farthest peer we'll check: {:?}", farthest_peer_to_check); @@ -947,7 +957,7 @@ impl SwarmDriver { sorted_distances.sort_unstable(); - let median_index = sorted_distances.len() / 2; + let median_index = sorted_distances.len() / 8; let default = KBucketDistance::default(); let median = sorted_distances.get(median_index).cloned(); diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index 8e903a00ec..776d868e0d 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -493,128 +493,146 @@ impl SwarmDriver { /// SplitRecord if there are multiple content hash versions. 
fn handle_get_record_finished(&mut self, query_id: QueryId, step: ProgressStep) -> Result<()> { // return error if the entry cannot be found - if let Some((_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) { + if let Some((r_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) { let num_of_versions = result_map.len(); - let (result, log_string) = if let Some((record, from_peers)) = - result_map.values().next() - { - let data_key_address = NetworkAddress::from_record_key(&record.key); - let expected_get_range = self.get_request_range(); - - let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum( - expected_get_range, - from_peers, - &data_key_address, - &cfg.get_quorum, - ); - - let pretty_key = PrettyPrintRecordKey::from(&record.key); - info!("RANGE: {pretty_key:?} we_have_searched_far_enough: {we_have_searched_thoroughly:?}"); + let data_key_address = NetworkAddress::from_record_key(&r_key); + let expected_get_range = self.get_request_range(); + let all_seen_peers: HashSet<_> = result_map + .values() + .flat_map(|(_, peers)| peers) + .cloned() + .collect(); + let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum( + expected_get_range, + &all_seen_peers, + &data_key_address, + &cfg.get_quorum, + ); + + // we have a split record, return it + if num_of_versions > 1 { + warn!("RANGE: Multiple versions found over range"); + for sender in senders { + sender + .send(Err(GetRecordError::SplitRecord { + result_map: result_map.clone(), + })) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } - let result = if num_of_versions > 1 { - warn!("RANGE: more than one version found!"); - Err(GetRecordError::SplitRecord { - result_map: result_map.clone(), - }) - } else if we_have_searched_thoroughly { - warn!("RANGE: Get record finished: {pretty_key:?} Enough of the network has responded or it's not sensitive data... and we only have one copy..."); + for (record, _peers) in result_map.values() { + self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?; + } - Ok(record.clone()) - } else { - // We have not searched enough of the network range. - let result = Err(GetRecordError::NotEnoughCopiesInRange { - record: record.clone(), - expected: get_quorum_value(&cfg.get_quorum), - got: from_peers.len(), - range: expected_get_range.ilog2().unwrap_or(0), - }); + return Ok(()); + } - // This should be a backstop... Quorum::All is the only one that enforces - // a full search of the network range. - if matches!(cfg.get_quorum, Quorum::All) { - warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need to extend the range and PUT the data. 
{result:?}"); + // we have no results, bail + if num_of_versions == 0 { + warn!("RANGE: No versions found!"); + for sender in senders { + sender + .send(Err(GetRecordError::RecordNotFound)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } + return Ok(()); + } - warn!("Reputting data to network {pretty_key:?}..."); + // if we have searched thoroughly, we can return the record + if num_of_versions == 1 { + let result = if let Some((record, peers)) = result_map.values().next() { + warn!("RANGE: one version found!"); + + if we_have_searched_thoroughly { + Ok(record.clone()) + } else { + self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?; + Err(GetRecordError::NotEnoughCopiesInRange { + record: record.clone(), + expected: get_quorum_value(&cfg.get_quorum), + got: peers.len(), + range: expected_get_range.ilog2().unwrap_or(0), + }) + } + } else { + debug!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count); + Err(GetRecordError::RecordNotFound) + }; + for sender in senders { + sender + .send(result.clone()) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } - // let's ensure we have an updated network view - self.trigger_network_discovery(); + #[cfg(feature = "open-metrics")] + if self.metrics_recorder.is_some() { + self.check_for_change_in_our_close_group(); + } + } + } else { + debug!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender."); + } + Ok(()) + } - warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range."); + /// Repost data to the network if we didn't get enough responses. + fn reput_data_to_range( + &mut self, + record: &Record, + data_key_address: &NetworkAddress, + // all peers who responded with any version of the record + from_peers: &HashSet, + ) -> Result<()> { + let pretty_key = PrettyPrintRecordKey::from(&record.key); + // This should be a backstop... Quorum::All is the only one that enforces + // a full search of the network range. 
+ info!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has the record, or same state, we need to extend the range and PUT the data."); - let record_type = get_type_from_record(record)?; + info!("Reputting data to network {pretty_key:?}..."); - let replicate_targets: HashSet<_> = self - .get_filtered_peers_exceeding_range_or_closest_nodes(&data_key_address) - .iter() - .cloned() - .collect(); + warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range."); - if from_peers == &replicate_targets { - warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!"); - } + let record_type = get_type_from_record(record)?; - // set holder to someone that has the data - let holder = NetworkAddress::from_peer( - from_peers - .iter() - .next() - .cloned() - .unwrap_or(self.self_peer_id), - ); + let replicate_targets: HashSet<_> = self + .get_filtered_peers_exceeding_range_or_closest_nodes(data_key_address) + .iter() + .cloned() + .collect(); - for peer in replicate_targets { - warn!("Reputting data to {peer:?} for {pretty_key:?} if needed..."); - // Do not send to any peer that has already informed us - if from_peers.contains(&peer) { - continue; - } - - debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}"); - - // nodes will try/fail to trplicate it from us, but grab from the network thereafter - self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest { - req: Request::Cmd(Cmd::Replicate { - holder: holder.clone(), - keys: vec![(data_key_address.clone(), record_type.clone())], - }), - peer, - sender: None, - }); - } - } + if from_peers == &replicate_targets { + warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!"); + } - result - }; + // set holder to someone that has the data + let holder = NetworkAddress::from_peer( + from_peers + .iter() + .next() + .cloned() + .unwrap_or(self.self_peer_id), + ); - ( - result, - format!("Getting record {:?} completed with only {:?} copies received, and {num_of_versions} versions.", - PrettyPrintRecordKey::from(&record.key), usize::from(step.count) - 1) - ) - } else { - ( - Err(GetRecordError::RecordNotFound), - format!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count), - ) - }; - - if cfg.expected_holders.is_empty() { - debug!("{log_string}"); - } else { - debug!( - "{log_string}, and {:?} expected holders not responded", - cfg.expected_holders - ); + for peer in replicate_targets { + warn!("Reputting data to {peer:?} for {pretty_key:?} if needed..."); + // Do not send to any peer that has already informed us + if from_peers.contains(&peer) { + continue; } - for sender in senders { - sender - .send(result.clone()) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; - } - } else { - debug!("Can't locate query task {query_id:?} during GetRecord finished. 
We might have already returned the result to the sender."); + debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}"); + + // nodes will try/fail to replicate it from us, but grab it from the network thereafter + self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest { + req: Request::Cmd(Cmd::Replicate { + holder: holder.clone(), + keys: vec![(data_key_address.clone(), record_type.clone())], + }), + peer, + sender: None, + }); } + Ok(()) } diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs index ca6808ed1b..b0c9344724 100644 --- a/sn_networking/src/event/request_response.rs +++ b/sn_networking/src/event/request_response.rs @@ -196,6 +196,7 @@ impl SwarmDriver { ) { let peers = self.get_all_local_peers_excluding_self(); let our_peer_id = self.self_peer_id; + let more_than_one_key = incoming_keys.len() > 1; let holder = if let Some(peer_id) = sender.as_peer_id() { peer_id @@ -241,56 +242,60 @@ impl SwarmDriver { } let event_sender = self.event_sender.clone(); - let _handle = tokio::spawn(async move { - let keys_to_verify = - Self::select_verification_data_candidates(&peers, &all_keys, &sender); + if more_than_one_key && OsRng.gen_bool(0.1) { + let _handle = tokio::spawn(async move { + // Only run 10% of the time + let keys_to_verify = + Self::select_verification_data_candidates(&peers, &all_keys, &sender); - if keys_to_verify.is_empty() { - debug!("No valid candidate to be checked against peer {holder:?}"); - } else if let Err(error) = event_sender - .send(NetworkEvent::ChunkProofVerification { - peer_id: holder, - keys_to_verify, - }) - .await - { - error!("SwarmDriver failed to send event: {}", error); - } + if keys_to_verify.is_empty() { + debug!("No valid candidate to be checked against peer {holder:?}"); + } else if let Err(error) = event_sender + .send(NetworkEvent::ChunkProofVerification { + peer_id: holder, + keys_to_verify, + }) + .await + { + error!("SwarmDriver failed to send event: {}", error); + } - // In additon to verify the sender, we also verify a random close node. - // This is to avoid malicious node escaping the check by never send a replication_list. + // In addition to verifying the sender, we also verify a random close node. + // This is to avoid a malicious node escaping the check by never sending a replication_list. 
+ // With further reduced probability of 1% (5% * 20%) + let close_group_peers = sort_peers_by_address_and_limit( + &peers, + &NetworkAddress::from_peer(our_peer_id), + CLOSE_GROUP_SIZE, + ) + .unwrap_or_default(); - loop { - let index: usize = OsRng.gen_range(0..close_group_peers.len()); - let candidate_peer_id = *close_group_peers[index]; - let candidate = NetworkAddress::from_peer(*close_group_peers[index]); - if sender != candidate { - let keys_to_verify = - Self::select_verification_data_candidates(&peers, &all_keys, &candidate); + loop { + let index: usize = OsRng.gen_range(0..close_group_peers.len()); + let candidate_peer_id = *close_group_peers[index]; + let candidate = NetworkAddress::from_peer(*close_group_peers[index]); + if sender != candidate { + let keys_to_verify = Self::select_verification_data_candidates( + &peers, &all_keys, &candidate, + ); - if keys_to_verify.is_empty() { - debug!("No valid candidate to be checked against peer {candidate:?}"); - } else if let Err(error) = event_sender - .send(NetworkEvent::ChunkProofVerification { - peer_id: candidate_peer_id, - keys_to_verify, - }) - .await - { - error!("SwarmDriver failed to send event: {}", error); - } + if keys_to_verify.is_empty() { + debug!("No valid candidate to be checked against peer {candidate:?}"); + } else if let Err(error) = event_sender + .send(NetworkEvent::ChunkProofVerification { + peer_id: candidate_peer_id, + keys_to_verify, + }) + .await + { + error!("SwarmDriver failed to send event: {}", error); + } - break; + break; + } } - } - }); + }); + } } /// Check among all chunk type records that we have, select those close to the peer, diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index 2416b5681c..90a3939f47 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -7,8 +7,8 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - cmd::NetworkSwarmCmd, event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, - relay_manager::is_a_relayed_peer, target_arch::Instant, NetworkEvent, Result, SwarmDriver, + event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, relay_manager::is_a_relayed_peer, + target_arch::Instant, NetworkEvent, Result, SwarmDriver, }; #[cfg(feature = "local")] use libp2p::mdns; @@ -25,7 +25,7 @@ use libp2p::{ }; use sn_protocol::version::{IDENTIFY_NODE_VERSION_STR, IDENTIFY_PROTOCOL_STR}; use std::collections::HashSet; -use tokio::{sync::oneshot, time::Duration}; +use tokio::time::Duration; impl SwarmDriver { /// Handle `SwarmEvents` @@ -514,15 +514,6 @@ impl SwarmDriver { self.update_on_peer_removal(*dead_peer.node.key.preimage()); } } - - if !should_clean_peer { - // lets try and redial. - for addr in failed_peer_addresses { - let (sender, _recv) = oneshot::channel(); - - self.queue_network_swarm_cmd(NetworkSwarmCmd::Dial { addr, sender }); - } - } } SwarmEvent::IncomingConnectionError { connection_id,
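Note for reviewers: the driver.rs hunk builds the new `REPLICATION_FACTOR` constant with a `const` `match` so that a zero `CLOSE_GROUP_SIZE` fails the build instead of panicking at runtime, since `Option::expect` is not yet const-stable. A minimal standalone sketch of that pattern follows; the `CLOSE_GROUP_SIZE` value here is a placeholder for illustration, not the crate's real constant.

```rust
use std::num::NonZeroUsize;

// Placeholder value for illustration; the real constant lives elsewhere in the codebase.
const CLOSE_GROUP_SIZE: usize = 5;

// Evaluated at compile time: a zero value turns into a compile error
// rather than a runtime panic (Option::expect is not const-stable yet).
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
    Some(v) => v,
    None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};

fn main() {
    println!("replication factor: {}", REPLICATION_FACTOR);
}
```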
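Likewise, the request_response.rs hunk gates chunk-proof verification behind `more_than_one_key && OsRng.gen_bool(0.1)`: only replication lists carrying more than one key are considered, and only roughly 10% of those trigger a check. The rough sketch below isolates that sampling logic using the same `rand` API the diff already relies on; `should_verify` and the counts in `main` are illustrative assumptions, not part of the patch.

```rust
use rand::{rngs::OsRng, Rng};

// Sketch of the gate added in request_response.rs: skip trivial replication
// lists, then sample ~10% of the remaining batches for verification.
fn should_verify(incoming_keys_len: usize) -> bool {
    let more_than_one_key = incoming_keys_len > 1;
    // gen_bool(0.1) returns true with probability 0.1
    more_than_one_key && OsRng.gen_bool(0.1)
}

fn main() {
    // With 1000 multi-key batches, roughly 100 should trigger verification.
    let triggered = (0..1_000).filter(|_| should_verify(3)).count();
    println!("verification triggered {} times out of 1000", triggered);
}
```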