diff --git a/Cargo.toml b/Cargo.toml index 0fde01a443f47..5f677b51ceb9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -255,7 +255,8 @@ zeroize = { opt-level = 3 } # Substrate runtime requires unwinding. panic = "unwind" - [patch.crates-io] -libp2p-kad = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" } -libp2p = { git = "https://github.com/romanb/rust-libp2p", branch = "kad-bucket-control" } +libp2p-kad = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p-tcp = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } +libp2p-wasm-ext = { git = "https://github.com/libp2p/rust-libp2p", branch = "master" } \ No newline at end of file diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 6b29fadda5c4d..6ef83e1bcb6e7 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -24,6 +24,7 @@ use crate::{ use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; +use libp2p::identify::IdentifyInfo; use libp2p::kad::record; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use log::debug; @@ -364,16 +365,28 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { - let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; - if info.listen_addrs.len() > 30 { - debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ - it is identified by {:?} and {:?}", peer_id, info.protocol_version, - info.agent_version + let debug_info::DebugInfoEvent::Identified { + peer_id, + info: IdentifyInfo { + protocol_version, + agent_version, + mut listen_addrs, + protocols, + .. + }, + } = event; + + if listen_addrs.len() > 30 { + debug!( + target: "sub-libp2p", + "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}", + peer_id, protocol_version, agent_version ); - info.listen_addrs.truncate(30); + listen_addrs.truncate(30); } - for addr in &info.listen_addrs { - self.discovery.add_self_reported_address(&peer_id, addr.clone()); + + for addr in listen_addrs { + self.discovery.add_self_reported_address(&peer_id, &protocols, addr); } self.substrate.add_discovered_nodes(iter::once(peer_id)); } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index a633412aff009..8b42ecaf12cc1 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -52,7 +52,7 @@ use ip_network::IpNetwork; use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use libp2p::swarm::protocols_handler::multi::MultiHandler; -use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; +use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; use libp2p::kad::GetClosestPeersError; use libp2p::kad::handler::KademliaHandler; use libp2p::kad::QueryId; @@ -137,17 +137,9 @@ impl DiscoveryConfig { } /// Add discovery via Kademlia for the given protocol. - pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self { - // NB: If this protocol name derivation is changed, check if - // `DiscoveryBehaviour::new_handler` is still correct. - let proto_name = { - let mut v = vec![b'/']; - v.extend_from_slice(p.as_bytes()); - v.extend_from_slice(b"/kad"); - v - }; - - self.add_kademlia(p, proto_name); + pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self { + let name = protocol_name_from_protocol_id(&id); + self.add_kademlia(id, name); self } @@ -159,6 +151,7 @@ impl DiscoveryConfig { let mut config = KademliaConfig::default(); config.set_protocol_name(proto_name); + config.set_kbucket_inserts(KademliaBucketInserts::Manual); let store = MemoryStore::new(self.local_peer_id.clone()); let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config); @@ -263,13 +256,26 @@ impl DiscoveryBehaviour { /// /// **Note**: It is important that you call this method, otherwise the discovery mechanism will /// not properly work. - pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { + pub fn add_self_reported_address(&mut self, peer_id: &PeerId, protocols: &[String], addr: Multiaddr) { if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) { - for k in self.kademlias.values_mut() { - k.add_address(peer_id, addr.clone()); + let mut added = false; + for (protocol_id, kademlia) in self.kademlias.iter_mut() { + // TODO: Look into way around generating the protocol name each time. + if protocols.iter().any(|p| p.as_bytes() == protocol_name_from_protocol_id(protocol_id).as_slice()) { + kademlia.add_address(peer_id, addr.clone()); + added = true; + } + } + + if !added { + log::trace!( + target: "sub-libp2p", + "Ignoring self-reported address {} from {} as remote node is not part of any \ + Kademlia DHTs supported by the local node.", addr, peer_id, + ); } } else { - log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id); + log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id); } } @@ -339,17 +345,21 @@ impl DiscoveryBehaviour { } /// Event generated by the `DiscoveryBehaviour`. +#[derive(Debug)] pub enum DiscoveryOut { - /// The address of a peer has been added to the Kademlia routing table. - /// - /// Can be called multiple times with the same identity. + /// A connection to a peer has been established but the peer has not been + /// added to the routing table because [`KademliaBucketInserts::Manual`] is + /// configured. If the peer is to be included in the routing table, it must + /// be explicitly added via + /// [`DiscoveryBehaviour::add_self_reported_address`]. Discovered(PeerId), /// A peer connected to this node for whom no listen address is known. /// /// In order for the peer to be added to the Kademlia routing table, a known - /// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`], - /// e.g. obtained through the `identify` protocol. + /// listen address must be added via + /// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through + /// the `identify` protocol. UnroutablePeer(PeerId), /// The DHT yielded results for the record request, grouped in (key, value) pairs. @@ -582,10 +592,22 @@ impl NetworkBehaviour for DiscoveryBehaviour { let ev = DiscoveryOut::UnroutablePeer(peer); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } - KademliaEvent::RoutingUpdated { peer, .. } => { + // TODO: By ignoring the `addresses` field we ignore the + // addresses reported through Kademlia. Are the + // addresses reported via the identify behaviour always + // superior? We depend on the identify behaviour + // finishing before the connection is closed as + // otherwise identify does not know how to connect to + // the node as we drop the addresses here. + KademliaEvent::RoutablePeer { peer, .. } => { let ev = DiscoveryOut::Discovered(peer); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } + // `KademliaBucketInserts::Manual` is configured for each Kademlia instance. + // Thus the `RoutingUpdated` event or the `PendingRoutablePeer` event was + // preceded by a call to `Kademlia::add_address`. This implies that we are + // already aware of the node and thereby don't need to take any actions. + KademliaEvent::RoutingUpdated { .. } | KademliaEvent::PendingRoutablePeer { .. } => {}, KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => { match res { Err(GetClosestPeersError::Timeout { key, peers }) => { @@ -703,25 +725,37 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } +// NB: If this protocol name derivation is changed, check if +// `DiscoveryBehaviour::new_handler` is still correct. +fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec { + let mut v = vec![b'/']; + v.extend_from_slice(id.as_bytes()); + v.extend_from_slice(b"/kad"); + v +} + #[cfg(test)] mod tests { use crate::config::ProtocolId; use futures::prelude::*; use libp2p::identity::Keypair; - use libp2p::Multiaddr; + use libp2p::{Multiaddr, PeerId}; use libp2p::core::upgrade; use libp2p::core::transport::{Transport, MemoryTransport}; use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; use libp2p::swarm::Swarm; use std::{collections::HashSet, task::Poll}; - use super::{DiscoveryConfig, DiscoveryOut}; + use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id}; #[test] fn discovery_working() { - let mut user_defined = Vec::new(); + let mut first_swarm_peer_id_and_addr = None; + // TODO: Should this not be something like `dot` or `sub`? + let protocol_id = ProtocolId::from(b"/test/kad/1.0.0".as_ref()); - // Build swarms whose behaviour is `DiscoveryBehaviour`. - let mut swarms = (0..25).map(|_| { + // Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of + // the first swarm via `with_user_defined`. + let mut swarms = (0..25).map(|i| { let keypair = Keypair::generate_ed25519(); let keypair2 = keypair.clone(); @@ -744,14 +778,12 @@ mod tests { }); let behaviour = { - let protocol_id: &[u8] = b"/test/kad/1.0.0"; - let mut config = DiscoveryConfig::new(keypair.public()); - config.with_user_defined(user_defined.clone()) + config.with_user_defined(first_swarm_peer_id_and_addr.clone()) .allow_private_ipv4(true) .allow_non_globals_in_dht(true) .discovery_limit(50) - .add_protocol(ProtocolId::from(protocol_id)); + .add_protocol(protocol_id.clone()); config.finish() }; @@ -759,8 +791,8 @@ mod tests { let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); - if user_defined.is_empty() { - user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); + if i == 0 { + first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone())) } Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); @@ -769,7 +801,10 @@ mod tests { // Build a `Vec>` with the list of nodes remaining to be discovered. let mut to_discover = (0..swarms.len()).map(|n| { - (0..swarms.len()).filter(|p| *p != n) + (0..swarms.len()) + // Skip the first swarm as all other swarms already know it. + .skip(1) + .filter(|p| *p != n) .map(|p| Swarm::local_peer_id(&swarms[p].0).clone()) .collect::>() }).collect::>(); @@ -780,7 +815,7 @@ mod tests { match swarms[swarm_n].0.poll_next_unpin(cx) { Poll::Ready(Some(e)) => { match e { - DiscoveryOut::UnroutablePeer(other) => { + DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => { // Call `add_self_reported_address` to simulate identify happening. let addr = swarms.iter().find_map(|(s, a)| if s.local_peer_id == other { @@ -789,12 +824,16 @@ mod tests { None }) .unwrap(); - swarms[swarm_n].0.add_self_reported_address(&other, addr); - }, - DiscoveryOut::Discovered(other) => { + swarms[swarm_n].0.add_self_reported_address( + &other, + &[String::from_utf8(protocol_name_from_protocol_id(&protocol_id)).unwrap()], + addr, + ); + to_discover[swarm_n].remove(&other); - } - _ => {} + }, + DiscoveryOut::RandomKademliaStarted(_) => {}, + e => {panic!("Unexpected event: {:?}", e)}, } continue 'polling } @@ -813,4 +852,103 @@ mod tests { futures::executor::block_on(fut); } + + #[test] + fn discovery_ignores_peers_with_unknown_protocols() { + let supported_protocol_id = ProtocolId::from(b"a".as_ref()); + let unsupported_protocol_id = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(supported_protocol_id.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with unsupported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&unsupported_protocol_id)).unwrap()], + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert!( + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expect peer with unsupported protocol not to be added." + ); + } + + // Add remote peer with supported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&supported_protocol_id)).unwrap()], + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert_eq!( + 1, + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expect peer with supported protocol to be added." + ); + } + } + + #[test] + fn discovery_adds_peer_to_kademlia_of_same_protocol_only() { + let protocol_a = ProtocolId::from(b"a".as_ref()); + let protocol_b = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(protocol_a.clone()) + .add_protocol(protocol_b.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with `protocol_a` only. + discovery.add_self_reported_address( + &remote_peer_id, + &[String::from_utf8(protocol_name_from_protocol_id(&protocol_a)).unwrap()], + remote_addr.clone(), + ); + + assert_eq!( + 1, + discovery.kademlias.get_mut(&protocol_a) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expected remote peer to be added to `protocol_a` Kademlia instance.", + + ); + + assert!( + discovery.kademlias.get_mut(&protocol_b) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expected remote peer not to be added to `protocol_a` Kademlia instance.", + ); + } }