Skip to content

Commit

Permalink
client/network: Add peers to DHT only if protocols match
Browse files Browse the repository at this point in the history
With libp2p/rust-libp2p#1628 rust-libp2p allows
manually controlling which peers are inserted into the routing table.
Instead of adding each peer to the routing table automatically, insert
them only if they support the local node's protocol id (e.g. `dot`)
retrieved via the `identify` behaviour.

For now this works around
libp2p/rust-libp2p#1611. In the future one
might add more requirements. For example one might try to exclude
light-clients.
  • Loading branch information
mxinden committed Jul 1, 2020
1 parent 4baa1b4 commit 311f93a
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 51 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ zeroize = { opt-level = 3 }
# Substrate runtime requires unwinding.
panic = "unwind"


[patch.crates-io]
libp2p-kad = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" }
libp2p = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" }
libp2p-kad = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p-tcp = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
libp2p-wasm-ext = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" }
29 changes: 21 additions & 8 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
Expand Down Expand Up @@ -364,16 +365,28 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<finality_requests::Even
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: debug_info::DebugInfoEvent) {
let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event;
if info.listen_addrs.len() > 30 {
debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \
it is identified by {:?} and {:?}", peer_id, info.protocol_version,
info.agent_version
let debug_info::DebugInfoEvent::Identified {
peer_id,
info: IdentifyInfo {
protocol_version,
agent_version,
mut listen_addrs,
protocols,
..
},
} = event;

if listen_addrs.len() > 30 {
debug!(
target: "sub-libp2p",
"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
peer_id, protocol_version, agent_version
);
info.listen_addrs.truncate(30);
listen_addrs.truncate(30);
}
for addr in &info.listen_addrs {
self.discovery.add_self_reported_address(&peer_id, addr.clone());

for addr in listen_addrs {
self.discovery.add_self_reported_address(&peer_id, &protocols, addr);
}
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
Expand Down
218 changes: 178 additions & 40 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ip_network::IpNetwork;
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::protocols_handler::multi::MultiHandler;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record};
use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId;
Expand Down Expand Up @@ -137,17 +137,9 @@ impl DiscoveryConfig {
}

/// Add discovery via Kademlia for the given protocol.
pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self {
// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
let proto_name = {
let mut v = vec![b'/'];
v.extend_from_slice(p.as_bytes());
v.extend_from_slice(b"/kad");
v
};

self.add_kademlia(p, proto_name);
pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self {
let name = protocol_name_from_protocol_id(&id);
self.add_kademlia(id, name);
self
}

Expand All @@ -159,6 +151,7 @@ impl DiscoveryConfig {

let mut config = KademliaConfig::default();
config.set_protocol_name(proto_name);
config.set_kbucket_inserts(KademliaBucketInserts::Manual);

let store = MemoryStore::new(self.local_peer_id.clone());
let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config);
Expand Down Expand Up @@ -263,13 +256,26 @@ impl DiscoveryBehaviour {
///
/// **Note**: It is important that you call this method, otherwise the discovery mechanism will
/// not properly work.
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, protocols: &[String], addr: Multiaddr) {
if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) {
for k in self.kademlias.values_mut() {
k.add_address(peer_id, addr.clone());
let mut added = false;
for (protocol_id, kademlia) in self.kademlias.iter_mut() {
// TODO: Look into way around generating the protocol name each time.
if protocols.iter().any(|p| p.as_bytes() == protocol_name_from_protocol_id(protocol_id).as_slice()) {
kademlia.add_address(peer_id, addr.clone());
added = true;
}
}

if !added {
log::trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of any \
Kademlia DHTs supported by the local node.", addr, peer_id,
);
}
} else {
log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id);
log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id);
}
}

Expand Down Expand Up @@ -339,17 +345,21 @@ impl DiscoveryBehaviour {
}

/// Event generated by the `DiscoveryBehaviour`.
#[derive(Debug)]
pub enum DiscoveryOut {
/// The address of a peer has been added to the Kademlia routing table.
///
/// Can be called multiple times with the same identity.
/// A connection to a peer has been established but the peer has not been
/// added to the routing table because [`KademliaBucketInserts::Manual`] is
/// configured. If the peer is to be included in the routing table, it must
/// be explicitly added via
/// [`DiscoveryBehaviour::add_self_reported_address`].
Discovered(PeerId),

/// A peer connected to this node for whom no listen address is known.
///
/// In order for the peer to be added to the Kademlia routing table, a known
/// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`],
/// e.g. obtained through the `identify` protocol.
/// listen address must be added via
/// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through
/// the `identify` protocol.
UnroutablePeer(PeerId),

/// The DHT yielded results for the record request, grouped in (key, value) pairs.
Expand Down Expand Up @@ -582,10 +592,22 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = DiscoveryOut::UnroutablePeer(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::RoutingUpdated { peer, .. } => {
// TODO: By ignoring the `addresses` field we ignore the
// addresses reported through Kademlia. Are the
// addresses reported via the identify behaviour always
// superior? We depend on the identify behaviour
// finishing before the connection is closed as
// otherwise identify does not know how to connect to
// the node as we drop the addresses here.
KademliaEvent::RoutablePeer { peer, .. } => {
let ev = DiscoveryOut::Discovered(peer);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
// `KademliaBucketInserts::Manual` is configured for each Kademlia instance.
// Thus the `RoutingUpdated` event or the `PendingRoutablePeer` event was
// preceded by a call to `Kademlia::add_address`. This implies that we are
// already aware of the node and thereby don't need to take any actions.
KademliaEvent::RoutingUpdated { .. } | KademliaEvent::PendingRoutablePeer { .. } => {},
KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => {
match res {
Err(GetClosestPeersError::Timeout { key, peers }) => {
Expand Down Expand Up @@ -703,25 +725,37 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}

// NB: If this protocol name derivation is changed, check if
// `DiscoveryBehaviour::new_handler` is still correct.
/// Derives the Kademlia protocol name for `id`: the bytes of the protocol id
/// prefixed with `/` and suffixed with `/kad`.
fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec<u8> {
    let id_bytes: &[u8] = id.as_bytes();
    [&b"/"[..], id_bytes, &b"/kad"[..]].concat()
}

#[cfg(test)]
mod tests {
use crate::config::ProtocolId;
use futures::prelude::*;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::{Multiaddr, PeerId};
use libp2p::core::upgrade;
use libp2p::core::transport::{Transport, MemoryTransport};
use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt};
use libp2p::swarm::Swarm;
use std::{collections::HashSet, task::Poll};
use super::{DiscoveryConfig, DiscoveryOut};
use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id};

#[test]
fn discovery_working() {
let mut user_defined = Vec::new();
let mut first_swarm_peer_id_and_addr = None;
// TODO: Should this not be something like `dot` or `sub`?
let protocol_id = ProtocolId::from(b"/test/kad/1.0.0".as_ref());

// Build swarms whose behaviour is `DiscoveryBehaviour`.
let mut swarms = (0..25).map(|_| {
// Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of
// the first swarm via `with_user_defined`.
let mut swarms = (0..25).map(|i| {
let keypair = Keypair::generate_ed25519();
let keypair2 = keypair.clone();

Expand All @@ -744,23 +778,21 @@ mod tests {
});

let behaviour = {
let protocol_id: &[u8] = b"/test/kad/1.0.0";

let mut config = DiscoveryConfig::new(keypair.public());
config.with_user_defined(user_defined.clone())
config.with_user_defined(first_swarm_peer_id_and_addr.clone())
.allow_private_ipv4(true)
.allow_non_globals_in_dht(true)
.discovery_limit(50)
.add_protocol(ProtocolId::from(protocol_id));
.add_protocol(protocol_id.clone());

config.finish()
};

let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

if user_defined.is_empty() {
user_defined.push((keypair.public().into_peer_id(), listen_addr.clone()));
if i == 0 {
first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone()))
}

Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap();
Expand All @@ -769,7 +801,10 @@ mod tests {

// Build a `Vec<HashSet<PeerId>>` with the list of nodes remaining to be discovered.
let mut to_discover = (0..swarms.len()).map(|n| {
(0..swarms.len()).filter(|p| *p != n)
(0..swarms.len())
// Skip the first swarm as all other swarms already know it.
.skip(1)
.filter(|p| *p != n)
.map(|p| Swarm::local_peer_id(&swarms[p].0).clone())
.collect::<HashSet<_>>()
}).collect::<Vec<_>>();
Expand All @@ -780,7 +815,7 @@ mod tests {
match swarms[swarm_n].0.poll_next_unpin(cx) {
Poll::Ready(Some(e)) => {
match e {
DiscoveryOut::UnroutablePeer(other) => {
DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => {
// Call `add_self_reported_address` to simulate identify happening.
let addr = swarms.iter().find_map(|(s, a)|
if s.local_peer_id == other {
Expand All @@ -789,12 +824,16 @@ mod tests {
None
})
.unwrap();
swarms[swarm_n].0.add_self_reported_address(&other, addr);
},
DiscoveryOut::Discovered(other) => {
swarms[swarm_n].0.add_self_reported_address(
&other,
&[String::from_utf8(protocol_name_from_protocol_id(&protocol_id)).unwrap()],
addr,
);

to_discover[swarm_n].remove(&other);
}
_ => {}
},
DiscoveryOut::RandomKademliaStarted(_) => {},
e => {panic!("Unexpected event: {:?}", e)},
}
continue 'polling
}
Expand All @@ -813,4 +852,103 @@ mod tests {

futures::executor::block_on(fut);
}

/// Verifies that `add_self_reported_address` leaves the routing table untouched
/// for a peer announcing only an unsupported protocol, and inserts the peer once
/// it announces a protocol the local node supports.
#[test]
fn discovery_ignores_peers_with_unknown_protocols() {
    let supported_protocol_id = ProtocolId::from(b"a".as_ref());
    let unsupported_protocol_id = ProtocolId::from(b"b".as_ref());

    // Local discovery behaviour that only runs Kademlia for the supported protocol.
    let local_keypair = Keypair::generate_ed25519();
    let mut builder = DiscoveryConfig::new(local_keypair.public());
    builder
        .allow_private_ipv4(true)
        .allow_non_globals_in_dht(true)
        .discovery_limit(50)
        .add_protocol(supported_protocol_id.clone());
    let mut discovery = builder.finish();

    let remote_peer_id = PeerId::random();
    let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

    // Single-entry protocol list holding the Kademlia protocol name derived from `id`.
    let announced = |id: &ProtocolId| {
        vec![String::from_utf8(protocol_name_from_protocol_id(id)).unwrap()]
    };

    // Report the remote peer with the unsupported protocol: nothing may be inserted.
    discovery.add_self_reported_address(
        &remote_peer_id,
        &announced(&unsupported_protocol_id),
        remote_addr.clone(),
    );

    for kademlia in discovery.kademlias.values_mut() {
        let bucket_is_empty = kademlia
            .kbucket(remote_peer_id.clone())
            .expect("Remote peer id not to be equal to local peer id.")
            .is_empty();
        assert!(
            bucket_is_empty,
            "Expect peer with unsupported protocol not to be added."
        );
    }

    // Report the same peer with the supported protocol: it must now be inserted.
    discovery.add_self_reported_address(
        &remote_peer_id,
        &announced(&supported_protocol_id),
        remote_addr.clone(),
    );

    for kademlia in discovery.kademlias.values_mut() {
        let entries = kademlia
            .kbucket(remote_peer_id.clone())
            .expect("Remote peer id not to be equal to local peer id.")
            .num_entries();
        assert_eq!(
            1,
            entries,
            "Expect peer with supported protocol to be added."
        );
    }
}

/// Verifies that a self-reported address is inserted only into the Kademlia
/// instance whose protocol the remote peer announced, and not into the
/// instances of other locally configured protocols.
#[test]
fn discovery_adds_peer_to_kademlia_of_same_protocol_only() {
    let protocol_a = ProtocolId::from(b"a".as_ref());
    let protocol_b = ProtocolId::from(b"b".as_ref());

    // Local discovery behaviour running one Kademlia instance per protocol.
    let mut discovery = {
        let keypair = Keypair::generate_ed25519();
        let mut config = DiscoveryConfig::new(keypair.public());
        config.allow_private_ipv4(true)
            .allow_non_globals_in_dht(true)
            .discovery_limit(50)
            .add_protocol(protocol_a.clone())
            .add_protocol(protocol_b.clone());
        config.finish()
    };

    let remote_peer_id = PeerId::random();
    let remote_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

    // Add remote peer with `protocol_a` only.
    discovery.add_self_reported_address(
        &remote_peer_id,
        &[String::from_utf8(protocol_name_from_protocol_id(&protocol_a)).unwrap()],
        remote_addr,
    );

    assert_eq!(
        1,
        discovery.kademlias.get_mut(&protocol_a)
            .expect("Kademlia instance to exist.")
            .kbucket(remote_peer_id.clone())
            .expect("Remote peer id not to be equal to local peer id.")
            .num_entries(),
        "Expected remote peer to be added to `protocol_a` Kademlia instance.",
    );

    // The peer never announced `protocol_b`, so that instance must stay empty.
    assert!(
        discovery.kademlias.get_mut(&protocol_b)
            .expect("Kademlia instance to exist.")
            .kbucket(remote_peer_id.clone())
            .expect("Remote peer id not to be equal to local peer id.")
            .is_empty(),
        "Expected remote peer not to be added to `protocol_b` Kademlia instance.",
    );
}
}

0 comments on commit 311f93a

Please sign in to comment.