From 92c8cc451f4eb2219bb012828912264588d8a059 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 May 2023 10:02:19 +0100 Subject: [PATCH] feat(kad): implement automatic client mode Currently, the kademlia behaviour can only learn that the remote node supports kademlia on a particular connection if we successfully negotiate a stream to them. Using the newly introduced abstractions from #3651, we don't have to attempt to establish a stream to the remote to learn whether they support kademlia on a connection but we can directly learn it from the `ConnectionEvent::RemoteProtocolsChange` event. This happens directly once a connection is established which should overall benefit the DHT. Clients do not advertise the kademlia protocol and thus we will immediately learn that a given connection is not suitable for kademlia requests. We may receive inbound messages from it but this does not affect the routing table. Resolves: #2032. Pull-Request: #3877. --- Cargo.lock | 3 + protocols/kad/CHANGELOG.md | 6 + protocols/kad/Cargo.toml | 4 + protocols/kad/src/behaviour.rs | 151 ++++++++++++++++++---- protocols/kad/src/behaviour/test.rs | 1 + protocols/kad/src/handler.rs | 150 ++++++++++++++------- protocols/kad/tests/client_mode.rs | 133 +++++++++++++++++++ swarm/src/behaviour/external_addresses.rs | 145 +++++++++++++++++---- 8 files changed, 490 insertions(+), 103 deletions(-) create mode 100644 protocols/kad/tests/client_mode.rs diff --git a/Cargo.lock b/Cargo.lock index ab88f17c2e8..29b30f03cf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2638,6 +2638,7 @@ name = "libp2p-kad" version = "0.44.0" dependencies = [ "arrayvec", + "async-std", "asynchronous-codec", "bytes", "either", @@ -2647,9 +2648,11 @@ dependencies = [ "futures-timer", "instant", "libp2p-core", + "libp2p-identify", "libp2p-identity", "libp2p-noise", "libp2p-swarm", + "libp2p-swarm-test", "libp2p-yamux", "log", "quick-protobuf", diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 
d7a35b1b8fb..061420b676b 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -6,7 +6,13 @@ - Remove deprecated public modules `handler`, `protocol` and `kbucket`. See [PR 3896]. +- Automatically configure client/server mode based on external addresses. + If we have or learn about an external address of our node, we operate in server-mode and thus allow inbound requests. + By default, a node is in client-mode and only allows outbound requests. + See [PR 3877]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3877]: https://github.com/libp2p/rust-libp2p/pull/3877 [PR 3896]: https://github.com/libp2p/rust-libp2p/pull/3896 ## 0.43.3 diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 5fdfbd9bc77..d98f21561e0 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -34,9 +34,13 @@ serde = { version = "1.0", optional = true, features = ["derive"] } thiserror = "1" [dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } env_logger = "0.10.0" futures-timer = "3.0" +libp2p-identify = { path = "../identify" } libp2p-noise = { workspace = true } +libp2p-swarm = { path = "../../swarm", features = ["macros"] } +libp2p-swarm-test = { path = "../../swarm-test" } libp2p-yamux = { workspace = true } quickcheck = { workspace = true } diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index a690b8153b6..5d4b84c65f6 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -23,10 +23,7 @@ mod test; use crate::addresses::Addresses; -use crate::handler::{ - KademliaHandler, KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, - KademliaRequestId, -}; +use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; use crate::jobs::*; use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus}; use crate::protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig}; @@ -52,7 +49,7 @@ 
use libp2p_swarm::{ }; use log::{debug, info, warn}; use smallvec::SmallVec; -use std::collections::{BTreeMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt; use std::num::NonZeroUsize; use std::task::{Context, Poll}; @@ -109,11 +106,15 @@ pub struct Kademlia { external_addresses: ExternalAddresses, + connections: HashMap, + /// See [`KademliaConfig::caching`]. caching: KademliaCaching, local_peer_id: PeerId, + mode: Mode, + /// The record storage. store: TStore, } @@ -453,6 +454,8 @@ where connection_idle_timeout: config.connection_idle_timeout, external_addresses: Default::default(), local_peer_id: id, + connections: Default::default(), + mode: Mode::Client, } } @@ -1937,9 +1940,12 @@ where ConnectionClosed { peer_id, remaining_established, + connection_id, .. }: ConnectionClosed<::ConnectionHandler>, ) { + self.connections.remove(&connection_id); + if remaining_established == 0 { for query in self.queries.iter_mut() { query.on_failure(&peer_id); @@ -1964,43 +1970,45 @@ where fn handle_established_inbound_connection( &mut self, - _connection_id: ConnectionId, + connection_id: ConnectionId, peer: PeerId, local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { + let connected_point = ConnectedPoint::Listener { + local_addr: local_addr.clone(), + send_back_addr: remote_addr.clone(), + }; + self.connections.insert(connection_id, peer); + Ok(KademliaHandler::new( - KademliaHandlerConfig { - protocol_config: self.protocol_config.clone(), - allow_listening: true, - idle_timeout: self.connection_idle_timeout, - }, - ConnectedPoint::Listener { - local_addr: local_addr.clone(), - send_back_addr: remote_addr.clone(), - }, + self.protocol_config.clone(), + self.connection_idle_timeout, + connected_point, peer, + self.mode, )) } fn handle_established_outbound_connection( &mut self, - _connection_id: ConnectionId, + connection_id: ConnectionId, peer: PeerId, addr: &Multiaddr, role_override: Endpoint, ) 
-> Result, ConnectionDenied> { + let connected_point = ConnectedPoint::Dialer { + address: addr.clone(), + role_override, + }; + self.connections.insert(connection_id, peer); + Ok(KademliaHandler::new( - KademliaHandlerConfig { - protocol_config: self.protocol_config.clone(), - allow_listening: true, - idle_timeout: self.connection_idle_timeout, - }, - ConnectedPoint::Dialer { - address: addr.clone(), - role_override, - }, + self.protocol_config.clone(), + self.connection_idle_timeout, + connected_point, peer, + self.mode, )) } @@ -2055,9 +2063,18 @@ where ConnectedPoint::Dialer { address, .. } => Some(address), ConnectedPoint::Listener { .. } => None, }; + self.connection_updated(source, address, NodeStatus::Connected); } + KademliaHandlerEvent::ProtocolNotSupported { endpoint } => { + let address = match endpoint { + ConnectedPoint::Dialer { address, .. } => Some(address), + ConnectedPoint::Listener { .. } => None, + }; + self.connection_updated(source, address, NodeStatus::Disconnected); + } + KademliaHandlerEvent::FindNodeReq { key, request_id } => { let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); @@ -2419,7 +2436,63 @@ where fn on_swarm_event(&mut self, event: FromSwarm) { self.listen_addresses.on_swarm_event(&event); - self.external_addresses.on_swarm_event(&event); + let external_addresses_changed = self.external_addresses.on_swarm_event(&event); + + self.mode = match (self.external_addresses.as_slice(), self.mode) { + ([], Mode::Server) => { + log::debug!("Switching to client-mode because we no longer have any confirmed external addresses"); + + Mode::Client + } + ([], Mode::Client) => { + // Previously client-mode, now also client-mode because no external addresses. 
+ + Mode::Client + } + (confirmed_external_addresses, Mode::Client) => { + if log::log_enabled!(log::Level::Debug) { + let confirmed_external_addresses = + to_comma_separated_list(confirmed_external_addresses); + + log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable"); + } + + Mode::Server + } + (confirmed_external_addresses, Mode::Server) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + + // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam. + + Mode::Server + } + }; + + if external_addresses_changed && !self.connections.is_empty() { + let num_connections = self.connections.len(); + + log::debug!( + "External addresses changed, re-configuring {} established connection{}", + num_connections, + if num_connections > 1 { "s" } else { "" } + ); + + self.queued_events + .extend( + self.connections + .iter() + .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(*conn_id), + event: KademliaHandlerIn::ReconfigureMode { + new_mode: self.mode, + }, + }), + ); + } match event { FromSwarm::ConnectionEstablished(connection_established) => { @@ -3187,3 +3260,29 @@ pub enum RoutingUpdate { /// peer ID). 
Failed, } + +#[derive(PartialEq, Copy, Clone, Debug)] +pub enum Mode { + Client, + Server, +} + +impl fmt::Display for Mode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Mode::Client => write!(f, "client"), + Mode::Server => write!(f, "server"), + } + } +} + +fn to_comma_separated_list(confirmed_external_addresses: &[T]) -> String +where + T: ToString, +{ + confirmed_external_addresses + .iter() + .map(|addr| addr.to_string()) + .collect::>() + .join(", ") +} diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 43145f0c79e..ce7712b9e8e 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -71,6 +71,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let address: Multiaddr = Protocol::Memory(random::()).into(); swarm.listen_on(address.clone()).unwrap(); + swarm.add_external_address(address.clone()); (address, swarm) } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index a874b466286..948f14f30eb 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::behaviour::Mode; use crate::protocol::{ KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, KademliaProtocolConfig, @@ -35,7 +36,7 @@ use libp2p_swarm::handler::{ }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamUpgradeError, - SubstreamProtocol, + SubstreamProtocol, SupportedProtocols, }; use log::trace; use std::collections::VecDeque; @@ -54,8 +55,14 @@ const MAX_NUM_SUBSTREAMS: usize = 32; /// /// It also handles requests made by the remote. pub struct KademliaHandler { - /// Configuration for the Kademlia protocol. - config: KademliaHandlerConfig, + /// Configuration of the wire protocol. 
+ protocol_config: KademliaProtocolConfig, + + /// In client mode, we don't accept inbound substreams. + mode: Mode, + + /// Time after which we close an idle connection. + idle_timeout: Duration, /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, @@ -85,35 +92,27 @@ pub struct KademliaHandler { /// The current state of protocol confirmation. protocol_status: ProtocolStatus, + + remote_supported_protocols: SupportedProtocols, } /// The states of protocol confirmation that a connection /// handler transitions through. +#[derive(Copy, Clone)] enum ProtocolStatus { /// It is as yet unknown whether the remote supports the /// configured protocol name. - Unconfirmed, + Unknown, /// The configured protocol name has been confirmed by the remote /// but has not yet been reported to the `Kademlia` behaviour. Confirmed, + /// The configured protocol name(s) are not or no longer supported by the remote. + NotSupported, /// The configured protocol has been confirmed by the remote /// and the confirmation reported to the `Kademlia` behaviour. Reported, } -/// Configuration of a [`KademliaHandler`]. -#[derive(Debug, Clone)] -pub struct KademliaHandlerConfig { - /// Configuration of the wire protocol. - pub protocol_config: KademliaProtocolConfig, - - /// If false, we deny incoming requests. - pub allow_listening: bool, - - /// Time after which we close an idle connection. - pub idle_timeout: Duration, -} - /// State of an active outbound substream. enum OutboundSubstreamState { /// Waiting to send a message to the remote. @@ -214,13 +213,11 @@ impl InboundSubstreamState { #[derive(Debug)] pub enum KademliaHandlerEvent { /// The configured protocol name has been confirmed by the peer through - /// a successfully negotiated substream. 
- /// - /// This event is only emitted once by a handler upon the first - /// successfully negotiated inbound or outbound substream and - /// indicates that the connected peer participates in the Kademlia - /// overlay network identified by the configured protocol name. + /// a successfully negotiated substream or by learning the supported protocols of the remote. ProtocolConfirmed { endpoint: ConnectedPoint }, + /// The configured protocol name(s) are not or no longer supported by the peer on the provided + /// connection and it should be removed from the routing table. + ProtocolNotSupported { endpoint: ConnectedPoint }, /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. @@ -368,6 +365,9 @@ pub enum KademliaHandlerIn { /// for the query on the remote. Reset(KademliaRequestId), + /// Change the connection to the specified mode. + ReconfigureMode { new_mode: Mode }, + /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. FindNodeReq { @@ -468,16 +468,32 @@ pub struct KademliaRequestId { struct UniqueConnecId(u64); impl KademliaHandler { - /// Create a [`KademliaHandler`] using the given configuration. pub fn new( - config: KademliaHandlerConfig, + protocol_config: KademliaProtocolConfig, + idle_timeout: Duration, endpoint: ConnectedPoint, remote_peer_id: PeerId, + mode: Mode, ) -> Self { - let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); + match &endpoint { + ConnectedPoint::Dialer { .. } => { + log::debug!( + "Operating in {mode}-mode on new outbound connection to {remote_peer_id}" + ); + } + ConnectedPoint::Listener { .. 
} => { + log::debug!( + "Operating in {mode}-mode on new inbound connection to {remote_peer_id}" + ); + } + } + + let keep_alive = KeepAlive::Until(Instant::now() + idle_timeout); KademliaHandler { - config, + protocol_config, + mode, + idle_timeout, endpoint, remote_peer_id, next_connec_unique_id: UniqueConnecId(0), @@ -486,7 +502,8 @@ impl KademliaHandler { num_requested_outbound_streams: 0, pending_messages: Default::default(), keep_alive, - protocol_status: ProtocolStatus::Unconfirmed, + protocol_status: ProtocolStatus::Unknown, + remote_supported_protocols: Default::default(), } } @@ -506,7 +523,7 @@ impl KademliaHandler { self.num_requested_outbound_streams -= 1; - if let ProtocolStatus::Unconfirmed = self.protocol_status { + if let ProtocolStatus::Unknown = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want // the behaviour to add this peer to the routing table, if possible. @@ -528,7 +545,7 @@ impl KademliaHandler { future::Either::Right(p) => void::unreachable(p), }; - if let ProtocolStatus::Unconfirmed = self.protocol_status { + if let ProtocolStatus::Unknown = self.protocol_status { // Upon the first successfully negotiated substream, we know that the // remote is configured with the same protocol name and we want // the behaviour to add this peer to the routing table, if possible. 
@@ -559,7 +576,6 @@ impl KademliaHandler { } } - debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; self.inbound_substreams @@ -601,11 +617,9 @@ impl ConnectionHandler for KademliaHandler { type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - if self.config.allow_listening { - SubstreamProtocol::new(self.config.protocol_config.clone(), ()) - .map_upgrade(Either::Left) - } else { - SubstreamProtocol::new(Either::Right(upgrade::DeniedUpgrade), ()) + match self.mode { + Mode::Server => SubstreamProtocol::new(Either::Left(self.protocol_config.clone()), ()), + Mode::Client => SubstreamProtocol::new(Either::Right(upgrade::DeniedUpgrade), ()), } } @@ -680,6 +694,22 @@ impl ConnectionHandler for KademliaHandler { } => { self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value }); } + KademliaHandlerIn::ReconfigureMode { new_mode } => { + let peer = self.remote_peer_id; + + match &self.endpoint { + ConnectedPoint::Dialer { .. } => { + log::debug!( + "Now operating in {new_mode}-mode on outbound connection with {peer}" + ) + } + ConnectedPoint::Listener { local_addr, .. } => { + log::debug!("Now operating in {new_mode}-mode on inbound connection with {peer} assuming that one of our external addresses routes to {local_addr}") + } + } + + self.mode = new_mode; + } } } @@ -722,7 +752,7 @@ impl ConnectionHandler for KademliaHandler { { self.num_requested_outbound_streams += 1; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.config.protocol_config.clone(), ()), + protocol: SubstreamProtocol::new(self.protocol_config.clone(), ()), }); } @@ -731,7 +761,7 @@ impl ConnectionHandler for KademliaHandler { // No open streams. Preserve the existing idle timeout. (true, k @ KeepAlive::Until(_)) => k, // No open streams. Set idle timeout. 
- (true, _) => KeepAlive::Until(Instant::now() + self.config.idle_timeout), + (true, _) => KeepAlive::Until(Instant::now() + self.idle_timeout), // Keep alive for open streams. (false, _) => KeepAlive::Yes, }; @@ -760,8 +790,38 @@ impl ConnectionHandler for KademliaHandler { } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) - | ConnectionEvent::LocalProtocolsChange(_) - | ConnectionEvent::RemoteProtocolsChange(_) => {} + | ConnectionEvent::LocalProtocolsChange(_) => {} + ConnectionEvent::RemoteProtocolsChange(change) => { + let dirty = self.remote_supported_protocols.on_protocols_change(change); + + if dirty { + let remote_supports_our_kademlia_protocols = self + .remote_supported_protocols + .iter() + .any(|p| self.protocol_config.protocol_names().contains(p)); + + match (remote_supports_our_kademlia_protocols, self.protocol_status) { + (true, ProtocolStatus::Confirmed | ProtocolStatus::Reported) => {} + (true, _) => { + log::info!( + "Remote {} now supports our kademlia protocol", + self.remote_peer_id + ); + + self.protocol_status = ProtocolStatus::Confirmed; + } + (false, ProtocolStatus::Confirmed | ProtocolStatus::Reported) => { + log::info!( + "Remote {} no longer supports our kademlia protocol", + self.remote_peer_id + ); + + self.protocol_status = ProtocolStatus::NotSupported; + } + (false, _) => {} + } + } + } } } } @@ -781,16 +841,6 @@ impl KademliaHandler { } } -impl Default for KademliaHandlerConfig { - fn default() -> Self { - KademliaHandlerConfig { - protocol_config: Default::default(), - allow_listening: true, - idle_timeout: Duration::from_secs(10), - } - } -} - impl futures::Stream for OutboundSubstreamState { type Item = ConnectionHandlerEvent; diff --git a/protocols/kad/tests/client_mode.rs b/protocols/kad/tests/client_mode.rs new file mode 100644 index 00000000000..b187b6b9685 --- /dev/null +++ b/protocols/kad/tests/client_mode.rs @@ -0,0 +1,133 @@ +use libp2p_identify as identify; +use libp2p_identity as identity; 
+use libp2p_kad::store::MemoryStore; +use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent}; +use libp2p_swarm::Swarm; +use libp2p_swarm_test::SwarmExt; + +#[async_std::test] +async fn server_gets_added_to_routing_table_by_client() { + let _ = env_logger::try_init(); + + let mut client = Swarm::new_ephemeral(MyBehaviour::new); + let mut server = Swarm::new_ephemeral(MyBehaviour::new); + + server.listen().await; + client.connect(&mut server).await; + + let server_peer_id = *server.local_peer_id(); + + match libp2p_swarm_test::drive(&mut client, &mut server).await { + ( + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })], + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_)], + ) => { + assert_eq!(peer, server_peer_id) + } + other => panic!("Unexpected events: {other:?}"), + } +} + +#[async_std::test] +async fn two_servers_add_each_other_to_routing_table() { + let _ = env_logger::try_init(); + + let mut server1 = Swarm::new_ephemeral(MyBehaviour::new); + let mut server2 = Swarm::new_ephemeral(MyBehaviour::new); + + server2.listen().await; + server1.connect(&mut server2).await; + + let server1_peer_id = *server1.local_peer_id(); + let server2_peer_id = *server2.local_peer_id(); + + use KademliaEvent::*; + use MyBehaviourEvent::*; + + match libp2p_swarm_test::drive(&mut server1, &mut server2).await { + ( + [Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer1, .. })], + [Identify(_), Identify(_)], + ) => { + assert_eq!(peer1, server2_peer_id); + } + other => panic!("Unexpected events: {other:?}"), + } + + server1.listen().await; + server2.connect(&mut server1).await; + + match libp2p_swarm_test::drive(&mut server2, &mut server1).await { + ( + [Identify(_), Kad(RoutingUpdated { peer: peer2, .. 
}), Identify(_)], + [Identify(_), Identify(_)], + ) => { + assert_eq!(peer2, server1_peer_id); + } + other => panic!("Unexpected events: {other:?}"), + } +} + +#[async_std::test] +async fn adding_an_external_addresses_activates_server_mode_on_existing_connections() { + let _ = env_logger::try_init(); + + let mut client = Swarm::new_ephemeral(MyBehaviour::new); + let mut server = Swarm::new_ephemeral(MyBehaviour::new); + let server_peer_id = *server.local_peer_id(); + + let (memory_addr, _) = server.listen().await; + + // Remove memory address to simulate a server that doesn't know its external address. + server.remove_external_address(&memory_addr); + client.dial(memory_addr.clone()).unwrap(); + + use MyBehaviourEvent::*; + + // Do the usual identify send/receive dance. + match libp2p_swarm_test::drive(&mut client, &mut server).await { + ([Identify(_), Identify(_)], [Identify(_), Identify(_)]) => {} + other => panic!("Unexpected events: {other:?}"), + } + + use KademliaEvent::*; + + // Server learns its external address (this could be through AutoNAT or some other mechanism). + server.add_external_address(memory_addr); + + // The server reconfigured its connection to the client to be in server mode, pushes that information to client which as a result updates its routing table. + match libp2p_swarm_test::drive(&mut client, &mut server).await { + ( + [Identify(identify::Event::Received { .. }), Kad(RoutingUpdated { peer: peer1, .. })], + [Identify(identify::Event::Pushed { .. 
})], + ) => { + assert_eq!(peer1, server_peer_id); + } + other => panic!("Unexpected events: {other:?}"), + } +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct MyBehaviour { + identify: identify::Behaviour, + kad: Kademlia, +} + +impl MyBehaviour { + fn new(k: identity::Keypair) -> Self { + let local_peer_id = k.public().to_peer_id(); + + Self { + identify: identify::Behaviour::new(identify::Config::new( + "/test/1.0.0".to_owned(), + k.public(), + )), + kad: Kademlia::with_config( + local_peer_id, + MemoryStore::new(local_peer_id), + KademliaConfig::default(), + ), + } + } +} diff --git a/swarm/src/behaviour/external_addresses.rs b/swarm/src/behaviour/external_addresses.rs index 89e06e26608..a3d6e3c0814 100644 --- a/swarm/src/behaviour/external_addresses.rs +++ b/swarm/src/behaviour/external_addresses.rs @@ -1,6 +1,5 @@ use crate::behaviour::{ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm}; use libp2p_core::Multiaddr; -use std::collections::HashSet; /// The maximum number of local external addresses. When reached any /// further externally reported addresses are ignored. The behaviour always @@ -8,19 +7,9 @@ use std::collections::HashSet; const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; /// Utility struct for tracking the external addresses of a [`Swarm`](crate::Swarm). -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ExternalAddresses { - addresses: HashSet, - limit: usize, -} - -impl Default for ExternalAddresses { - fn default() -> Self { - Self { - addresses: Default::default(), - limit: MAX_LOCAL_EXTERNAL_ADDRS, - } - } + addresses: Vec, } impl ExternalAddresses { @@ -29,24 +18,64 @@ impl ExternalAddresses { self.addresses.iter() } + pub fn as_slice(&self) -> &[Multiaddr] { + self.addresses.as_slice() + } + /// Feed a [`FromSwarm`] event to this struct. /// /// Returns whether the event changed our set of external addresses. 
pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { match event { FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => { - if self.addresses.len() < self.limit { - return self.addresses.insert((*addr).clone()); + if let Some(pos) = self + .addresses + .iter() + .position(|candidate| candidate == *addr) + { + // Refresh the existing confirmed address. + self.addresses.remove(pos); + self.push_front(addr); + + log::debug!("Refreshed external address {addr}"); + + return false; // No changes to our external addresses. + } + + self.push_front(addr); + + if self.addresses.len() > MAX_LOCAL_EXTERNAL_ADDRS { + let expired = self.addresses.pop().expect("list to be not empty"); + + log::debug!("Removing previously confirmed external address {expired} because we reached the limit of {MAX_LOCAL_EXTERNAL_ADDRS} addresses"); } + + return true; } - FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr, .. }) => { - return self.addresses.remove(addr) + FromSwarm::ExternalAddrExpired(ExternalAddrExpired { + addr: expired_addr, .. + }) => { + let pos = match self + .addresses + .iter() + .position(|candidate| candidate == *expired_addr) + { + None => return false, + Some(p) => p, + }; + + self.addresses.remove(pos); + return true; } _ => {} } false } + + fn push_front(&mut self, addr: &Multiaddr) { + self.addresses.insert(0, addr.clone()); // We have at most `MAX_LOCAL_EXTERNAL_ADDRS` so this isn't very expensive. 
+ } } #[cfg(test)] @@ -55,38 +84,100 @@ mod tests { use crate::dummy; use libp2p_core::multiaddr::Protocol; use once_cell::sync::Lazy; + use rand::Rng; #[test] fn new_external_addr_returns_correct_changed_value() { let mut addresses = ExternalAddresses::default(); - let changed = addresses.on_swarm_event(&new_external_addr()); + let changed = addresses.on_swarm_event(&new_external_addr1()); assert!(changed); - let changed = addresses.on_swarm_event(&new_external_addr()); + let changed = addresses.on_swarm_event(&new_external_addr1()); assert!(!changed) } #[test] fn expired_external_addr_returns_correct_changed_value() { let mut addresses = ExternalAddresses::default(); - addresses.on_swarm_event(&new_external_addr()); + addresses.on_swarm_event(&new_external_addr1()); - let changed = addresses.on_swarm_event(&expired_external_addr()); + let changed = addresses.on_swarm_event(&expired_external_addr1()); assert!(changed); - let changed = addresses.on_swarm_event(&expired_external_addr()); + let changed = addresses.on_swarm_event(&expired_external_addr1()); assert!(!changed) } - fn new_external_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { - FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr: &MEMORY_ADDR }) + #[test] + fn more_recent_external_addresses_are_prioritized() { + let mut addresses = ExternalAddresses::default(); + + addresses.on_swarm_event(&new_external_addr1()); + addresses.on_swarm_event(&new_external_addr2()); + + assert_eq!( + addresses.as_slice(), + &[(*MEMORY_ADDR_2000).clone(), (*MEMORY_ADDR_1000).clone()] + ); + } + + #[test] + fn when_pushing_more_than_max_addresses_oldest_is_evicted() { + let mut addresses = ExternalAddresses::default(); + + for _ in 0..MAX_LOCAL_EXTERNAL_ADDRS { + let random_address = + Multiaddr::empty().with(Protocol::Memory(rand::thread_rng().gen_range(0..1000))); + addresses.on_swarm_event( + &FromSwarm::<'_, dummy::ConnectionHandler>::ExternalAddrConfirmed( + ExternalAddrConfirmed { + addr: 
&random_address, + }, + ), + ); + } + + addresses.on_swarm_event(&new_external_addr2()); + + assert_eq!(addresses.as_slice().len(), 20); + assert_eq!(addresses.as_slice()[0], (*MEMORY_ADDR_2000).clone()); + } + + #[test] + fn reporting_existing_external_address_moves_it_to_the_front() { + let mut addresses = ExternalAddresses::default(); + + addresses.on_swarm_event(&new_external_addr1()); + addresses.on_swarm_event(&new_external_addr2()); + addresses.on_swarm_event(&new_external_addr1()); + + assert_eq!( + addresses.as_slice(), + &[(*MEMORY_ADDR_1000).clone(), (*MEMORY_ADDR_2000).clone()] + ); + } + + fn new_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { + addr: &MEMORY_ADDR_1000, + }) + } + + fn new_external_addr2() -> FromSwarm<'static, dummy::ConnectionHandler> { + FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { + addr: &MEMORY_ADDR_2000, + }) } - fn expired_external_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { - FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr: &MEMORY_ADDR }) + fn expired_external_addr1() -> FromSwarm<'static, dummy::ConnectionHandler> { + FromSwarm::ExternalAddrExpired(ExternalAddrExpired { + addr: &MEMORY_ADDR_1000, + }) } - static MEMORY_ADDR: Lazy = + static MEMORY_ADDR_1000: Lazy = Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(1000))); + static MEMORY_ADDR_2000: Lazy = + Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(2000))); }