Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/network: Add peers to DHT only if protocols match (#6549)
Browse files Browse the repository at this point in the history
* client/network/src/discovery: Adjust to Kademlia  API changes

* client/network: Add peers to DHT only if protocols match

With libp2p/rust-libp2p#1628 rust-libp2p allows
manually controlling which peers are inserted into the routing table.
Instead of adding each peer to the routing table automatically, insert
them only if they support the local nodes protocol id (e.g. `dot`)
retrieved via the `identify` behaviour.

For now this works around
libp2p/rust-libp2p#1611. In the future one
might add more requirements. For example one might try to exclude
light-clients.

* Cargo.toml: Remove crates.io patch for libp2p

* client/network/src/behaviour: Adjust to PeerInfo name change

* client/network/src/discovery: Rework Kademlia event matching

* client/network/discovery: Add trace on adding peer to DHT

* client/network/discovery: Retrieve protocol name from kad behaviour

* client/network/discovery: Fix formatting

* client/network: Change DiscoveryBehaviour::add_self_reported signature

* client/network: Document manual insertion strategy

* client/network/discovery: Remove TODO for ignoring DHT address

Co-authored-by: Pierre Krieger <[email protected]>
  • Loading branch information
mxinden and tomaka authored Jul 29, 2020
1 parent 075182e commit 075b2d7
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 54 deletions.
29 changes: 21 additions & 8 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use bytes::Bytes;
use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
Expand Down Expand Up @@ -413,16 +414,28 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<finality_requests::Even
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
let peer_info::PeerInfoEvent::Identified { peer_id, mut info } = event;
if info.listen_addrs.len() > 30 {
debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
info.agent_version
let peer_info::PeerInfoEvent::Identified {
peer_id,
info: IdentifyInfo {
protocol_version,
agent_version,
mut listen_addrs,
protocols,
..
},
} = event;

if listen_addrs.len() > 30 {
debug!(
target: "sub-libp2p",
"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
peer_id, protocol_version, agent_version
);
info.listen_addrs.truncate(30);
listen_addrs.truncate(30);
}
for addr in &info.listen_addrs {
self.discovery.add_self_reported_address(&peer_id, addr.clone());

for addr in listen_addrs {
self.discovery.add_self_reported_address(&peer_id, protocols.iter(), addr);
}
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
Expand Down
237 changes: 191 additions & 46 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ip_network::IpNetwork;
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::protocols_handler::multi::MultiHandler;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId;
Expand Down Expand Up @@ -137,17 +137,9 @@ impl DiscoveryConfig {
}

/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
let proto_name = {
let mut v = vec![b'/'];
v.extend_from_slice(p.as_bytes());
v.extend_from_slice(b"/kad");
v
};

self.add_kademlia(p, proto_name);
pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self {
let name = protocol_name_from_protocol_id(&id);
self.add_kademlia(id, name);
self
}

Expand All @@ -159,6 +151,10 @@ impl DiscoveryConfig {

let mut config = KademliaConfig::default();
config.set_protocol_name(proto_name);
// By default Kademlia attempts to insert all peers into its routing table once a dialing
// attempt succeeds. In order to control which peer is added, disable the auto-insertion and
// instead add peers manually.
config.set_kbucket_inserts(KademliaBucketInserts::Manual);

let store = MemoryStore::new(self.local_peer_id.clone());
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
Expand Down Expand Up @@ -259,17 +255,43 @@ impl DiscoveryBehaviour {
}
}

/// Call this method when a node reports an address for itself.
/// Add a self-reported address of a remote peer to the k-buckets of the supported
/// DHTs (`supported_protocols`).
///
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
/// not properly work.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) {
for k in self.kademlias.values_mut() {
k.add_address(peer_id, addr.clone());
/// **Note**: It is important that you call this method. The discovery mechanism will not
/// automatically add connecting peers to the Kademlia k-buckets.
pub fn add_self_reported_address(
&mut self,
peer_id: &PeerId,
supported_protocols: impl Iterator<Item = impl AsRef<[u8]>>,
addr: Multiaddr
) {
if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) {
log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id);
return
}

let mut added = false;
for protocol in supported_protocols {
for kademlia in self.kademlias.values_mut() {
if protocol.as_ref() == kademlia.protocol_name() {
log::trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT {}.",
addr, peer_id, String::from_utf8_lossy(kademlia.protocol_name()),
);
kademlia.add_address(peer_id, addr.clone());
added = true;
}
}
} else {
log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id);
}

if !added {
log::trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of any \
Kademlia DHTs supported by the local node.", addr, peer_id,
);
}
}

Expand Down Expand Up @@ -340,17 +362,21 @@ impl DiscoveryBehaviour {
}

/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
/// The address of a peer has been added to the Kademlia routing table.
///
/// Can be called multiple times with the same identity.
/// A connection to a peer has been established but the peer has not been
/// added to the routing table because [`KademliaBucketInserts::Manual`] is
/// configured. If the peer is to be included in the routing table, it must
/// be explicitly added via
/// [`DiscoveryBehaviour::add_self_reported_address`].
Discovered(PeerId),

/// A peer connected to this node for whom no listen address is known.
///
/// In order for the peer to be added to the Kademlia routing table, a known
/// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`],
/// e.g. obtained through the `identify` protocol.
/// listen address must be added via
/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
/// the `identify` protocol.
UnroutablePeer(PeerId),

/// The DHT yielded results for the record request, grouped in (key, value) pairs.
Expand Down Expand Up @@ -569,8 +595,12 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutablePeer { .. } | KademliaEvent::PendingRoutablePeer { .. } => {
// We are not interested in these events at the moment.
KademliaEvent::RoutablePeer { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::PendingRoutablePeer { .. } => {
// We are not interested in this event at the moment.
}
KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => {
match res {
Expand Down Expand Up @@ -689,25 +719,36 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec<u8> {
let mut v = vec![b'/'];
v.extend_from_slice(id.as_bytes());
v.extend_from_slice(b"/kad");
v
}

#[cfg(test)]
mod tests {
use crate::config::ProtocolId;
use futures::prelude::*;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::{Multiaddr, PeerId};
use libp2p::core::upgrade;
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm;
use std::{collections::HashSet, task::Poll};
use super::{DiscoveryConfig, DiscoveryOut};
use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id};

#[test]
fn discovery_working() {
let mut user_defined = Vec::new();
let mut first_swarm_peer_id_and_addr = None;
let protocol_id = ProtocolId::from(b"dot".as_ref());

// Build swarms whose behaviour is `DiscoveryBehaviour`.
let mut swarms = (0..25).map(|_| {
// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
// the first swarm via `with_user_defined`.
let mut swarms = (0..25).map(|i| {
let keypair = Keypair::generate_ed25519();
let keypair2 = keypair.clone();

Expand All @@ -730,23 +771,21 @@ mod tests {
});

let behaviour = {
let protocol_id: &[u8] = b"/test/kad/1.0.0";

let mut config = DiscoveryConfig::new(keypair.public());
config.with_user_defined(user_defined.clone())
config.with_user_defined(first_swarm_peer_id_and_addr.clone())
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(ProtocolId::from(protocol_id));
.add_protocol(protocol_id.clone());

config.finish()
};

let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

if user_defined.is_empty() {
user_defined.push((keypair.public().into_peer_id(), listen_addr.clone()));
if i == 0 {
first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone()))
}

Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
Expand All @@ -755,7 +794,10 @@ mod tests {

// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
let mut to_discover = (0..swarms.len()).map(|n| {
(0..swarms.len()).filter(|p| *p != n)
(0..swarms.len())
// Skip the first swarm as all other swarms already know it.
.skip(1)
.filter(|p| *p != n)
.map(|p| Swarm::local_peer_id(&swarms[p].0).clone())
.collect::<HashSet<_>>()
}).collect::<Vec<_>>();
Expand All @@ -766,7 +808,7 @@ mod tests {
match swarms[swarm_n].0.poll_next_unpin(cx) {
Poll::Ready(Some(e)) => {
match e {
DiscoveryOut::UnroutablePeer(other) => {
DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => {
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter().find_map(|(s, a)|
if s.local_peer_id == other {
Expand All @@ -775,12 +817,16 @@ mod tests {
None
})
.unwrap();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
},
DiscoveryOut::Discovered(other) => {
swarms[swarm_n].0.add_self_reported_address(
&other,
[protocol_name_from_protocol_id(&protocol_id)].iter(),
addr,
);

to_discover[swarm_n].remove(&other);
}
_ => {}
},
DiscoveryOut::RandomKademliaStarted(_) => {},
e => {panic!("Unexpected event: {:?}", e)},
}
continue 'polling
}
Expand All @@ -799,4 +845,103 @@ mod tests {

futures::executor::block_on(fut);
}

#[test]
fn discovery_ignores_peers_with_unknown_protocols() {
let supported_protocol_id = ProtocolId::from(b"a".as_ref());
let unsupported_protocol_id = ProtocolId::from(b"b".as_ref());

let mut discovery = {
let keypair = Keypair::generate_ed25519();
let mut config = DiscoveryConfig::new(keypair.public());
config.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(supported_protocol_id.clone());
config.finish()
};

let remote_peer_id = PeerId::random();
let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

// Add remote peer with unsupported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&unsupported_protocol_id)].iter(),
remote_addr.clone(),
);

for kademlia in discovery.kademlias.values_mut() {
assert!(
kademlia.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect peer with unsupported protocol not to be added."
);
}

// Add remote peer with supported protocol.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&supported_protocol_id)].iter(),
remote_addr.clone(),
);

for kademlia in discovery.kademlias.values_mut() {
assert_eq!(
1,
kademlia.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expect peer with supported protocol to be added."
);
}
}

#[test]
fn discovery_adds_peer_to_kademlia_of_same_protocol_only() {
let protocol_a = ProtocolId::from(b"a".as_ref());
let protocol_b = ProtocolId::from(b"b".as_ref());

let mut discovery = {
let keypair = Keypair::generate_ed25519();
let mut config = DiscoveryConfig::new(keypair.public());
config.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(protocol_a.clone())
.add_protocol(protocol_b.clone());
config.finish()
};

let remote_peer_id = PeerId::random();
let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

// Add remote peer with `protocol_a` only.
discovery.add_self_reported_address(
&remote_peer_id,
[protocol_name_from_protocol_id(&protocol_a)].iter(),
remote_addr.clone(),
);

assert_eq!(
1,
discovery.kademlias.get_mut(&protocol_a)
.expect("Kademlia instance to exist.")
.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.num_entries(),
"Expected remote peer to be added to `protocol_a` Kademlia instance.",

);

assert!(
discovery.kademlias.get_mut(&protocol_b)
.expect("Kademlia instance to exist.")
.kbucket(remote_peer_id.clone())
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expected remote peer not to be added to `protocol_b` Kademlia instance.",
);
}
}

0 comments on commit 075b2d7

Please sign in to comment.