Node side RBS support #2464

Merged
merged 5 commits on Nov 29, 2024
355 changes: 80 additions & 275 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion autonomi/Cargo.toml
@@ -36,7 +36,7 @@ curv = { version = "0.10.1", package = "sn_curv", default-features = false, feat
eip2333 = { version = "0.2.1", package = "sn_bls_ckd" }
const-hex = "1.12.0"
hex = "~0.4.3"
libp2p = "0.54.1"
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2" }
rand = "0.8.5"
rmp-serde = "1.1.1"
self_encryption = "~0.30.0"
2 changes: 1 addition & 1 deletion nat-detection/Cargo.toml
@@ -21,7 +21,7 @@ clap = { version = "4.5.4", features = ["derive"] }
clap-verbosity-flag = "2.2.0"
color-eyre = { version = "0.6", default-features = false }
futures = "~0.3.13"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"tcp",
"noise",
2 changes: 1 addition & 1 deletion sn_evm/Cargo.toml
@@ -20,7 +20,7 @@ custom_debug = "~0.6.1"
evmlib = { path = "../evmlib", version = "0.1.4" }
hex = "~0.4.3"
lazy_static = "~1.4.0"
libp2p = { version = "0.53", features = ["identify", "kad"] }
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
rmp-serde = "1.1.1"
serde = { version = "1.0.133", features = ["derive", "rc"] }
4 changes: 2 additions & 2 deletions sn_networking/Cargo.toml
@@ -22,7 +22,7 @@ loud = []

[dependencies]
lazy_static = "~1.4.0"
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
@@ -98,7 +98,7 @@ crate-type = ["cdylib", "rlib"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }
libp2p = { version = "0.54.1", features = [
libp2p = { git = "https://github.com/maqi/rust-libp2p.git", branch = "kad_0.46.2", features = [
"tokio",
"dns",
"kad",
103 changes: 82 additions & 21 deletions sn_networking/src/cmd.rs
@@ -12,12 +12,11 @@ use crate::{
event::TerminateNodeReason,
log_markers::Marker,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATION_PEERS_COUNT,
};
use libp2p::{
kad::{
store::{Error as StoreError, RecordStore},
Quorum, Record, RecordKey,
KBucketDistance as Distance, Quorum, Record, RecordKey,
},
Multiaddr, PeerId,
};
@@ -64,6 +63,12 @@ pub enum LocalSwarmCmd {
GetKBuckets {
sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
},
/// Returns the replicate candidates within range.
/// In case the range is too narrow, returns at least CLOSE_GROUP_SIZE peers.
GetReplicateCandidates {
data_addr: NetworkAddress,
sender: oneshot::Sender<Vec<PeerId>>,
},
// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
// And our PeerId as well.
GetClosestKLocalPeers {
@@ -136,6 +141,10 @@
TriggerIntervalReplication,
/// Triggers irrelevant record cleanup
TriggerIrrelevantRecordCleanup,
/// Add a network density sample
AddNetworkDensitySample {
distance: Distance,
},
}

/// Commands to send to the Swarm
@@ -216,7 +225,9 @@ impl Debug for LocalSwarmCmd {
PrettyPrintRecordKey::from(key)
)
}

LocalSwarmCmd::GetReplicateCandidates { .. } => {
write!(f, "LocalSwarmCmd::GetReplicateCandidates")
}
LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
}
@@ -287,6 +298,9 @@
LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
}
}
}
}
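A minimal sketch of how node-side code might issue the new `GetReplicateCandidates` command and await its reply. Only `LocalSwarmCmd::GetReplicateCandidates`, `NetworkAddress`, `PeerId`, and the oneshot reply channel come from this diff; the `send_local_swarm_cmd` helper on `Network` is assumed here as a stand-in for however the crate forwards commands to the `SwarmDriver`.

```rust
use tokio::sync::oneshot;

// Hypothetical caller; `send_local_swarm_cmd` is an assumed stand-in.
async fn replicate_candidates_for(
    network: &Network,
    data_addr: NetworkAddress,
) -> Option<Vec<PeerId>> {
    let (sender, receiver) = oneshot::channel();
    network.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });
    // The driver replies with the peers inside the replication range,
    // or at least CLOSE_GROUP_SIZE peers when the range is too narrow.
    receiver.await.ok()
}
```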
@@ -702,7 +716,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance_bucket()
.get_farthest_replication_distance()
{
self.replication_fetcher
.set_replication_distance_range(distance);
@@ -802,7 +816,10 @@
cmd_string = "GetClosestKLocalPeers";
let _ = sender.send(self.get_closest_k_value_local_peers());
}

LocalSwarmCmd::GetReplicateCandidates { data_addr, sender } => {
cmd_string = "GetReplicateCandidates";
let _ = sender.send(self.get_replicate_candidates(&data_addr));
}
LocalSwarmCmd::GetSwarmLocalState(sender) => {
cmd_string = "GetSwarmLocalState";
let current_state = SwarmLocalState {
@@ -868,6 +885,10 @@
.store_mut()
.cleanup_irrelevant_records();
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
cmd_string = "AddNetworkDensitySample";
self.network_density_samples.add(distance);
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
@@ -995,22 +1016,8 @@
// Store the current time as the last replication time
self.last_replication = Some(Instant::now());

// get closest peers from buckets, sorted by increasing distance to us
let our_peer_id = self.self_peer_id.into();
let closest_k_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&our_peer_id)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage());

// Only grab the closest nodes within the REPLICATE_RANGE
let mut replicate_targets = closest_k_peers
.into_iter()
// add some leeway to allow for divergent knowledge
.take(REPLICATION_PEERS_COUNT)
.collect::<Vec<_>>();
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let mut replicate_targets = self.get_replicate_candidates(&self_addr);

let now = Instant::now();
self.replication_targets
@@ -1055,4 +1062,58 @@

Ok(())
}

// Replies with the replicate candidates that fall within the range.
// Falls back to CLOSE_GROUP_SIZE peers if the range is too narrow.
// Note that:
// * For general replication, the replicate candidates shall be those closest to self.
// * For replicating fresh records, the replicate candidates shall be those closest to the data.
pub(crate) fn get_replicate_candidates(&mut self, target: &NetworkAddress) -> Vec<PeerId> {
// get closest peers from buckets, sorted by increasing distance to the target
let kbucket_key = target.as_kbucket_key();
let closest_k_peers: Vec<PeerId> = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&kbucket_key)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage())
.collect();

if let Some(responsible_range) = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance()
{
let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

if peers_in_range.len() >= CLOSE_GROUP_SIZE {
return peers_in_range;
}
}

// In case the range is too narrow, fall back to at least CLOSE_GROUP_SIZE peers.
closest_k_peers
.iter()
.take(CLOSE_GROUP_SIZE)
.cloned()
.collect()
}
}

/// Returns the nodes that are within the defined distance.
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
peers
.iter()
.filter_map(|peer_id| {
let distance = address.distance(&NetworkAddress::from_peer(*peer_id));
if distance <= range {
Some(*peer_id)
} else {
None
}
})
.collect()
}
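For intuition, here is a self-contained sketch of the same "keep only the peers whose XOR distance to the target is within the range" rule that `get_peers_in_range` applies, using plain `u64` keys instead of the crate's 256-bit `NetworkAddress`/`Distance` types. The small key values are made up for illustration.

```rust
/// Simplified stand-in for get_peers_in_range: keys and range are u64
/// rather than 256-bit kademlia distances, but the filtering rule is the same.
fn peers_in_range_u64(peers: &[u64], target: u64, range: u64) -> Vec<u64> {
    peers
        .iter()
        .copied()
        // XOR is the kademlia distance metric; keep peers within `range`.
        .filter(|&peer| (peer ^ target) <= range)
        .collect()
}

fn main() {
    let peers: [u64; 4] = [0b0001, 0b0010, 0b0111, 0b1000];
    // Only the peers within XOR distance 3 of target 0 survive the filter.
    assert_eq!(peers_in_range_u64(&peers, 0b0000, 3), vec![0b0001, 0b0010]);
}
```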
95 changes: 59 additions & 36 deletions sn_networking/src/driver.rs
@@ -13,6 +13,7 @@ use crate::{
error::{NetworkError, Result},
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
fifo_register::FifoRegister,
log_markers::Marker,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
@@ -35,7 +36,7 @@ use libp2p::mdns;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
kad::{self, KBucketDistance as Distance, QueryId, Quorum, Record, RecordKey, K_VALUE, U256},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
@@ -736,6 +737,7 @@ impl NetworkBuilder {
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
network_density_samples: FifoRegister::new(100),
};

let network = Network::new(
Expand Down Expand Up @@ -841,6 +843,8 @@ pub struct SwarmDriver {
pub(crate) last_replication: Option<Instant>,
/// When the last outdated connection pruning was undertaken.
pub(crate) last_connection_pruning_time: Instant,
/// FIFO cache for the network density samples
pub(crate) network_density_samples: FifoRegister,
}

impl SwarmDriver {
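The new `network_density_samples` field is a `FifoRegister`, whose implementation lives in the newly added `sn_networking/src/fifo_register.rs` and is not shown in this view. The sketch below is purely an assumed shape, given only to make the field and the later `add`/`get_median` calls readable: a bounded FIFO of `Distance` samples with a median accessor. The real implementation in the PR may differ.

```rust
use std::collections::VecDeque;

// Assumed sketch only; not the PR's actual FifoRegister.
pub(crate) struct FifoRegister {
    samples: VecDeque<Distance>,
    capacity: usize,
}

impl FifoRegister {
    pub(crate) fn new(capacity: usize) -> Self {
        Self { samples: VecDeque::with_capacity(capacity), capacity }
    }

    /// Push a sample, evicting the oldest one once the register is full.
    pub(crate) fn add(&mut self, sample: Distance) {
        if self.samples.len() == self.capacity {
            let _ = self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    /// Median of the retained samples, if any have been recorded.
    pub(crate) fn get_median(&self) -> Option<Distance> {
        if self.samples.is_empty() {
            return None;
        }
        let mut sorted: Vec<Distance> = self.samples.iter().cloned().collect();
        sorted.sort_unstable();
        Some(sorted[sorted.len() / 2])
    }
}
```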
@@ -922,15 +926,62 @@ impl SwarmDriver {
}
_ = set_farthest_record_interval.tick() => {
if !self.is_client {
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
if estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {estimated_network_size}, with {peers_in_non_full_buckets} peers_in_non_full_buckets and {num_of_full_buckets}num_of_full_buckets.");
continue;
}
// The entire Distance space is U256
// (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935)
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let estimated_distance = density * U256::from(CLOSE_GROUP_SIZE);
let density_distance = Distance(estimated_distance);

// Use the distance to a close peer to avoid the situation where
// the estimated density_distance is too narrow.
let closest_k_peers = self.get_closest_k_value_local_peers();

if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) {
info!("Set responsible range to {distance}");
// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
self.replication_fetcher.set_replication_distance_range(distance);
if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
continue;
}
// Results are sorted, hence can calculate distance directly
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let close_peers_distance = self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE + 1]));

let distance = std::cmp::max(density_distance, close_peers_distance);

// let distance = if let Some(distance) = self.network_density_samples.get_median() {
// distance
// } else {
// // In case sampling has not been triggered yet,
// // fall back to use the distance to CLOSE_GROUP_SIZEth closest
// let closest_k_peers = self.get_closest_k_value_local_peers();
// if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 1 {
// continue;
// }
// // Results are sorted, hence can calculate distance directly
// // Note: self is included
// let self_addr = NetworkAddress::from_peer(self.self_peer_id);
// self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))

// };

info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
self.replication_fetcher.set_replication_distance_range(distance);
}
}
_ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes),
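To make the density arithmetic in the hunk above concrete, here is a small standalone sketch. The network size of 10,000 is an assumed figure, not a value from the PR; with `CLOSE_GROUP_SIZE = 5`, the estimated distance covering the close group is roughly `U256::MAX / 2_000`, i.e. a distance whose `ilog2()` is about 245. Working on the exponent avoids pulling in 256-bit arithmetic.

```rust
fn main() {
    // Assumed example figures, not values taken from the PR.
    let estimated_network_size: f64 = 10_000.0;
    let close_group_size: f64 = 5.0; // CLOSE_GROUP_SIZE in sn_networking

    // density            = U256::MAX / estimated_network_size
    // estimated_distance = density * CLOSE_GROUP_SIZE
    // so ilog2(estimated_distance) ≈ 256 - log2(network_size / close_group)
    let approx_ilog2 = 256.0 - (estimated_network_size / close_group_size).log2();
    println!("density_distance.ilog2() ≈ {approx_ilog2:.0}"); // prints ≈ 245
}
```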
@@ -942,34 +993,6 @@
// ---------- Crate helpers -------------------
// --------------------------------------------

/// Uses the closest k peers to estimate the farthest address as
/// `K_VALUE / 2`th peer's bucket.
fn get_responsbile_range_estimate(
&mut self,
// Sorted list of closest k peers to our peer id.
closest_k_peers: &[PeerId],
) -> Option<u32> {
// if we don't have enough peers we don't set the distance range yet.
let mut farthest_distance = None;

if closest_k_peers.is_empty() {
return farthest_distance;
}

let our_address = NetworkAddress::from_peer(self.self_peer_id);

// get `K_VALUE / 2`th peer's address distance
// This is a rough estimate of the farthest address we might be responsible for.
// We want this to be higher than actually necessary, so we retain more data
// and can be sure to pass bad node checks
let target_index = std::cmp::min(K_VALUE.get() / 2, closest_k_peers.len()) - 1;

let address = NetworkAddress::from_peer(closest_k_peers[target_index]);
farthest_distance = our_address.distance(&address).ilog2();

farthest_distance
}

/// Pushes NetworkSwarmCmd off thread so as to be non-blocking
/// this is a wrapper around the `mpsc::Sender::send` call
pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {