From 311f93ae64de3f9f9b067def5911646be7cc69b0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 1 Jul 2020 07:59:42 +0200 Subject: [PATCH] client/network: Add peers to DHT only if protocols match With https://github.com/libp2p/rust-libp2p/pull/1628 rust-libp2p allows manually controlling which peers are inserted into the routing table. Instead of adding each peer to the routing table automatically, insert them only if they support the local nodes protocol id (e.g. `dot`) retrieved via the `identify` behaviour. For now this works around https://github.com/libp2p/rust-libp2p/issues/1611. In the future one might add more requirements. For example one might try to exclude light-clients. --- Cargo.toml | 7 +- client/network/src/behaviour.rs | 29 +++-- client/network/src/discovery.rs | 218 ++++++++++++++++++++++++++------ 3 files changed, 203 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0fde01a443f47..5f677b51ceb9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -255,7 +255,8 @@ zeroize = { opt-level = 3 } # Substrate runtime requires unwinding. panic = "unwind" - [patch.crates-io] -libp2p-kad = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" } -libp2p = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" } +libp2p-kad = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p-tcp = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p-wasm-ext = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } \ No newline at end of file diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 6b29fadda5c4d..6ef83e1bcb6e7 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -24,6 +24,7 @@ use crate::{ use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; +use libp2p::identify::IdentifyInfo; use libp2p::kad::record; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use log::debug; @@ -364,16 +365,28 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { - let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; - if info.listen_addrs.len() > 30 { - debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ - it is identified by {:?} and {:?}", peer_id, info.protocol_version, - info.agent_version + let debug_info::DebugInfoEvent::Identified { + peer_id, + info: IdentifyInfo { + protocol_version, + agent_version, + mut listen_addrs, + protocols, + .. + }, + } = event; + + if listen_addrs.len() > 30 { + debug!( + target: "sub-libp2p", + "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}", + peer_id, protocol_version, agent_version ); - info.listen_addrs.truncate(30); + listen_addrs.truncate(30); } - for addr in &info.listen_addrs { - self.discovery.add_self_reported_address(&peer_id, addr.clone()); + + for addr in listen_addrs { + self.discovery.add_self_reported_address(&peer_id, &protocols, addr); } self.substrate.add_discovered_nodes(iter::once(peer_id)); } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index a633412aff009..8b42ecaf12cc1 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -52,7 +52,7 @@ use ip_network::IpNetwork; use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use libp2p::swarm::protocols_handler::multi::MultiHandler; -use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; +use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; use libp2p::kad::GetClosestPeersError; use libp2p::kad::handler::KademliaHandler; use libp2p::kad::QueryId; @@ -137,17 +137,9 @@ impl DiscoveryConfig { } /// Add discovery via Kademlia for the given protocol. - pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self { - // NB: If this protocol name derivation is changed, check if - // `DiscoveryBehaviour::new_handler` is still correct. - let proto_name = { - let mut v = vec![b'/']; - v.extend_from_slice(p.as_bytes()); - v.extend_from_slice(b"/kad"); - v - }; - - self.add_kademlia(p, proto_name); + pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self { + let name = protocol_name_from_protocol_id(&id); + self.add_kademlia(id, name); self } @@ -159,6 +151,7 @@ impl DiscoveryConfig { let mut config = KademliaConfig::default(); config.set_protocol_name(proto_name); + config.set_kbucket_inserts(KademliaBucketInserts::Manual); let store = MemoryStore::new(self.local_peer_id.clone()); let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config); @@ -263,13 +256,26 @@ impl DiscoveryBehaviour { /// /// **Note**: It is important that you call this method, otherwise the discovery mechanism will /// not properly work. - pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { + pub fn add_self_reported_address(&mut self, peer_id: &PeerId, protocols: &[String], addr: Multiaddr) { if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) { - for k in self.kademlias.values_mut() { - k.add_address(peer_id, addr.clone()); + let mut added = false; + for (protocol_id, kademlia) in self.kademlias.iter_mut() { + // TODO: Look into way around generating the protocol name each time. + if protocols.iter().any(|p| p.as_bytes() == protocol_name_from_protocol_id(protocol_id).as_slice()) { + kademlia.add_address(peer_id, addr.clone()); + added = true; + } + } + + if !added { + log::trace!( + target: "sub-libp2p", + "Ignoring self-reported address {} from {} as remote node is not part of any \ + Kademlia DHTs supported by the local node.", addr, peer_id, + ); } } else { - log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id); + log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id); } } @@ -339,17 +345,21 @@ impl DiscoveryBehaviour { } /// Event generated by the `DiscoveryBehaviour`. +#[derive(Debug)] pub enum DiscoveryOut { - /// The address of a peer has been added to the Kademlia routing table. - /// - /// Can be called multiple times with the same identity. + /// A connection to a peer has been established but the peer has not been + /// added to the routing table because [`KademliaBucketInserts::Manual`] is + /// configured. If the peer is to be included in the routing table, it must + /// be explicitly added via + /// [`DiscoveryBehaviour::add_self_reported_address`]. Discovered(PeerId), /// A peer connected to this node for whom no listen address is known. /// /// In order for the peer to be added to the Kademlia routing table, a known - /// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`], - /// e.g. obtained through the `identify` protocol. + /// listen address must be added via + /// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through + /// the `identify` protocol. UnroutablePeer(PeerId), /// The DHT yielded results for the record request, grouped in (key, value) pairs. @@ -582,10 +592,22 @@ impl NetworkBehaviour for DiscoveryBehaviour { let ev = DiscoveryOut::UnroutablePeer(peer); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } - KademliaEvent::RoutingUpdated { peer, .. } => { + // TODO: By ignoring the `addresses` field we ignore the + // addresses reported through Kademlia. Are the + // addresses reported via the identify behaviour always + // superior? We depend on the identify behaviour + // finishing before the connection is closed as + // otherwise identify does not know how to connect to + // the node as we drop the addresses here. + KademliaEvent::RoutablePeer { peer, .. } => { let ev = DiscoveryOut::Discovered(peer); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } + // `KademliaBucketInserts::Manual` is configured for each Kademlia instance. + // Thus the `RoutingUpdated` event or the `PendingRoutablePeer` event was + // preceded by a call to `Kademlia::add_address`. This implies that we are + // already aware of the node and thereby don't need to take any actions. + KademliaEvent::RoutingUpdated { .. } | KademliaEvent::PendingRoutablePeer { .. } => {}, KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => { match res { Err(GetClosestPeersError::Timeout { key, peers }) => { @@ -703,25 +725,37 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } +// NB: If this protocol name derivation is changed, check if +// `DiscoveryBehaviour::new_handler` is still correct. +fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec { + let mut v = vec![b'/']; + v.extend_from_slice(id.as_bytes()); + v.extend_from_slice(b"/kad"); + v +} + #[cfg(test)] mod tests { use crate::config::ProtocolId; use futures::prelude::*; use libp2p::identity::Keypair; - use libp2p::Multiaddr; + use libp2p::{Multiaddr, PeerId}; use libp2p::core::upgrade; use libp2p::core::transport::{Transport, MemoryTransport}; use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; use libp2p::swarm::Swarm; use std::{collections::HashSet, task::Poll}; - use super::{DiscoveryConfig, DiscoveryOut}; + use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id}; #[test] fn discovery_working() { - let mut user_defined = Vec::new(); + let mut first_swarm_peer_id_and_addr = None; + // TODO: Should this not be something like `dot` or `sub`? + let protocol_id = ProtocolId::from(b"/test/kad/1.0.0".as_ref()); - // Build swarms whose behaviour is `DiscoveryBehaviour`. - let mut swarms = (0..25).map(|_| { + // Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of + // the first swarm via `with_user_defined`. + let mut swarms = (0..25).map(|i| { let keypair = Keypair::generate_ed25519(); let keypair2 = keypair.clone(); @@ -744,14 +778,12 @@ mod tests { }); let behaviour = { - let protocol_id: &[u8] = b"/test/kad/1.0.0"; - let mut config = DiscoveryConfig::new(keypair.public()); - config.with_user_defined(user_defined.clone()) + config.with_user_defined(first_swarm_peer_id_and_addr.clone()) .allow_private_ipv4(true) .allow_non_globals_in_dht(true) .discovery_limit(50) - .add_protocol(ProtocolId::from(protocol_id)); + .add_protocol(protocol_id.clone()); config.finish() }; @@ -759,8 +791,8 @@ mod tests { let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); - if user_defined.is_empty() { - user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); + if i == 0 { + first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone())) } Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); @@ -769,7 +801,10 @@ mod tests { // Build a `Vec>` with the list of nodes remaining to be discovered. let mut to_discover = (0..swarms.len()).map(|n| { - (0..swarms.len()).filter(|p| *p != n) + (0..swarms.len()) + // Skip the first swarm as all other swarms already know it. + .skip(1) + .filter(|p| *p != n) .map(|p| Swarm::local_peer_id(&swarms[p].0).clone()) .collect::>() }).collect::>(); @@ -780,7 +815,7 @@ mod tests { match swarms[swarm_n].0.poll_next_unpin(cx) { Poll::Ready(Some(e)) => { match e { - DiscoveryOut::UnroutablePeer(other) => { + DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => { // Call `add_self_reported_address` to simulate identify happening. let addr = swarms.iter().find_map(|(s, a)| if s.local_peer_id == other { @@ -789,12 +824,16 @@ mod tests { None }) .unwrap(); - swarms[swarm_n].0.add_self_reported_address(&other, addr); - }, - DiscoveryOut::Discovered(other) => { + swarms[swarm_n].0.add_self_reported_address( + &other, + &[String::from_utf8(protocol_name_from_protocol_id(&protocol_id)).unwrap()], + addr, + ); + to_discover[swarm_n].remove(&other); - } - _ => {} + }, + DiscoveryOut::RandomKademliaStarted(_) => {}, + e => {panic!("Unexpected event: {:?}", e)}, } continue 'polling } @@ -813,4 +852,103 @@ mod tests { futures::executor::block_on(fut); } + + #[test] + fn discovery_ignores_peers_with_unknown_protocols() { + let supported_protocol_id = ProtocolId::from(b"a".as_ref()); + let unsupported_protocol_id = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(supported_protocol_id.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with unsupported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&unsupported_protocol_id)).unwrap()], + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert!( + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expect peer with unsupported protocol not to be added." + ); + } + + // Add remote peer with supported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&supported_protocol_id)).unwrap()], + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert_eq!( + 1, + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expect peer with supported protocol to be added." + ); + } + } + + #[test] + fn discovery_adds_peer_to_kademlia_of_same_protocol_only() { + let protocol_a = ProtocolId::from(b"a".as_ref()); + let protocol_b = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(protocol_a.clone()) + .add_protocol(protocol_b.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with `protocol_a` only. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&protocol_a)).unwrap()], + remote_addr.clone(), + ); + + assert_eq!( + 1, + discovery.kademlias.get_mut(&protocol_a) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expected remote peer to be added to `protocol_a` Kademlia instance.", + + ); + + assert!( + discovery.kademlias.get_mut(&protocol_b) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expected remote peer not to be added to `protocol_a` Kademlia instance.", + ); + } }