Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(identify): only report observed address once per connection #4721

Merged
merged 21 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
222415a
Add `SwarmEvent` variants
thomaseizinger Oct 25, 2023
339b602
Emit events first
thomaseizinger Oct 25, 2023
cd92081
Push all pending events in queue
thomaseizinger Oct 25, 2023
04de723
Emit events for candidates and external addresses
thomaseizinger Oct 25, 2023
18cfa8c
Add failing test
thomaseizinger Oct 25, 2023
6c599e7
Only report observed address once
thomaseizinger Oct 25, 2023
8a9246a
Add changeloge entry to metrics
thomaseizinger Oct 25, 2023
65ad746
Support roaming
thomaseizinger Oct 25, 2023
540cc1b
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 25, 2023
4683a14
Update changelog
thomaseizinger Oct 25, 2023
700b560
Update protocols/identify/src/behaviour.rs
thomaseizinger Oct 25, 2023
8f05f7e
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 27, 2023
d896e9c
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 27, 2023
c64590b
Allow new event in dcutr test
thomaseizinger Oct 27, 2023
f63cad7
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 27, 2023
59b51b1
Apply suggestions from code review
thomaseizinger Oct 30, 2023
7be13b5
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 30, 2023
0b1f7a0
Fix test
thomaseizinger Oct 30, 2023
733dfb8
Update docs
thomaseizinger Oct 30, 2023
436d9ff
Merge branch 'master' into fix/identify-no-repeated-reporting
thomaseizinger Oct 31, 2023
c951045
Merge branch 'master' into fix/identify-no-repeated-reporting
mergify[bot] Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.14.0 - unreleased

- Add metrics for `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}`.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

## 0.13.1

Expand Down
51 changes: 50 additions & 1 deletion misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};

