Less changes for cpu usage #2307

Merged
merged 9 commits on Oct 24, 2024
40 changes: 19 additions & 21 deletions .github/workflows/merge.yml
@@ -627,6 +627,10 @@ jobs:
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 30

# Sleep for a while so that restarted nodes can be detected by others
- name: Sleep a while
run: sleep 300

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
@@ -653,6 +657,10 @@ jobs:
rg "(\d+) matches" | rg "\d+" -o)
echo "Restarted $restart_count nodes"

# `PeerRemovedFromRoutingTable` now only happens when a peer is reported as a `BadNode`.
# Otherwise kad removes a dropped-out node directly from the routing table.
# So explicit detection of the removal is now much less likely,
# since connection_issue tracking has been removed.
- name: Get peers removed from nodes using rg
shell: bash
timeout-minutes: 1
@@ -665,24 +673,6 @@
fi
echo "PeerRemovedFromRoutingTable $peer_removed times"

- name: Verify peers removed exceed restarted node counts
shell: bash
timeout-minutes: 1
# get the counts, then the specific line, and then the digit count only
# then check we have an expected level of restarts
# TODO: make this use an env var, or relate to testnet size
run: |
restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Restart $restart_count nodes"
peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "PeerRemovedFromRoutingTable $peer_removed times"
if [ $peer_removed -lt $restart_count ]; then
echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count"
exit 1
fi

# TODO: reenable this once the testnet dir creation is tidied up to avoid a large count here
# if [ $restart_count -lt $node_count ]; then
# echo "Restart count of: $restart_count is less than the node count of: $node_count"
@@ -795,6 +785,10 @@ jobs:
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 5

# Sleep for a while so that restarted nodes can be detected by others
- name: Sleep a while
run: sleep 300

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
@@ -808,16 +802,20 @@
timeout-minutes: 1
# get the counts, then the specific line, and then the digit count only
# then check we have an expected level of restarts
# TODO: make this use an env var, or relate to testnet size
#
# `PeerRemovedFromRoutingTable` now only happens when a peer is reported as a `BadNode`.
# Otherwise kad removes a dropped-out node directly from the routing table.
# So explicit detection of the removal is now much less likely,
# since connection_issue tracking has been removed.
run: |
restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Restart $restart_count nodes"
peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "PeerRemovedFromRoutingTable $peer_removed times"
if [ $peer_removed -lt $restart_count ]; then
echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count"
if [ -z "$peer_removed" ]; then
echo "No peer removal count found"
exit 1
fi
node_count=$(ls "${{ matrix.node_data_path }}" | wc -l)
14 changes: 12 additions & 2 deletions sn_networking/src/driver.rs
@@ -65,6 +65,7 @@ use std::{
fs,
io::{Read, Write},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
@@ -130,6 +131,13 @@ const NETWORKING_CHANNEL_SIZE: usize = 10_000;
/// Time before a Kad query times out if no response is received
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);

// Initialised at compile time, instead of a runtime error path that should never happen
// Option<T>::expect will be stabilised as const in the future (https://github.com/rust-lang/rust/issues/67441)
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
Some(v) => v,
None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};
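
For context, a standalone sketch of the compile-time pattern used above. The CLOSE_GROUP_SIZE value below is an assumption for illustration only; the real constant lives elsewhere in the crate.

use std::num::NonZeroUsize;

const CLOSE_GROUP_SIZE: usize = 5; // assumed value, for illustration only

// Evaluated at compile time: a zero value fails the build instead of panicking at runtime.
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
    Some(v) => v,
    None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};

fn main() {
    // Once `Option::expect` is usable in `const` contexts, the match above could shrink to:
    // `NonZeroUsize::new(CLOSE_GROUP_SIZE).expect("CLOSE_GROUP_SIZE should not be zero")`.
    println!("replication factor: {REPLICATION_FACTOR}");
}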

/// The various settings to apply to when fetching a record from network
#[derive(Clone)]
pub struct GetRecordCfg {
@@ -354,6 +362,7 @@ impl NetworkBuilder {
.disjoint_query_paths(true)
// Records never expire
.set_record_ttl(None)
.set_replication_factor(REPLICATION_FACTOR)
// Emit PUT events for validation prior to insertion into the RecordStore.
// This is no longer needed as the record_storage::put now can carry out validation.
// .set_record_filtering(KademliaStoreInserts::FilterBoth)
@@ -437,6 +446,7 @@ impl NetworkBuilder {
let _ = kad_cfg
.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
.set_max_packet_size(MAX_PACKET_SIZE)
.set_replication_factor(REPLICATION_FACTOR)
// Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes.
.disjoint_query_paths(true);
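
As a minimal sketch of how these settings fit together, the helper below applies the same setters shown in this diff to an existing Kademlia config. The free-standing function and its name are illustrative assumptions, not code from the PR.

use std::num::NonZeroUsize;

// Sketch only: the setters mirror the calls that appear in this diff.
fn tune_kad(kad_cfg: &mut libp2p::kad::Config, replication_factor: NonZeroUsize) {
    let _ = kad_cfg
        // Only insert peers into k-buckets when we decide to, not automatically.
        .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
        // Keep CLOSE_GROUP_SIZE copies of each record.
        .set_replication_factor(replication_factor)
        // Records never expire.
        .set_record_ttl(None)
        // Require iterative queries to use disjoint paths for resiliency.
        .disjoint_query_paths(true);
}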

@@ -912,7 +922,7 @@ impl SwarmDriver {
let farthest_peer_to_check = self
.get_all_local_peers_excluding_self()
.len()
.checked_div(3 * CLOSE_GROUP_SIZE)
.checked_div(5 * CLOSE_GROUP_SIZE)
.unwrap_or(1);

info!("Farthest peer we'll check: {:?}", farthest_peer_to_check);
@@ -947,7 +957,7 @@ impl SwarmDriver {

sorted_distances.sort_unstable();

let median_index = sorted_distances.len() / 2;
let median_index = sorted_distances.len() / 8;

let default = KBucketDistance::default();
let median = sorted_distances.get(median_index).cloned();
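
A small worked example of the index change above: taking the element at len / 8 of the sorted distances picks a far tighter cut-off than the true median at len / 2. The numbers here are stand-ins for KBucketDistance values.

fn main() {
    let sorted_distances: Vec<u32> = (1..=80).collect(); // stand-in distances, already sorted
    let old_index = sorted_distances.len() / 2; // 40
    let new_index = sorted_distances.len() / 8; // 10
    assert_eq!(sorted_distances[old_index], 41); // the true median
    assert_eq!(sorted_distances[new_index], 11); // the much closer cut-off now used
}
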
228 changes: 123 additions & 105 deletions sn_networking/src/event/kad.rs
@@ -493,128 +493,146 @@ impl SwarmDriver {
/// SplitRecord if there are multiple content hash versions.
fn handle_get_record_finished(&mut self, query_id: QueryId, step: ProgressStep) -> Result<()> {
// return error if the entry cannot be found
if let Some((_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) {
if let Some((r_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) {
let num_of_versions = result_map.len();
let (result, log_string) = if let Some((record, from_peers)) =
result_map.values().next()
{
let data_key_address = NetworkAddress::from_record_key(&record.key);
let expected_get_range = self.get_request_range();

let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum(
expected_get_range,
from_peers,
&data_key_address,
&cfg.get_quorum,
);

let pretty_key = PrettyPrintRecordKey::from(&record.key);
info!("RANGE: {pretty_key:?} we_have_searched_far_enough: {we_have_searched_thoroughly:?}");
let data_key_address = NetworkAddress::from_record_key(&r_key);
let expected_get_range = self.get_request_range();
let all_seen_peers: HashSet<_> = result_map
.values()
.flat_map(|(_, peers)| peers)
.cloned()
.collect();
let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum(
expected_get_range,
&all_seen_peers,
&data_key_address,
&cfg.get_quorum,
);

// we have a split record, return it
if num_of_versions > 1 {
warn!("RANGE: Multiple versions found over range");
for sender in senders {
sender
.send(Err(GetRecordError::SplitRecord {
result_map: result_map.clone(),
}))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}

let result = if num_of_versions > 1 {
warn!("RANGE: more than one version found!");
Err(GetRecordError::SplitRecord {
result_map: result_map.clone(),
})
} else if we_have_searched_thoroughly {
warn!("RANGE: Get record finished: {pretty_key:?} Enough of the network has responded or it's not sensitive data... and we only have one copy...");
for (record, _peers) in result_map.values() {
self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?;
}

Ok(record.clone())
} else {
// We have not searched enough of the network range.
let result = Err(GetRecordError::NotEnoughCopiesInRange {
record: record.clone(),
expected: get_quorum_value(&cfg.get_quorum),
got: from_peers.len(),
range: expected_get_range.ilog2().unwrap_or(0),
});
return Ok(());
}

// This should be a backstop... Quorum::All is the only one that enforces
// a full search of the network range.
if matches!(cfg.get_quorum, Quorum::All) {
warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need to extend the range and PUT the data. {result:?}");
// we have no results, bail
if num_of_versions == 0 {
warn!("RANGE: No versions found!");
for sender in senders {
sender
.send(Err(GetRecordError::RecordNotFound))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
return Ok(());
}

warn!("Reputting data to network {pretty_key:?}...");
// if we have searched thoroughly, we can return the record
if num_of_versions == 1 {
let result = if let Some((record, peers)) = result_map.values().next() {
warn!("RANGE: one version found!");

if we_have_searched_thoroughly {
Ok(record.clone())
} else {
self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?;
Err(GetRecordError::NotEnoughCopiesInRange {
record: record.clone(),
expected: get_quorum_value(&cfg.get_quorum),
got: peers.len(),
range: expected_get_range.ilog2().unwrap_or(0),
})
}
} else {
debug!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count);
Err(GetRecordError::RecordNotFound)
};
for sender in senders {
sender
.send(result.clone())
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}

// let's ensure we have an updated network view
self.trigger_network_discovery();
#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}
}
} else {
debug!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender.");
}
Ok(())
}
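
A condensed, illustrative sketch of the decision flow in handle_get_record_finished, using stand-in types instead of the real networking structs; every name below is a hypothetical simplification.

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Record(String); // stand-in for the real record type

#[derive(Debug, PartialEq)]
enum GetOutcome {
    SplitRecord { versions: usize },
    NotFound,
    NotEnoughCopiesInRange(Record),
    Found(Record),
}

// Mirrors the branch order above: split record first, then the empty case,
// then the single-version case gated on whether the search covered the range.
fn resolve(result_map: &HashMap<u64, Record>, searched_thoroughly: bool) -> GetOutcome {
    match result_map.len() {
        n if n > 1 => GetOutcome::SplitRecord { versions: n },
        0 => GetOutcome::NotFound,
        _ => {
            let record = result_map.values().next().cloned().expect("one entry");
            if searched_thoroughly {
                GetOutcome::Found(record)
            } else {
                // The real code also re-puts the record to peers in range here.
                GetOutcome::NotEnoughCopiesInRange(record)
            }
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    let _ = map.insert(1u64, Record("v1".into()));
    assert_eq!(resolve(&map, true), GetOutcome::Found(Record("v1".into())));
    assert_eq!(
        resolve(&map, false),
        GetOutcome::NotEnoughCopiesInRange(Record("v1".into()))
    );
}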

warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range.");
/// Repost data to the network if we didn't get enough responses.
fn reput_data_to_range(
&mut self,
record: &Record,
data_key_address: &NetworkAddress,
// all peers who responded with any version of the record
from_peers: &HashSet<PeerId>,
) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
// This should be a backstop... Quorum::All is the only one that enforces
// a full search of the network range.
info!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has the record, or same state, we need to extend the range and PUT the data.");

let record_type = get_type_from_record(record)?;
info!("Reputting data to network {pretty_key:?}...");

let replicate_targets: HashSet<_> = self
.get_filtered_peers_exceeding_range_or_closest_nodes(&data_key_address)
.iter()
.cloned()
.collect();
warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range.");

if from_peers == &replicate_targets {
warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!");
}
let record_type = get_type_from_record(record)?;

// set holder to someone that has the data
let holder = NetworkAddress::from_peer(
from_peers
.iter()
.next()
.cloned()
.unwrap_or(self.self_peer_id),
);
let replicate_targets: HashSet<_> = self
.get_filtered_peers_exceeding_range_or_closest_nodes(data_key_address)
.iter()
.cloned()
.collect();

for peer in replicate_targets {
warn!("Reputting data to {peer:?} for {pretty_key:?} if needed...");
// Do not send to any peer that has already informed us
if from_peers.contains(&peer) {
continue;
}

debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}");

// nodes will try/fail to replicate it from us, but grab it from the network thereafter
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: Request::Cmd(Cmd::Replicate {
holder: holder.clone(),
keys: vec![(data_key_address.clone(), record_type.clone())],
}),
peer,
sender: None,
});
}
}
if from_peers == &replicate_targets {
warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!");
}

result
};
// set holder to someone that has the data
let holder = NetworkAddress::from_peer(
from_peers
.iter()
.next()
.cloned()
.unwrap_or(self.self_peer_id),
);

(
result,
format!("Getting record {:?} completed with only {:?} copies received, and {num_of_versions} versions.",
PrettyPrintRecordKey::from(&record.key), usize::from(step.count) - 1)
)
} else {
(
Err(GetRecordError::RecordNotFound),
format!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count),
)
};

if cfg.expected_holders.is_empty() {
debug!("{log_string}");
} else {
debug!(
"{log_string}, and {:?} expected holders not responded",
cfg.expected_holders
);
for peer in replicate_targets {
warn!("Reputting data to {peer:?} for {pretty_key:?} if needed...");
// Do not send to any peer that has already informed us
if from_peers.contains(&peer) {
continue;
}

for sender in senders {
sender
.send(result.clone())
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
} else {
debug!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender.");
debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}");

// nodes will try/fail to replicate it from us, but grab it from the network thereafter
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: Request::Cmd(Cmd::Replicate {
holder: holder.clone(),
keys: vec![(data_key_address.clone(), record_type.clone())],
}),
peer,
sender: None,
});
}

Ok(())
}
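
And a small standalone sketch of the peer-selection step in reput_data_to_range: peers that already returned the record are skipped, and the holder is any peer known to hold the data, falling back to ourselves. The PeerId alias and function name are simplifications, not the crate's API.

use std::collections::HashSet;

type PeerId = u32; // hypothetical stand-in for libp2p::PeerId

// Returns (holder, peers to send a Replicate request to).
fn pick_replication_targets(
    self_id: PeerId,
    from_peers: &HashSet<PeerId>,
    peers_in_range: &HashSet<PeerId>,
) -> (PeerId, Vec<PeerId>) {
    // Holder: any peer that already has the data, falling back to ourselves.
    let holder = from_peers.iter().next().copied().unwrap_or(self_id);
    // Do not re-send to peers that already informed us they hold the record.
    let targets = peers_in_range.difference(from_peers).copied().collect();
    (holder, targets)
}

fn main() {
    let from: HashSet<PeerId> = [1, 2].into_iter().collect();
    let in_range: HashSet<PeerId> = [1, 2, 3, 4].into_iter().collect();
    let (holder, mut targets) = pick_replication_targets(99, &from, &in_range);
    targets.sort_unstable();
    assert!(from.contains(&holder));
    assert_eq!(targets, vec![3, 4]);
}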
