Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Range calculation tryout #2460

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,10 @@ jobs:
exit 1
fi

# Sleep for a while to allow network density sampling
- name: Sleep a while
run: sleep 300

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ libp2p = { version = "0.54.1", features = [
"yamux",
"websocket",
] }
alloy = { version = "0.5.3", default-features = false, features = ["std", "reqwest-rustls-tls", "provider-anvil-node", "sol-types", "json", "signers", "contract", "signer-local", "network"] }
async-trait = "0.1"
bytes = { version = "1.0.1", features = ["serde"] }
exponential-backoff = "2.0.0"
Expand Down
13 changes: 12 additions & 1 deletion sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
use libp2p::{
kad::{
store::{Error as StoreError, RecordStore},
Quorum, Record, RecordKey,
KBucketDistance as Distance, Quorum, Record, RecordKey,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -136,6 +136,10 @@ pub enum LocalSwarmCmd {
TriggerIntervalReplication,
/// Triggers unrelevant record cleanup
TriggerIrrelevantRecordCleanup,
/// Add a network density sample
AddNetworkDensitySample {
distance: Distance,
},
}

/// Commands to send to the Swarm
Expand Down Expand Up @@ -287,6 +291,9 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
}
}
}
}
Expand Down Expand Up @@ -868,6 +875,10 @@ impl SwarmDriver {
.store_mut()
.cleanup_irrelevant_records();
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
cmd_string = "AddNetworkDensitySample";
self.network_density_samples.add(distance);
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
Expand Down
29 changes: 28 additions & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
error::{NetworkError, Result},
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
fifo_register::FifoRegister,
log_markers::Marker,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
Expand All @@ -28,6 +29,7 @@ use crate::{
metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use crate::{transport, NodeIssue};
use alloy::primitives::U256;
use futures::future::Either;
use futures::StreamExt;
#[cfg(feature = "local")]
Expand Down Expand Up @@ -730,6 +732,7 @@ impl NetworkBuilder {
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
network_density_samples: FifoRegister::new(100),
};

let network = Network::new(
Expand Down Expand Up @@ -835,6 +838,8 @@ pub struct SwarmDriver {
pub(crate) last_replication: Option<Instant>,
/// when was the last outdated connection prunning undertaken.
pub(crate) last_connection_pruning_time: Instant,
/// FIFO cache for the network density samples
pub(crate) network_density_samples: FifoRegister,
}

impl SwarmDriver {
Expand Down Expand Up @@ -919,7 +924,29 @@ impl SwarmDriver {
let closest_k_peers = self.get_closest_k_value_local_peers();

if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) {
info!("Set responsible range to {distance}");
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
// The entire Distance space is U256
// The density can be estimated as: distance/space
let density = U256::MAX / U256::from(estimated_network_size);
let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE);

let sampled_distance = self.network_density_samples.get_median();
let ilog2 = if let Some(distance) = sampled_distance {
distance.ilog2()
} else {
None
};
info!("Set responsible range to {distance}, current sampled_distance is {ilog2:?}({sampled_distance:?}), \
estimated_distance is {:?}({estimated_distance:?}) with network_size of {estimated_network_size}",
estimated_distance.log2());
// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
Expand Down
35 changes: 28 additions & 7 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use std::{
};
use tokio::sync::oneshot;

// (total_buckets, total_peers, peers_in_non_full_buckets, num_of_full_buckets, kbucket_table_stats)
type KBucketStatus = (usize, usize, usize, usize, Vec<(usize, usize, u32)>);

/// NodeEvent enum
#[derive(CustomDebug)]
pub(super) enum NodeEvent {
Expand Down Expand Up @@ -295,12 +298,8 @@ impl SwarmDriver {
}
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());

/// Collect kbuckets status
pub(crate) fn kbuckets_status(&mut self) -> KBucketStatus {
let mut kbucket_table_stats = vec![];
let mut index = 0;
let mut total_peers = 0;
Expand All @@ -327,6 +326,28 @@ impl SwarmDriver {
}
index += 1;
}
(
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
)
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());

let (
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
) = self.kbuckets_status();

let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
Expand All @@ -353,7 +374,7 @@ impl SwarmDriver {
}

/// Estimate the number of nodes in the network
fn estimate_network_size(
pub(crate) fn estimate_network_size(
peers_in_non_full_buckets: usize,
num_of_full_buckets: usize,
) -> usize {
Expand Down
62 changes: 62 additions & 0 deletions sn_networking/src/fifo_register.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::kad::KBucketDistance as Distance;
use std::collections::VecDeque;

pub(crate) struct FifoRegister {
queue: VecDeque<Distance>,
max_length: usize,
cached_median: Option<Distance>, // Cache for the median result
is_dirty: bool, // Flag indicating if cache is valid
}

impl FifoRegister {
// Creates a new FifoRegister with a specified maximum length
pub(crate) fn new(max_length: usize) -> Self {
FifoRegister {
queue: VecDeque::with_capacity(max_length),
max_length,
cached_median: None,
is_dirty: true,
}
}

// Adds an entry to the register, removing excess elements if over max_length
pub(crate) fn add(&mut self, entry: Distance) {
if self.queue.len() == self.max_length {
self.queue.pop_front(); // Remove the oldest element to maintain length
}
self.queue.push_back(entry);

// Mark the cache as invalid since the data has changed
self.is_dirty = true;
}

// Returns the median of the maximum values of the entries
pub(crate) fn get_median(&mut self) -> Option<Distance> {
if self.queue.is_empty() {
return None; // No median if the queue is empty
}

if !self.is_dirty {
return self.cached_median; // Return cached result if it's valid
}

let mut max_values: Vec<Distance> = self.queue.iter().copied().collect();

max_values.sort_unstable();

let len = max_values.len();
// Cache the result and mark the cache as valid
self.cached_median = Some(max_values[len / 2]);
self.is_dirty = false;

self.cached_median
}
}
5 changes: 5 additions & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
Expand Down Expand Up @@ -1007,6 +1008,10 @@ impl Network {
self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
}

pub fn add_network_density_sample(&self, distance: KBucketDistance) {
self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance })
}

/// Helper to send NetworkSwarmCmd
fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
Expand Down
33 changes: 33 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
/// Interval to clean up unrelevant records
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);

/// Interval to carryout network density sampling
const NETWORK_DENSITY_SAMPLING_INTERVAL: Duration = Duration::from_secs(113);

/// Helper to build and run a Node
pub struct NodeBuilder {
identity_keypair: Keypair,
Expand Down Expand Up @@ -277,6 +280,10 @@ impl Node {
tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL);
let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

let mut network_density_sampling_interval =
tokio::time::interval(NETWORK_DENSITY_SAMPLING_INTERVAL);
let _ = network_density_sampling_interval.tick().await; // first tick completes immediately

loop {
let peers_connected = &peers_connected;

Expand Down Expand Up @@ -341,6 +348,16 @@ impl Node {
Self::trigger_irrelevant_record_cleanup(network);
});
}
_ = network_density_sampling_interval.tick() => {
let start = Instant::now();
debug!("Periodic network density sampling triggered");
let network = self.network().clone();

let _handle = spawn(async move {
Self::network_density_sampling(network).await;
trace!("Periodic network density sampling took {:?}", start.elapsed());
});
}
}
}
});
Expand Down Expand Up @@ -712,6 +729,22 @@ impl Node {
Response::Query(resp)
}

async fn network_density_sampling(network: Network) {
for _ in 0..10 {
let target = NetworkAddress::from_peer(PeerId::random());
// Result is sorted and only return CLOSE_GROUP_SIZE entries
let peers = network.node_get_closest_peers(&target).await;
if let Ok(peers) = peers {
if peers.len() >= CLOSE_GROUP_SIZE {
// Calculate the distance to the farthest.
let distance =
target.distance(&NetworkAddress::from_peer(peers[CLOSE_GROUP_SIZE - 1]));
network.add_network_density_sample(distance);
}
}
}
}

async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
if let Ok(kbuckets) = network.get_kbuckets().await {
let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
Expand Down
Loading