use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use libp2p_swarm::{ConnectionId, SwarmEvent};
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand All @@ -41,6 +41,10 @@ pub(crate) struct Metrics {
new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>,

external_addr_candidates: Family<AddressLabels, Counter>,
external_addr_confirmed: Family<AddressLabels, Counter>,
external_addr_expired: Family<AddressLabels, Counter>,

listener_closed: Family<AddressLabels, Counter>,
listener_error: Counter,

Expand Down Expand Up @@ -82,6 +86,27 @@ impl Metrics {
expired_listen_addr.clone(),
);

let external_addr_candidates = Family::default();
sub_registry.register(
"external_addr_candidates",
"Number of new external address candidates",
external_addr_candidates.clone(),
);

let external_addr_confirmed = Family::default();
sub_registry.register(
"external_addr_confirmed",
"Number of confirmed external addresses",
external_addr_confirmed.clone(),
);

let external_addr_expired = Family::default();
sub_registry.register(
"external_addr_expired",
"Number of expired external addresses",
external_addr_expired.clone(),
);

let listener_closed = Family::default();
sub_registry.register(
"listener_closed",
Expand Down Expand Up @@ -146,6 +171,9 @@ impl Metrics {
connections_established,
new_listen_addr,
expired_listen_addr,
external_addr_candidates,
external_addr_confirmed,
external_addr_expired,
listener_closed,
listener_error,
dial_attempt,
Expand Down Expand Up @@ -296,6 +324,27 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc();
}
SwarmEvent::NewExternalAddrCandidate { address } => {
self.external_addr_candidates
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
SwarmEvent::ExternalAddrConfirmed { address } => {
self.external_addr_confirmed
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
SwarmEvent::ExternalAddrExpired { address } => {
self.external_addr_expired
.get_or_create(&AddressLabels {
protocols: protocol_stack::as_string(address),
})
.inc();
}
}
}
}
Expand Down
10 changes: 3 additions & 7 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,8 @@ async fn connect() {

src.dial_and_wait(dst_relayed_addr.clone()).await;

loop {
match src
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
while let Ok(event) = src.next_swarm_event().await.try_into_behaviour_event() {
match event {
ClientEvent::Dcutr(dcutr::Event::RemoteInitiatedDirectConnectionUpgrade {
remote_peer_id,
remote_relayed_addr,
Expand Down Expand Up @@ -215,6 +210,7 @@ async fn wait_for_reservation(
addr_observed = true;
}
SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {}
SwarmEvent::NewExternalAddrCandidate { .. } => {}
e => panic!("{e:?}"),
}
}
Expand Down
4 changes: 4 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
- Remove deprecated `initial_delay`.
Identify requests are always sent instantly after the connection has been established.
See [PR 4735](https://github.com/libp2p/rust-libp2p/pull/4735)
- Don't repeatedly report the same observed address as a `NewExternalAddrCandidate`.
Instead, only report each observed address once per connection.
This allows users to probabilistically deem an address as external if it gets reported as a candidate repeatedly.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).

## 0.43.1

Expand Down
33 changes: 30 additions & 3 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use libp2p_swarm::{
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;
use std::collections::hash_map::Entry;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand All @@ -48,6 +49,10 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,

/// The address a remote observed for us.
our_observed_addresses: HashMap<ConnectionId, Multiaddr>,

/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
Expand Down Expand Up @@ -148,6 +153,7 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
our_observed_addresses: Default::default(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand Down Expand Up @@ -253,7 +259,7 @@ impl NetworkBehaviour for Behaviour {
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
_: ConnectionId,
id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
Expand All @@ -269,8 +275,27 @@ impl NetworkBehaviour for Behaviour {
let observed = info.observed_addr.clone();
self.events
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
not_yet_observed.insert(observed.clone());
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
}
Entry::Occupied(already_observed) if already_observed.get() == &observed => {
// No-op, we already observed this address.
}
Entry::Occupied(mut already_observed) => {
log::info!(
"Our observed address on connection {id} changed from {} to {observed}",
already_observed.get()
);

*already_observed.get_mut() = observed.clone();
self.events
.push_back(ToSwarm::NewExternalAddrCandidate(observed));
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
}
handler::Event::Identification => {
self.events
Expand Down Expand Up @@ -356,6 +381,8 @@ impl NetworkBehaviour for Behaviour {
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id);
}

self.our_observed_addresses.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
Expand Down
71 changes: 71 additions & 0 deletions protocols/identify/tests/smoke.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use futures::StreamExt;
use libp2p_core::multiaddr::Protocol;
use libp2p_identify as identify;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use std::collections::HashSet;
use std::iter;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -79,6 +81,75 @@ async fn periodic_identify() {
other => panic!("Unexpected events: {other:?}"),
}
}
#[async_std::test]
async fn only_emits_address_candidate_once_per_connection() {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let _ = env_logger::try_init();

let mut swarm1 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string())
.with_interval(Duration::from_secs(1)),
)
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("c".to_string(), identity.public())
.with_agent_version("d".to_string()),
)
});

swarm2.listen().with_memory_addr_external().await;
swarm1.connect(&mut swarm2).await;

async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(5)
.collect::<Vec<_>>()
.await;

let infos = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert!(
infos.len() > 1,
"should exchange identify payload more than once"
);

let varying_observed_addresses = infos
.iter()
.map(|i| i.observed_addr.clone())
.collect::<HashSet<_>>();
assert_eq!(
varying_observed_addresses.len(),
1,
"Observed address should not vary on persistent connection"
);

let external_address_candidates = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::NewExternalAddrCandidate { address } => Some(address.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert_eq!(
external_address_candidates.len(),
1,
"To only have one external address candidate"
);
assert_eq!(
&external_address_candidates[0],
varying_observed_addresses.iter().next().unwrap()
);
}

#[async_std::test]
async fn identify_push() {
Expand Down
2 changes: 2 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
See [PR 4715](https://github.com/libp2p/rust-libp2p/pull/4715).
- Log `PeerId` of `Swarm` even when constructed with new `SwarmBuilder`.
See [PR 4671](https://github.com/libp2p/rust-libp2p/pull/4671).
- Add `SwarmEvent::{NewExternalAddrCandidate,ExternalAddrConfirmed,ExternalAddrExpired}` variants.
See [PR 4721](https://github.com/libp2p/rust-libp2p/pull/4721).
- Remove deprecated symbols.
See [PR 4737](https://github.com/libp2p/rust-libp2p/pull/4737).

Expand Down
9 changes: 7 additions & 2 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,20 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
event: TInEvent,
},

/// Reports a new candidate for an external address to the [`Swarm`](crate::Swarm).
/// Reports a **new** candidate for an external address to the [`Swarm`](crate::Swarm).
///
/// The emphasis on a **new** candidate is important.
/// Protocols MUST take care to only emit a candidate once per "source".
/// For example, the observed address of a TCP connection does not change throughout its lifetime.
/// Thus, only one candidate should be emitted per connection.
///
/// This makes the report frequency of an address a meaningful data-point for consumers of this event.
/// This address will be shared with all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrCandidate`].
///
/// This address could come from a variety of sources:
/// - A protocol such as identify obtained it from a remote.
/// - The user provided it based on configuration.
/// - We made an educated guess based on one of our listen addresses.
/// - We established a new relay connection.
NewExternalAddrCandidate(Multiaddr),

/// Indicates to the [`Swarm`](crate::Swarm) that the provided address is confirmed to be externally reachable.
Expand Down
Loading