diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 7b8c01ae2f7..acad2043fc8 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.14.0 - unreleased +- Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`. + See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). ## 0.13.1 diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index 8837457d36a..20d3ce2eff3 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex}; use crate::protocol_stack; use instant::Instant; -use libp2p_swarm::ConnectionId; +use libp2p_swarm::{ConnectionId, SwarmEvent}; use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -41,6 +41,10 @@ pub(crate) struct Metrics { new_listen_addr: Family, expired_listen_addr: Family, + external_addr_candidates: Family, + external_addr_confirmed: Family, + external_addr_expired: Family, + listener_closed: Family, listener_error: Counter, @@ -82,6 +86,27 @@ impl Metrics { expired_listen_addr.clone(), ); + let external_addr_candidates = Family::default(); + sub_registry.register( + "external_addr_candidates", + "Number of new external address candidates", + external_addr_candidates.clone(), + ); + + let external_addr_confirmed = Family::default(); + sub_registry.register( + "external_addr_confirmed", + "Number of confirmed external addresses", + external_addr_confirmed.clone(), + ); + + let external_addr_expired = Family::default(); + sub_registry.register( + "external_addr_expired", + "Number of expired external addresses", + external_addr_expired.clone(), + ); + let listener_closed = Family::default(); sub_registry.register( "listener_closed", @@ -146,6 +171,9 @@ impl Metrics { connections_established, new_listen_addr, expired_listen_addr, + external_addr_candidates, + external_addr_confirmed, + external_addr_expired, listener_closed, listener_error, dial_attempt, @@ -296,6 +324,27 @@ impl super::Recorder { self.dial_attempt.inc(); } + SwarmEvent::NewExternalAddrCandidate { address } => { + self.external_addr_candidates + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); + } + SwarmEvent::ExternalAddrConfirmed { address } => { + self.external_addr_confirmed + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); + } + SwarmEvent::ExternalAddrExpired { address } => { + self.external_addr_expired + .get_or_create(&AddressLabels { + protocols: protocol_stack::as_string(address), + }) + .inc(); + } } } } diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 93661f1cba5..f43144154a7 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -69,13 +69,8 @@ async fn connect() { src.dial_and_wait(dst_relayed_addr.clone()).await; - loop { - match src - .next_swarm_event() - .await - .try_into_behaviour_event() - .unwrap() - { + while let Ok(event) = src.next_swarm_event().await.try_into_behaviour_event() { + match event { ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id, remote_relayed_addr, @@ -215,6 +210,7 @@ async fn wait_for_reservation( addr_observed = true; } SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {} + SwarmEvent::NewExternalAddrCandidate { .. } => {} e => panic!("{e:?}"), } } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index e5d7dc98ece..960ed530682 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -5,6 +5,10 @@ - Remove deprecated `initial_delay`. Identify requests are always sent instantly after the connection has been established. See [PR 4735](https://github.com/libp2p/rust-libp2p/pull/4735) +- Don't repeatedly report the same observed address as a `NewExternalAddrCandidate`. + Instead, only report each observed address once per connection. + This allows users to probabilistically deem an address as external if it gets reported as a candidate repeatedly. + See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). ## 0.43.1 diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 631a68d77a2..4f017dd1a9e 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -30,6 +30,7 @@ use libp2p_swarm::{ }; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; use lru::LruCache; +use std::collections::hash_map::Entry; use std::num::NonZeroUsize; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -48,6 +49,10 @@ pub struct Behaviour { config: Config, /// For each peer we're connected to, the observed address to send back to it. connected: HashMap>, + + /// The address a remote observed for us. + our_observed_addresses: HashMap, + /// Pending events to be emitted when polled. events: VecDeque>, /// The addresses of all peers that we have discovered. @@ -148,6 +153,7 @@ impl Behaviour { Self { config, connected: HashMap::new(), + our_observed_addresses: Default::default(), events: VecDeque::new(), discovered_peers, listen_addresses: Default::default(), @@ -253,7 +259,7 @@ impl NetworkBehaviour for Behaviour { fn on_connection_handler_event( &mut self, peer_id: PeerId, - _: ConnectionId, + id: ConnectionId, event: THandlerOutEvent, ) { match event { @@ -269,8 +275,27 @@ impl NetworkBehaviour for Behaviour { let observed = info.observed_addr.clone(); self.events .push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info })); - self.events - .push_back(ToSwarm::NewExternalAddrCandidate(observed)); + + match self.our_observed_addresses.entry(id) { + Entry::Vacant(not_yet_observed) => { + not_yet_observed.insert(observed.clone()); + self.events + .push_back(ToSwarm::NewExternalAddrCandidate(observed)); + } + Entry::Occupied(already_observed) if already_observed.get() == &observed => { + // No-op, we already observed this address. + } + Entry::Occupied(mut already_observed) => { + log::info!( + "Our observed address on connection {id} changed from {} to {observed}", + already_observed.get() + ); + + *already_observed.get_mut() = observed.clone(); + self.events + .push_back(ToSwarm::NewExternalAddrCandidate(observed)); + } + } } handler::Event::Identification => { self.events @@ -356,6 +381,8 @@ impl NetworkBehaviour for Behaviour { } else if let Some(addrs) = self.connected.get_mut(&peer_id) { addrs.remove(&connection_id); } + + self.our_observed_addresses.remove(&connection_id); } FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) { diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 2dc4ca9d9fd..9a61ccccdd4 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -1,7 +1,9 @@ +use futures::StreamExt; use libp2p_core::multiaddr::Protocol; use libp2p_identify as identify; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; +use std::collections::HashSet; use std::iter; use std::time::{Duration, Instant}; @@ -79,6 +81,75 @@ async fn periodic_identify() { other => panic!("Unexpected events: {other:?}"), } } +#[async_std::test] +async fn only_emits_address_candidate_once_per_connection() { + let _ = env_logger::try_init(); + + let mut swarm1 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("a".to_string(), identity.public()) + .with_agent_version("b".to_string()) + .with_interval(Duration::from_secs(1)), + ) + }); + let mut swarm2 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("c".to_string(), identity.public()) + .with_agent_version("d".to_string()), + ) + }); + + swarm2.listen().with_memory_addr_external().await; + swarm1.connect(&mut swarm2).await; + + async_std::task::spawn(swarm2.loop_on_next()); + + let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx)) + .take(5) + .collect::>() + .await; + + let infos = swarm_events + .iter() + .filter_map(|e| match e { + SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()), + _ => None, + }) + .collect::>(); + + assert!( + infos.len() > 1, + "should exchange identify payload more than once" + ); + + let varying_observed_addresses = infos + .iter() + .map(|i| i.observed_addr.clone()) + .collect::>(); + assert_eq!( + varying_observed_addresses.len(), + 1, + "Observed address should not vary on persistent connection" + ); + + let external_address_candidates = swarm_events + .iter() + .filter_map(|e| match e { + SwarmEvent::NewExternalAddrCandidate { address } => Some(address.clone()), + _ => None, + }) + .collect::>(); + + assert_eq!( + external_address_candidates.len(), + 1, + "To only have one external address candidate" + ); + assert_eq!( + &external_address_candidates[0], + varying_observed_addresses.iter().next().unwrap() + ); +} #[async_std::test] async fn identify_push() { diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index df85fbdcd49..6e2e9bb1c0c 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -16,6 +16,8 @@ See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715). - Log `PeerId` of `Swarm` even when constructed with new `SwarmBuilder`. See [PR 4671](https://github.com/libp2p/rust-libp2p/pull/4671). +- Add `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}` variants. + See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721). - Remove deprecated symbols. See [PR 4737](https://github.com/libp2p/rust-libp2p/pull/4737). diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index c89796f8e25..27e62f71831 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -261,15 +261,20 @@ pub enum ToSwarm { event: TInEvent, }, - /// Reports a new candidate for an external address to the [`Swarm`](crate::Swarm). + /// Reports a **new** candidate for an external address to the [`Swarm`](crate::Swarm). /// + /// The emphasis on a **new** candidate is important. + /// Protocols MUST take care to only emit a candidate once per "source". + /// For example, the observed address of a TCP connection does not change throughout its lifetime. + /// Thus, only one candidate should be emitted per connection. + /// + /// This makes the report frequency of an address a meaningful data-point for consumers of this event. /// This address will be shared with all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrCandidate`]. /// /// This address could come from a variety of sources: /// - A protocol such as identify obtained it from a remote. /// - The user provided it based on configuration. /// - We made an educated guess based on one of our listen addresses. - /// - We established a new relay connection. NewExternalAddrCandidate(Multiaddr), /// Indicates to the [`Swarm`](crate::Swarm) that the provided address is confirmed to be externally reachable. diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 908936069e0..228c8281a70 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -142,7 +142,7 @@ use libp2p_core::{ }; use libp2p_identity::PeerId; use smallvec::SmallVec; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize}; use std::time::Duration; use std::{ @@ -294,6 +294,12 @@ pub enum SwarmEvent { /// Identifier of the connection. connection_id: ConnectionId, }, + /// We have discovered a new candidate for an external address for us. + NewExternalAddrCandidate { address: Multiaddr }, + /// An external address of the local node was confirmed. + ExternalAddrConfirmed { address: Multiaddr }, + /// An external address of the local node expired, i.e. is no-longer confirmed. + ExternalAddrExpired { address: Multiaddr }, } impl SwarmEvent { @@ -339,7 +345,9 @@ where /// Pending event to be delivered to connection handlers /// (or dropped if the peer disconnected) before the `behaviour` /// can be polled again. - pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, + pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, + + pending_swarm_events: VecDeque>>, } impl Unpin for Swarm where TBehaviour: NetworkBehaviour {} @@ -366,7 +374,8 @@ where supported_protocols: Default::default(), confirmed_external_addr: Default::default(), listened_addrs: HashMap::new(), - pending_event: None, + pending_handler_event: None, + pending_swarm_events: VecDeque::default(), } } @@ -663,10 +672,7 @@ where &mut self.behaviour } - fn handle_pool_event( - &mut self, - event: PoolEvent>, - ) -> Option>> { + fn handle_pool_event(&mut self, event: PoolEvent>) { match event { PoolEvent::ConnectionEstablished { peer_id, @@ -698,11 +704,14 @@ where }, )); - return Some(SwarmEvent::OutgoingConnectionError { - peer_id: Some(peer_id), - connection_id: id, - error: dial_error, - }); + self.pending_swarm_events.push_back( + SwarmEvent::OutgoingConnectionError { + peer_id: Some(peer_id), + connection_id: id, + error: dial_error, + }, + ); + return; } } } @@ -728,12 +737,15 @@ where }, )); - return Some(SwarmEvent::IncomingConnectionError { - connection_id: id, - send_back_addr, - local_addr, - error: listen_error, - }); + self.pending_swarm_events.push_back( + SwarmEvent::IncomingConnectionError { + connection_id: id, + send_back_addr, + local_addr, + error: listen_error, + }, + ); + return; } } } @@ -783,14 +795,15 @@ where }, )); self.supported_protocols = supported_protocols; - return Some(SwarmEvent::ConnectionEstablished { - peer_id, - connection_id: id, - num_established, - endpoint, - concurrent_dial_errors, - established_in, - }); + self.pending_swarm_events + .push_back(SwarmEvent::ConnectionEstablished { + peer_id, + connection_id: id, + num_established, + endpoint, + concurrent_dial_errors, + established_in, + }); } PoolEvent::PendingOutboundConnectionError { id: connection_id, @@ -812,11 +825,12 @@ where log::debug!("Connection attempt to unknown peer failed with {:?}", error); } - return Some(SwarmEvent::OutgoingConnectionError { - peer_id: peer, - connection_id, - error, - }); + self.pending_swarm_events + .push_back(SwarmEvent::OutgoingConnectionError { + peer_id: peer, + connection_id, + error, + }); } PoolEvent::PendingInboundConnectionError { id, @@ -834,12 +848,13 @@ where error: &error, connection_id: id, })); - return Some(SwarmEvent::IncomingConnectionError { - connection_id: id, - local_addr, - send_back_addr, - error, - }); + self.pending_swarm_events + .push_back(SwarmEvent::IncomingConnectionError { + connection_id: id, + local_addr, + send_back_addr, + error, + }); } PoolEvent::ConnectionClosed { id, @@ -874,13 +889,14 @@ where endpoint: &endpoint, remaining_established: num_established as usize, })); - return Some(SwarmEvent::ConnectionClosed { - peer_id, - connection_id: id, - endpoint, - cause: error, - num_established, - }); + self.pending_swarm_events + .push_back(SwarmEvent::ConnectionClosed { + peer_id, + connection_id: id, + endpoint, + cause: error, + num_established, + }); } PoolEvent::ConnectionEvent { peer_id, id, event } => { self.behaviour @@ -901,8 +917,6 @@ where })); } } - - None } fn handle_transport_event( @@ -911,7 +925,7 @@ where as Transport>::ListenerUpgrade, io::Error, >, - ) -> Option>> { + ) { match event { TransportEvent::Incoming { listener_id: _, @@ -938,12 +952,14 @@ where connection_id, })); - return Some(SwarmEvent::IncomingConnectionError { - connection_id, - local_addr, - send_back_addr, - error: listen_error, - }); + self.pending_swarm_events + .push_back(SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error: listen_error, + }); + return; } } @@ -956,11 +972,12 @@ where connection_id, ); - Some(SwarmEvent::IncomingConnection { - connection_id, - local_addr, - send_back_addr, - }) + self.pending_swarm_events + .push_back(SwarmEvent::IncomingConnection { + connection_id, + local_addr, + send_back_addr, + }) } TransportEvent::NewAddress { listener_id, @@ -976,10 +993,11 @@ where listener_id, addr: &listen_addr, })); - Some(SwarmEvent::NewListenAddr { - listener_id, - address: listen_addr, - }) + self.pending_swarm_events + .push_back(SwarmEvent::NewListenAddr { + listener_id, + address: listen_addr, + }) } TransportEvent::AddressExpired { listener_id, @@ -998,10 +1016,11 @@ where listener_id, addr: &listen_addr, })); - Some(SwarmEvent::ExpiredListenAddr { - listener_id, - address: listen_addr, - }) + self.pending_swarm_events + .push_back(SwarmEvent::ExpiredListenAddr { + listener_id, + address: listen_addr, + }) } TransportEvent::ListenerClosed { listener_id, @@ -1019,11 +1038,12 @@ where listener_id, reason: reason.as_ref().copied(), })); - Some(SwarmEvent::ListenerClosed { - listener_id, - addresses: addrs.to_vec(), - reason, - }) + self.pending_swarm_events + .push_back(SwarmEvent::ListenerClosed { + listener_id, + addresses: addrs.to_vec(), + reason, + }) } TransportEvent::ListenerError { listener_id, error } => { self.behaviour @@ -1031,7 +1051,8 @@ where listener_id, err: &error, })); - Some(SwarmEvent::ListenerError { listener_id, error }) + self.pending_swarm_events + .push_back(SwarmEvent::ListenerError { listener_id, error }) } } } @@ -1039,14 +1060,17 @@ where fn handle_behaviour_event( &mut self, event: ToSwarm>, - ) -> Option>> { + ) { match event { - ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)), + ToSwarm::GenerateEvent(event) => { + self.pending_swarm_events + .push_back(SwarmEvent::Behaviour(event)); + } ToSwarm::Dial { opts } => { let peer_id = opts.get_peer_id(); let connection_id = opts.connection_id(); if let Ok(()) = self.dial(opts) { - return Some(SwarmEvent::Dialing { + self.pending_swarm_events.push_back(SwarmEvent::Dialing { peer_id, connection_id, }); @@ -1064,7 +1088,7 @@ where handler, event, } => { - assert!(self.pending_event.is_none()); + assert!(self.pending_handler_event.is_none()); let handler = match handler { NotifyHandler::One(connection) => PendingNotifyHandler::One(connection), NotifyHandler::Any => { @@ -1076,7 +1100,7 @@ where } }; - self.pending_event = Some((peer_id, handler, event)); + self.pending_handler_event = Some((peer_id, handler, event)); } ToSwarm::NewExternalAddrCandidate(addr) => { // Apply address translation to the candidate address. @@ -1101,20 +1125,28 @@ where .on_swarm_event(FromSwarm::NewExternalAddrCandidate( NewExternalAddrCandidate { addr: &addr }, )); + self.pending_swarm_events + .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr }); } else { for addr in translated_addresses { self.behaviour .on_swarm_event(FromSwarm::NewExternalAddrCandidate( NewExternalAddrCandidate { addr: &addr }, )); + self.pending_swarm_events + .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr }); } } } ToSwarm::ExternalAddrConfirmed(addr) => { - self.add_external_address(addr); + self.add_external_address(addr.clone()); + self.pending_swarm_events + .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr }); } ToSwarm::ExternalAddrExpired(addr) => { self.remove_external_address(&addr); + self.pending_swarm_events + .push_back(SwarmEvent::ExternalAddrExpired { address: addr }); } ToSwarm::CloseConnection { peer_id, @@ -1130,8 +1162,6 @@ where } }, } - - None } /// Internal function used by everything event-related. @@ -1155,7 +1185,11 @@ where // // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections. loop { - match this.pending_event.take() { + if let Some(swarm_event) = this.pending_swarm_events.pop_front() { + return Poll::Ready(swarm_event); + } + + match this.pending_handler_event.take() { // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous // iteration to the connection handler(s). Some((peer_id, handler, event)) => match handler { @@ -1164,7 +1198,7 @@ where Some(conn) => match notify_one(conn, event, cx) { None => continue, Some(event) => { - this.pending_event = Some((peer_id, handler, event)); + this.pending_handler_event = Some((peer_id, handler, event)); } }, None => continue, @@ -1175,7 +1209,7 @@ where None => continue, Some((event, ids)) => { let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); + this.pending_handler_event = Some((peer_id, handler, event)); } } } @@ -1184,9 +1218,7 @@ where None => match this.behaviour.poll(cx) { Poll::Pending => {} Poll::Ready(behaviour_event) => { - if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) { - return Poll::Ready(swarm_event); - } + this.handle_behaviour_event(behaviour_event); continue; } @@ -1197,10 +1229,7 @@ where match this.pool.poll(cx) { Poll::Pending => {} Poll::Ready(pool_event) => { - if let Some(swarm_event) = this.handle_pool_event(pool_event) { - return Poll::Ready(swarm_event); - } - + this.handle_pool_event(pool_event); continue; } }; @@ -1209,10 +1238,7 @@ where match Pin::new(&mut this.transport).poll(cx) { Poll::Pending => {} Poll::Ready(transport_event) => { - if let Some(swarm_event) = this.handle_transport_event(transport_event) { - return Poll::Ready(swarm_event); - } - + this.handle_transport_event(transport_event); continue; } }