From ea48b129ceb368568fabc64e74c2b079dfbf3a10 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 19 Oct 2023 12:58:51 +0200 Subject: [PATCH 1/6] refactor: properly abstract BestAddr --- iroh-net/src/magicsock/endpoint.rs | 299 ++++++------------- iroh-net/src/magicsock/endpoint/best_addr.rs | 202 +++++++++++++ 2 files changed, 295 insertions(+), 206 deletions(-) create mode 100644 iroh-net/src/magicsock/endpoint/best_addr.rs diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 9d15fafb47..414898d750 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -23,6 +23,9 @@ use crate::{ use super::{metrics::Metrics as MagicsockMetrics, ActorMessage, QuicMappedAddr, SendAddr}; +mod best_addr; +use best_addr::{BestAddr, BestAddrClearReason, BestAddrSource, BestAddrState}; + /// How long we wait for a pong reply before assuming it's never coming. const PING_TIMEOUT_DURATION: Duration = Duration::from_secs(5); @@ -86,11 +89,7 @@ pub(super) struct Endpoint { /// The fallback/bootstrap path, if non-zero (non-zero for well-behaved clients). derp_region: Option<(u16, EndpointState)>, /// Best non-DERP path. - best_addr: Option, - /// Time best address re-confirmed. - best_addr_at: Option, - /// Time when best_addr expires. - trust_best_addr_until: Option, + best_addr: BestAddr, /// [`EndpointState`] for this peer's direct addresses. direct_addr_state: HashMap, is_call_me_maybe_ep: HashMap, @@ -136,9 +135,7 @@ impl Endpoint { derp_region: options .derp_region .map(|region| (region, EndpointState::default())), - best_addr: None, - best_addr_at: None, - trust_best_addr_until: None, + best_addr: Default::default(), sent_ping: HashMap::new(), direct_addr_state: HashMap::new(), is_call_me_maybe_ep: HashMap::new(), @@ -153,8 +150,9 @@ impl Endpoint { /// Returns info about this endpoint pub fn info(&self) -> EndpointInfo { - let (conn_type, latency) = if self.is_best_addr_valid(Instant::now()) { - let addr_info = self.best_addr.as_ref().expect("checked"); + // TODO: I think we should return direct here for outdated. + let (conn_type, latency) = if self.best_addr.is_valid(Instant::now()) { + let addr_info = self.best_addr.addr_info().expect("checked"); (ConnectionType::Direct(addr_info.addr), addr_info.latency) } else if let Some((region_id, relay_state)) = self.derp_region.as_ref() { let latency = relay_state.recent_pong().map(|pong| pong.latency); @@ -203,36 +201,40 @@ impl Endpoint { debug!("in `DEV_DERP_ONLY` mode, giving the DERP address as the only viable address for this endpoint"); return (None, self.derp_region(), false); } - match self.best_addr { - Some(ref best_addr) => { - if !self.is_best_addr_valid(*now) { - // We had a best_addr but it expired so send both to it and DERP. - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set but outdated, use best_addr and derp"); - (Some(best_addr.addr), self.derp_region(), true) - } else { - trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set and valid, use best_addr only"); - // Address is current and can be used - (Some(best_addr.addr), None, false) - } + // Update our best addr from candidate addresses (only if it is empty and if we have recent + // pongs). + self.update_best_addr_from_candidates(); + match self.best_addr.state(*now) { + // we have a valid address: use it! + BestAddrState::Valid(best_addr) => { + trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set and valid, use best_addr only"); + (Some(best_addr.addr), None, false) } - None => { - let (addr, should_ping) = self.get_candidate_udp_addr(); - - // Provide backup derp region if no known latency or no addr. - let derp_region = if should_ping || addr.is_none() { - self.derp_region() - } else { - None - }; - - trace!(udp_addr = ?addr, derp_region = ?derp_region, ?should_ping, "best_addr is unset, use candidate addr and derp"); - (addr, derp_region, should_ping) + // we have an outdated address: use it, but use derp as well. + BestAddrState::Outdated(best_addr) => { + trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set but outdated, use best_addr and derp"); + (Some(best_addr.addr), self.derp_region(), true) + } + // we have no best address: use a random canidate if available, and derp as backup. + BestAddrState::Empty => { + let addr = self + .direct_addr_state + .keys() + .choose_stable(&mut rand::thread_rng()) + .map(|ipp| SocketAddr::from(*ipp)); + trace!(udp_addr = ?addr, "best_addr is unset, use candidate addr and derp"); + (addr, self.derp_region(), addr.is_some()) } } } - /// Determines a potential best addr for this endpoint. And if the endpoint needs a ping. - fn get_candidate_udp_addr(&mut self) -> (Option, bool) { + /// Update our best_addr (if empty) with the candidate udp addr with the lowest latency. + /// + /// Returns true if the address was updated. + fn update_best_addr_from_candidates(&mut self) -> bool { + if !self.best_addr.is_empty() { + return false; + } let mut lowest_latency = Duration::from_secs(60 * 60); let mut last_pong = None; for (ipp, state) in self.direct_addr_state.iter() { @@ -249,45 +251,17 @@ impl Endpoint { // If we found a candidate, set to best addr if let Some(pong) = last_pong { - if self.best_addr.is_none() { - // we now have a direct connection, adjust direct connection count - inc!(MagicsockMetrics, num_direct_conns_added); - if self.derp_region.is_some() { - // we no longer rely on the relay connection, decrease the relay connection - // count - inc!(MagicsockMetrics, num_relay_conns_removed); - } - } - - let addr = pong.from.as_socket_addr(); - let trust_best_addr_until = pong.pong_at + Duration::from_secs(60 * 60); - - info!( - %addr, - latency = ?lowest_latency, - trust_for = ?trust_best_addr_until.duration_since(Instant::now()), - "new best_addr (candidate address with most recent pong)" + self.best_addr.insert( + pong.from.as_socket_addr(), + Some(lowest_latency), + BestAddrSource::BestCandidate, + pong.pong_at, + self.derp_region.is_some(), ); - - self.best_addr = Some(AddrLatency { - addr, - latency: Some(lowest_latency), - }); - self.trust_best_addr_until.replace(trust_best_addr_until); - - // No need to ping, we already have a latency. - return (Some(pong.from.as_socket_addr()), false); + true + } else { + false } - - // Randomly select an address to use until we retrieve latency information. - let udp_addr = self - .direct_addr_state - .keys() - .choose_stable(&mut rand::thread_rng()) - .copied() - .map(SocketAddr::from); - - (udp_addr, udp_addr.is_some()) } /// Reports whether we should ping to all our direct addresses looking for a better path. @@ -297,15 +271,14 @@ impl Endpoint { debug!("full ping: no full ping done"); return true; } - if !self.is_best_addr_valid(*now) { + if !self.best_addr.is_valid(*now) { debug!("full ping: best addr expired"); return true; } if self .best_addr - .as_ref() - .and_then(|addr| addr.latency) + .latency() .map(|l| l > GOOD_ENOUGH_LATENCY) .unwrap_or(true) && *now - *self.last_full_ping.as_ref().unwrap() >= UPGRADE_INTERVAL @@ -313,9 +286,7 @@ impl Endpoint { debug!( "full ping: full ping interval expired and latency is only {}ms", self.best_addr - .as_ref() - .unwrap() - .latency + .latency() .map(|l| l.as_millis()) .unwrap_or_default() ); @@ -347,7 +318,7 @@ impl Endpoint { } } if let Some(udp_addr) = udp_addr { - if self.is_best_addr_valid(now) { + if self.best_addr.is_valid(now) { // Already have an active session, so just ping the address we're using. // Otherwise "tailscale ping" results to a node on the local network // can look like they're bouncing between, say 10.0.0.0/9 and the peer's @@ -391,6 +362,13 @@ impl Endpoint { if let Some(ep_state) = self.direct_addr_state.get_mut(&addr.into()) { ep_state.last_ping = None; } + + // If we fail to ping our current best addr, it is not that good anymore. + self.best_addr.clear_if_equals( + addr, + BestAddrClearReason::PongTimeout, + self.derp_region.is_some(), + ); } SendAddr::Derp(region) => { if let Some((home_derp, relay_state)) = self.derp_region.as_mut() { @@ -401,21 +379,6 @@ impl Endpoint { } } } - - // If we fail to ping our current best addr, it is not that good anymore. - if let Some(ref addr) = self.best_addr { - if sp.to == addr.addr { - // we had a direct connection that is no longer valid - inc!(MagicsockMetrics, num_direct_conns_removed); - if self.derp_region.is_some() { - // we can only connect through a relay connection - inc!(MagicsockMetrics, num_relay_conns_added); - } - debug!(addr = %sp.to, tx = %hex::encode(txid), "drop best_addr (no pong received in timeout)"); - self.best_addr = None; - self.trust_best_addr_until = None; - } - } } } @@ -565,7 +528,7 @@ impl Endpoint { } pub fn update_from_node_addr(&mut self, n: &AddrInfo) { - if self.best_addr.is_none() { + if self.best_addr.is_empty() { // we do not have a direct connection, so changing the derp information may // have an effect on our connection status if self.derp_region.is_none() && n.derp_region.is_some() { @@ -598,19 +561,9 @@ impl Endpoint { /// Clears all the endpoint's p2p state, reverting it to a DERP-only endpoint. #[instrument(skip_all, fields(peer = %self.public_key.fmt_short()))] fn reset(&mut self) { - if self.best_addr.is_some() { - // we no longer rely on a direct connection - inc!(MagicsockMetrics, num_relay_conns_removed); - if self.derp_region.is_some() { - // we are now relying on a relay connection - inc!(MagicsockMetrics, num_direct_conns_added); - } - } - warn!("drop best_addr (reset state)"); self.last_full_ping = None; - self.best_addr = None; - self.best_addr_at = None; - self.trust_best_addr_until = None; + self.best_addr + .clear(BestAddrClearReason::Reset, self.derp_region.is_some()); for es in self.direct_addr_state.values_mut() { es.last_ping = None; @@ -712,18 +665,11 @@ impl Endpoint { None => trace!(%peer, %ip_port, last_seen=%"never", "pruning address"), } - if let Some(addr_and_latency) = self.best_addr.as_ref() { - if addr_and_latency.addr == ip_port.into() { - warn!(addr = %addr_and_latency.addr, "drop best_addr (prune for inactivity)"); - self.best_addr = None; - // no longer relying on a direct connection, remove conn count - inc!(MagicsockMetrics, num_direct_conns_removed); - if self.derp_region.is_some() { - // we now rely on a relay connection, add a relay count - inc!(MagicsockMetrics, num_relay_conns_added); - } - } - } + self.best_addr.clear_if_equals( + ip_port.into(), + BestAddrClearReason::Inactive, + self.derp_region.is_some(), + ); } } @@ -732,7 +678,7 @@ impl Endpoint { #[instrument("disco", skip_all, fields(peer = %self.public_key.fmt_short()))] pub(super) fn note_connectivity_change(&mut self) { trace!("connectivity changed"); - self.trust_best_addr_until = None; + self.best_addr.clear_trust(); } /// Handles a Pong message (a reply to an earlier ping). @@ -842,33 +788,13 @@ impl Endpoint { // TODO(bradfitz): decide how latency vs. preference order affects decision if let SendAddr::Udp(to) = sp.to { debug_assert!(!is_derp, "missmatching derp & udp"); - let this_pong = AddrLatency { - addr: to, - latency: Some(latency), - }; - let is_better = self.best_addr.is_none() - || this_pong.is_better_than(self.best_addr.as_ref().unwrap()); - - if is_better { - if self.best_addr.is_none() { - // we now have direct connection! - inc!(MagicsockMetrics, num_direct_conns_added); - if self.derp_region.is_some() { - // no long relying on a relay connection, remove a relay conn - inc!(MagicsockMetrics, num_relay_conns_removed); - } - } - info!(addr = %sp.to, "new best_addr (from pong)"); - self.best_addr.replace(this_pong.clone()); - } - let best_addr = self.best_addr.as_mut().expect("just set"); - if best_addr.addr == this_pong.addr { - trace!(addr = %best_addr.addr, trust_for = ?TRUST_UDP_ADDR_DURATION, "best_addr: update trust time"); - best_addr.latency.replace(latency); - self.best_addr_at.replace(now); - self.trust_best_addr_until - .replace(now + TRUST_UDP_ADDR_DURATION); - } + self.best_addr.insert_if_better_or_reconfirm( + to, + Some(latency), + BestAddrSource::ReceivedPong, + now, + self.derp_region.is_some(), + ); } peer_map_insert @@ -920,22 +846,11 @@ impl Endpoint { // Delete any prior CallMeMaybe endpoints that weren't included in this message. self.is_call_me_maybe_ep.retain(|ep, want| { if !*want { - if Some(ep) - == self - .best_addr - .as_ref() - .map(|addr_and_latency| addr_and_latency.addr) - .as_ref() - { - warn!("drop best_addr (received call-me-maybe)"); - self.best_addr = None; - // no longer relying on the direct connection - inc!(MagicsockMetrics, num_direct_conns_removed); - if self.derp_region.is_some() { - // we are now relying on the relay connection, add a relay conn - inc!(MagicsockMetrics, num_relay_conns_added); - } - } + self.best_addr.clear_if_equals( + *ep, + BestAddrClearReason::PruneCallMeMaybe, + self.derp_region.is_some(), + ); false } else { true @@ -999,7 +914,7 @@ impl Endpoint { } // Send heartbeat ping to keep the current addr going as long as we need it. - let udp_addr = self.best_addr.as_ref().map(|a| a.addr); + let udp_addr = self.best_addr.addr(); if let Some(udp_addr) = udp_addr { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. @@ -1047,30 +962,6 @@ impl Endpoint { (udp_addr, derp_region, msgs) } - fn is_best_addr_valid(&self, now: Instant) -> bool { - match &self.best_addr { - None => { - trace!("best_addr invalid: not set"); - false - } - Some(addr) => match self.trust_best_addr_until { - Some(expiry) => { - if now < expiry { - trace!(addr = %addr.addr, remaining=?expiry.duration_since(now), "best_addr valid"); - true - } else { - trace!(addr = %addr.addr, since=?expiry.duration_since(now), "best_addr invalid: expired"); - false - } - } - None => { - trace!(addr = %addr.addr, "best_addr invalid: trust_best_addr_until not set"); - false - } - }, - } - } - /// Get the direct addresses for this endpoint. pub fn direct_addresses(&self) -> impl Iterator + '_ { self.direct_addr_state.keys().copied() @@ -1734,12 +1625,12 @@ mod tests { public_key: key.public(), last_full_ping: None, derp_region: new_relay_and_state(Some(0)), - best_addr: Some(AddrLatency { - addr: ip_port.into(), - latency: Some(latency), - }), - best_addr_at: Some(now), - trust_best_addr_until: now.checked_add(Duration::from_secs(100)), + best_addr: BestAddr::from_parts( + ip_port.into(), + Some(latency), + now, + now + Duration::from_secs(100), + ), direct_addr_state: endpoint_state, is_call_me_maybe_ep: HashMap::new(), pending_cli_pings: Vec::new(), @@ -1769,9 +1660,7 @@ mod tests { public_key: key.public(), last_full_ping: None, derp_region: Some((0, relay_state)), - best_addr: None, - best_addr_at: None, - trust_best_addr_until: now.checked_sub(Duration::from_secs(100)), + best_addr: BestAddr::default(), direct_addr_state: HashMap::default(), is_call_me_maybe_ep: HashMap::new(), pending_cli_pings: Vec::new(), @@ -1792,9 +1681,7 @@ mod tests { public_key: key.public(), last_full_ping: None, derp_region: new_relay_and_state(Some(0)), - best_addr: None, - best_addr_at: None, - trust_best_addr_until: now.checked_sub(Duration::from_secs(100)), + best_addr: BestAddr::default(), direct_addr_state: endpoint_state, is_call_me_maybe_ep: HashMap::new(), pending_cli_pings: Vec::new(), @@ -1837,12 +1724,12 @@ mod tests { public_key: key.public(), last_full_ping: None, derp_region: Some((0, relay_state)), - best_addr: Some(AddrLatency { - addr: socket_addr, - latency: Some(Duration::from_millis(80)), - }), - best_addr_at: Some(now), - trust_best_addr_until: Some(expired), + best_addr: BestAddr::from_parts( + socket_addr, + Some(Duration::from_millis(80)), + now, + expired, + ), direct_addr_state: endpoint_state, is_call_me_maybe_ep: HashMap::new(), pending_cli_pings: Vec::new(), diff --git a/iroh-net/src/magicsock/endpoint/best_addr.rs b/iroh-net/src/magicsock/endpoint/best_addr.rs new file mode 100644 index 0000000000..58074847fa --- /dev/null +++ b/iroh-net/src/magicsock/endpoint/best_addr.rs @@ -0,0 +1,202 @@ +//! The [`BestAddr`] is the currently active best address for UDP sends. + +use std::{ + net::SocketAddr, + time::{Duration, Instant}, +}; + +use iroh_metrics::inc; +use tracing::{debug, info, trace}; + +use super::{AddrLatency, TRUST_UDP_ADDR_DURATION}; +use crate::magicsock::metrics::Metrics as MagicsockMetrics; + +#[derive(Debug, Default)] +pub(super) struct BestAddr(Option); + +#[derive(Debug)] +struct BestAddrInner { + addr: AddrLatency, + trust_until: Option, + confirmed_at: Instant, +} + +#[derive(Debug)] +pub(super) enum BestAddrSource { + ReceivedPong, + BestCandidate, +} + +impl BestAddrSource { + fn trust_until(&self, from: Instant) -> Instant { + match self { + BestAddrSource::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, + // TODO: Fix time + BestAddrSource::BestCandidate => from + Duration::from_secs(60 * 60), + } + } +} + +#[derive(Debug)] +pub(super) enum BestAddrState<'a> { + Valid(&'a AddrLatency), + Outdated(&'a AddrLatency), + Empty, +} + +#[derive(Debug)] +pub enum BestAddrClearReason { + Reset, + Inactive, + PruneCallMeMaybe, + PongTimeout, +} + +impl BestAddr { + #[cfg(test)] + pub fn from_parts( + addr: SocketAddr, + latency: Option, + confirmed_at: Instant, + trust_until: Instant, + ) -> Self { + let inner = BestAddrInner { + addr: AddrLatency { addr, latency }, + confirmed_at, + trust_until: Some(trust_until), + }; + Self(Some(inner)) + } + pub fn is_empty(&self) -> bool { + self.0.is_none() + } + + pub fn is_valid(&self, now: Instant) -> bool { + matches!(self.state(now), BestAddrState::Valid(_)) + } + + pub fn clear(&mut self, reason: BestAddrClearReason, has_derp: bool) -> bool { + if let Some(addr) = self.addr() { + self.0 = None; + debug!(?reason, ?has_derp, old_addr = %addr, "remove best_addr"); + // no longer relying on the direct connection + inc!(MagicsockMetrics, num_direct_conns_removed); + if has_derp { + // we are now relying on the relay connection, add a relay conn + inc!(MagicsockMetrics, num_relay_conns_added); + } + true + } else { + false + } + } + + pub fn clear_if_equals( + &mut self, + addr: SocketAddr, + reason: BestAddrClearReason, + has_derp: bool, + ) -> bool { + match &self.addr() { + Some(best_addr) if *best_addr == addr => self.clear(reason, has_derp), + _ => false, + } + } + + pub fn clear_trust(&mut self) { + if let Some(state) = self.0.as_mut() { + state.trust_until = None; + } + } + + pub fn insert_if_better_or_reconfirm( + &mut self, + addr: SocketAddr, + latency: Option, + source: BestAddrSource, + confirmed_at: Instant, + has_derp: bool, + ) { + match self.0.as_mut() { + None => self.insert(addr, latency, source, confirmed_at, has_derp), + Some(state) => { + let candidate = AddrLatency { addr, latency }; + if candidate.is_better_than(&state.addr) { + self.insert(addr, latency, source, confirmed_at, has_derp) + } else if state.addr.addr == addr { + state.confirmed_at = confirmed_at; + state.trust_until = Some(source.trust_until(confirmed_at)); + } + } + } + } + + pub fn insert( + &mut self, + addr: SocketAddr, + latency: Option, + source: BestAddrSource, + confirmed_at: Instant, + has_derp: bool, + ) { + let trust_until = source.trust_until(confirmed_at); + + info!( + %addr, + latency = ?latency, + trust_for = ?trust_until.duration_since(Instant::now()), + "new best_addr (candidate address with most recent pong)" + ); + let was_empty = self.is_empty(); + let inner = BestAddrInner { + addr: AddrLatency { addr, latency }, + trust_until: Some(trust_until), + confirmed_at, + }; + self.0 = Some(inner); + if was_empty && has_derp { + // we now have a direct connection, adjust direct connection count + inc!(MagicsockMetrics, num_direct_conns_added); + if has_derp { + // we no longer rely on the relay connection, decrease the relay connection + // count + inc!(MagicsockMetrics, num_relay_conns_removed); + } + } + } + + pub fn state(&self, now: Instant) -> BestAddrState { + match &self.0 { + None => { + trace!("best_addr invalid: not set"); + BestAddrState::Empty + } + Some(state) => match state.trust_until { + Some(expiry) if now < expiry => { + trace!(addr = %state.addr.addr, remaining=?expiry.duration_since(now), "best_addr valid"); + BestAddrState::Valid(&state.addr) + } + Some(expiry) => { + trace!(addr = %state.addr.addr, since=?expiry.duration_since(now), "best_addr invalid: expired"); + BestAddrState::Outdated(&state.addr) + } + None => { + trace!(addr = %state.addr.addr, "best_addr invalid: trust_best_addr_until not set"); + BestAddrState::Outdated(&state.addr) + } + }, + } + } + + pub fn addr(&self) -> Option { + self.0.as_ref().map(|a| a.addr.addr) + } + + pub fn addr_info(&self) -> Option<&AddrLatency> { + self.0.as_ref().map(|a| &a.addr) + } + + pub fn latency(&self) -> Option { + self.addr_info().and_then(|a| a.latency) + } +} From 566d1956d8372c6a941afcd25e5b791af79a4fbb Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 20 Oct 2023 18:09:23 +0200 Subject: [PATCH 2/6] fix: no moudle names in structs --- iroh-net/src/magicsock/endpoint.rs | 20 ++++++------ iroh-net/src/magicsock/endpoint/best_addr.rs | 32 ++++++++++---------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 414898d750..11c506f973 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -24,7 +24,7 @@ use crate::{ use super::{metrics::Metrics as MagicsockMetrics, ActorMessage, QuicMappedAddr, SendAddr}; mod best_addr; -use best_addr::{BestAddr, BestAddrClearReason, BestAddrSource, BestAddrState}; +use best_addr::{BestAddr, ClearReason}; /// How long we wait for a pong reply before assuming it's never coming. const PING_TIMEOUT_DURATION: Duration = Duration::from_secs(5); @@ -206,17 +206,17 @@ impl Endpoint { self.update_best_addr_from_candidates(); match self.best_addr.state(*now) { // we have a valid address: use it! - BestAddrState::Valid(best_addr) => { + best_addr::State::Valid(best_addr) => { trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set and valid, use best_addr only"); (Some(best_addr.addr), None, false) } // we have an outdated address: use it, but use derp as well. - BestAddrState::Outdated(best_addr) => { + best_addr::State::Outdated(best_addr) => { trace!(addr = %best_addr.addr, latency = ?best_addr.latency, "best_addr is set but outdated, use best_addr and derp"); (Some(best_addr.addr), self.derp_region(), true) } // we have no best address: use a random canidate if available, and derp as backup. - BestAddrState::Empty => { + best_addr::State::Empty => { let addr = self .direct_addr_state .keys() @@ -254,7 +254,7 @@ impl Endpoint { self.best_addr.insert( pong.from.as_socket_addr(), Some(lowest_latency), - BestAddrSource::BestCandidate, + best_addr::Source::BestCandidate, pong.pong_at, self.derp_region.is_some(), ); @@ -366,7 +366,7 @@ impl Endpoint { // If we fail to ping our current best addr, it is not that good anymore. self.best_addr.clear_if_equals( addr, - BestAddrClearReason::PongTimeout, + ClearReason::PongTimeout, self.derp_region.is_some(), ); } @@ -563,7 +563,7 @@ impl Endpoint { fn reset(&mut self) { self.last_full_ping = None; self.best_addr - .clear(BestAddrClearReason::Reset, self.derp_region.is_some()); + .clear(ClearReason::Reset, self.derp_region.is_some()); for es in self.direct_addr_state.values_mut() { es.last_ping = None; @@ -667,7 +667,7 @@ impl Endpoint { self.best_addr.clear_if_equals( ip_port.into(), - BestAddrClearReason::Inactive, + ClearReason::Inactive, self.derp_region.is_some(), ); } @@ -791,7 +791,7 @@ impl Endpoint { self.best_addr.insert_if_better_or_reconfirm( to, Some(latency), - BestAddrSource::ReceivedPong, + best_addr::Source::ReceivedPong, now, self.derp_region.is_some(), ); @@ -848,7 +848,7 @@ impl Endpoint { if !*want { self.best_addr.clear_if_equals( *ep, - BestAddrClearReason::PruneCallMeMaybe, + ClearReason::PruneCallMeMaybe, self.derp_region.is_some(), ); false diff --git a/iroh-net/src/magicsock/endpoint/best_addr.rs b/iroh-net/src/magicsock/endpoint/best_addr.rs index 58074847fa..aba2bf6ea3 100644 --- a/iroh-net/src/magicsock/endpoint/best_addr.rs +++ b/iroh-net/src/magicsock/endpoint/best_addr.rs @@ -22,30 +22,30 @@ struct BestAddrInner { } #[derive(Debug)] -pub(super) enum BestAddrSource { +pub(super) enum Source { ReceivedPong, BestCandidate, } -impl BestAddrSource { +impl Source { fn trust_until(&self, from: Instant) -> Instant { match self { - BestAddrSource::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, + Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, // TODO: Fix time - BestAddrSource::BestCandidate => from + Duration::from_secs(60 * 60), + Source::BestCandidate => from + Duration::from_secs(60 * 60), } } } #[derive(Debug)] -pub(super) enum BestAddrState<'a> { +pub(super) enum State<'a> { Valid(&'a AddrLatency), Outdated(&'a AddrLatency), Empty, } #[derive(Debug)] -pub enum BestAddrClearReason { +pub enum ClearReason { Reset, Inactive, PruneCallMeMaybe, @@ -72,10 +72,10 @@ impl BestAddr { } pub fn is_valid(&self, now: Instant) -> bool { - matches!(self.state(now), BestAddrState::Valid(_)) + matches!(self.state(now), State::Valid(_)) } - pub fn clear(&mut self, reason: BestAddrClearReason, has_derp: bool) -> bool { + pub fn clear(&mut self, reason: ClearReason, has_derp: bool) -> bool { if let Some(addr) = self.addr() { self.0 = None; debug!(?reason, ?has_derp, old_addr = %addr, "remove best_addr"); @@ -94,7 +94,7 @@ impl BestAddr { pub fn clear_if_equals( &mut self, addr: SocketAddr, - reason: BestAddrClearReason, + reason: ClearReason, has_derp: bool, ) -> bool { match &self.addr() { @@ -113,7 +113,7 @@ impl BestAddr { &mut self, addr: SocketAddr, latency: Option, - source: BestAddrSource, + source: Source, confirmed_at: Instant, has_derp: bool, ) { @@ -135,7 +135,7 @@ impl BestAddr { &mut self, addr: SocketAddr, latency: Option, - source: BestAddrSource, + source: Source, confirmed_at: Instant, has_derp: bool, ) { @@ -165,24 +165,24 @@ impl BestAddr { } } - pub fn state(&self, now: Instant) -> BestAddrState { + pub fn state(&self, now: Instant) -> State { match &self.0 { None => { trace!("best_addr invalid: not set"); - BestAddrState::Empty + State::Empty } Some(state) => match state.trust_until { Some(expiry) if now < expiry => { trace!(addr = %state.addr.addr, remaining=?expiry.duration_since(now), "best_addr valid"); - BestAddrState::Valid(&state.addr) + State::Valid(&state.addr) } Some(expiry) => { trace!(addr = %state.addr.addr, since=?expiry.duration_since(now), "best_addr invalid: expired"); - BestAddrState::Outdated(&state.addr) + State::Outdated(&state.addr) } None => { trace!(addr = %state.addr.addr, "best_addr invalid: trust_best_addr_until not set"); - BestAddrState::Outdated(&state.addr) + State::Outdated(&state.addr) } }, } From e5774082f2f31cc302f75d21dc2e71af96539ba3 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 23 Oct 2023 11:52:08 +0200 Subject: [PATCH 3/6] refactor: improvements from review --- iroh-net/src/magicsock/endpoint.rs | 117 +++++++------------ iroh-net/src/magicsock/endpoint/best_addr.rs | 67 ++++++----- 2 files changed, 81 insertions(+), 103 deletions(-) diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 11c506f973..d189587269 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -43,9 +43,6 @@ const SESSION_ACTIVE_TIMEOUT: Duration = Duration::from_secs(45); /// How often we try to upgrade to a better patheven if we have some non-DERP route that works. const UPGRADE_INTERVAL: Duration = Duration::from_secs(60); -/// How long we trust a UDP address as the exclusive path (without using DERP) without having heard a Pong reply. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); - /// How long until we send a stayin alive ping const STAYIN_ALIVE_MIN_ELAPSED: Duration = Duration::from_secs(2); @@ -150,9 +147,14 @@ impl Endpoint { /// Returns info about this endpoint pub fn info(&self) -> EndpointInfo { - // TODO: I think we should return direct here for outdated. - let (conn_type, latency) = if self.best_addr.is_valid(Instant::now()) { - let addr_info = self.best_addr.addr_info().expect("checked"); + // If the best address is valid or outdated, report that the address is in use. + // Actually, if the best_addr is outdated, we are using *both* the direct address and derp + // in parallel. We don't have a way to report that in [`EndpointInfo`] at the moment though. + let direct = match self.best_addr.state(Instant::now()) { + best_addr::State::Valid(addr) | best_addr::State::Outdated(addr) => Some(addr), + best_addr::State::Empty => None, + }; + let (conn_type, latency) = if let Some(addr_info) = direct { (ConnectionType::Direct(addr_info.addr), addr_info.latency) } else if let Some((region_id, relay_state)) = self.derp_region.as_ref() { let latency = relay_state.recent_pong().map(|pong| pong.latency); @@ -203,7 +205,7 @@ impl Endpoint { } // Update our best addr from candidate addresses (only if it is empty and if we have recent // pongs). - self.update_best_addr_from_candidates(); + self.assign_best_addr_from_candidates_if_empty(); match self.best_addr.state(*now) { // we have a valid address: use it! best_addr::State::Valid(best_addr) => { @@ -229,11 +231,9 @@ impl Endpoint { } /// Update our best_addr (if empty) with the candidate udp addr with the lowest latency. - /// - /// Returns true if the address was updated. - fn update_best_addr_from_candidates(&mut self) -> bool { + fn assign_best_addr_from_candidates_if_empty(&mut self) { if !self.best_addr.is_empty() { - return false; + return; } let mut lowest_latency = Duration::from_secs(60 * 60); let mut last_pong = None; @@ -253,48 +253,48 @@ impl Endpoint { if let Some(pong) = last_pong { self.best_addr.insert( pong.from.as_socket_addr(), - Some(lowest_latency), + Some(pong.latency), best_addr::Source::BestCandidate, pong.pong_at, self.derp_region.is_some(), ); - true - } else { - false } } /// Reports whether we should ping to all our direct addresses looking for a better path. fn want_full_ping(&self, now: &Instant) -> bool { trace!("full ping: wanted?"); - if self.last_full_ping.is_none() { + let Some(last_full_ping) = self.last_full_ping else { debug!("full ping: no full ping done"); return true; + }; + match self.best_addr.state(*now) { + best_addr::State::Empty => { + debug!("full ping: best addr not set"); + true + } + best_addr::State::Outdated(_) => { + debug!("full ping: best addr expired"); + true + } + best_addr::State::Valid(addr) => { + if addr + .latency + .map(|l| l > GOOD_ENOUGH_LATENCY) + .unwrap_or(true) + && *now - last_full_ping >= UPGRADE_INTERVAL + { + debug!( + "full ping: full ping interval expired and latency is only {}ms", + addr.latency.map(|l| l.as_millis()).unwrap_or_default() + ); + true + } else { + trace!(?now, "full ping: not needed"); + false + } + } } - if !self.best_addr.is_valid(*now) { - debug!("full ping: best addr expired"); - return true; - } - - if self - .best_addr - .latency() - .map(|l| l > GOOD_ENOUGH_LATENCY) - .unwrap_or(true) - && *now - *self.last_full_ping.as_ref().unwrap() >= UPGRADE_INTERVAL - { - debug!( - "full ping: full ping interval expired and latency is only {}ms", - self.best_addr - .latency() - .map(|l| l.as_millis()) - .unwrap_or_default() - ); - return true; - } - trace!(?now, "full ping: not needed"); - - false } /// Starts a ping for the "ping" command. @@ -318,7 +318,7 @@ impl Endpoint { } } if let Some(udp_addr) = udp_addr { - if self.best_addr.is_valid(now) { + if let best_addr::State::Valid(_) = self.best_addr.state(now) { // Already have an active session, so just ping the address we're using. // Otherwise "tailscale ping" results to a node on the local network // can look like they're bouncing between, say 10.0.0.0/9 and the peer's @@ -980,13 +980,6 @@ impl Endpoint { } } -/// A `SocketAddr` with an associated latency. -#[derive(Debug, Clone)] -pub struct AddrLatency { - pub addr: SocketAddr, - pub latency: Option, -} - /// An (Ip, Port) pair. /// /// NOTE: storing an [`IpPort`] is safer than storing a [`SocketAddr`] because for IPv6 socket @@ -1555,34 +1548,6 @@ pub enum DiscoPingPurpose { StayinAlive, } -impl AddrLatency { - /// Reports whether `self` is a better addr to use than `other`. - fn is_better_than(&self, other: &Self) -> bool { - if self.addr == other.addr { - return false; - } - if self.addr.is_ipv6() && other.addr.is_ipv4() { - // Prefer IPv6 for being a bit more robust, as long as - // the latencies are roughly equivalent. - match (self.latency, other.latency) { - (Some(latency), Some(other_latency)) => { - if latency / 10 * 9 < other_latency { - return true; - } - } - (Some(_), None) => { - // If we have latency and the other doesn't prefer us - return true; - } - _ => {} - } - } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { - return false; - } - self.latency < other.latency - } -} - #[cfg(test)] mod tests { use std::{env::temp_dir, net::Ipv4Addr}; diff --git a/iroh-net/src/magicsock/endpoint/best_addr.rs b/iroh-net/src/magicsock/endpoint/best_addr.rs index aba2bf6ea3..2e002da31e 100644 --- a/iroh-net/src/magicsock/endpoint/best_addr.rs +++ b/iroh-net/src/magicsock/endpoint/best_addr.rs @@ -6,11 +6,13 @@ use std::{ }; use iroh_metrics::inc; -use tracing::{debug, info, trace}; +use tracing::{debug, info}; -use super::{AddrLatency, TRUST_UDP_ADDR_DURATION}; use crate::magicsock::metrics::Metrics as MagicsockMetrics; +/// How long we trust a UDP address as the exclusive path (without using DERP) without having heard a Pong reply. +const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + #[derive(Debug, Default)] pub(super) struct BestAddr(Option); @@ -67,14 +69,11 @@ impl BestAddr { }; Self(Some(inner)) } + pub fn is_empty(&self) -> bool { self.0.is_none() } - pub fn is_valid(&self, now: Instant) -> bool { - matches!(self.state(now), State::Valid(_)) - } - pub fn clear(&mut self, reason: ClearReason, has_derp: bool) -> bool { if let Some(addr) = self.addr() { self.0 = None; @@ -167,23 +166,10 @@ impl BestAddr { pub fn state(&self, now: Instant) -> State { match &self.0 { - None => { - trace!("best_addr invalid: not set"); - State::Empty - } + None => State::Empty, Some(state) => match state.trust_until { - Some(expiry) if now < expiry => { - trace!(addr = %state.addr.addr, remaining=?expiry.duration_since(now), "best_addr valid"); - State::Valid(&state.addr) - } - Some(expiry) => { - trace!(addr = %state.addr.addr, since=?expiry.duration_since(now), "best_addr invalid: expired"); - State::Outdated(&state.addr) - } - None => { - trace!(addr = %state.addr.addr, "best_addr invalid: trust_best_addr_until not set"); - State::Outdated(&state.addr) - } + Some(expiry) if now < expiry => State::Valid(&state.addr), + Some(_) | None => State::Outdated(&state.addr), }, } } @@ -191,12 +177,39 @@ impl BestAddr { pub fn addr(&self) -> Option { self.0.as_ref().map(|a| a.addr.addr) } +} - pub fn addr_info(&self) -> Option<&AddrLatency> { - self.0.as_ref().map(|a| &a.addr) - } +/// A `SocketAddr` with an associated latency. +#[derive(Debug, Clone)] +pub struct AddrLatency { + pub addr: SocketAddr, + pub latency: Option, +} - pub fn latency(&self) -> Option { - self.addr_info().and_then(|a| a.latency) +impl AddrLatency { + /// Reports whether `self` is a better addr to use than `other`. + fn is_better_than(&self, other: &Self) -> bool { + if self.addr == other.addr { + return false; + } + if self.addr.is_ipv6() && other.addr.is_ipv4() { + // Prefer IPv6 for being a bit more robust, as long as + // the latencies are roughly equivalent. + match (self.latency, other.latency) { + (Some(latency), Some(other_latency)) => { + if latency / 10 * 9 < other_latency { + return true; + } + } + (Some(_), None) => { + // If we have latency and the other doesn't prefer us + return true; + } + _ => {} + } + } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { + return false; + } + self.latency < other.latency } } From 6bc422c9189a327223d6a7bc5144abbb1e5ced70 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 23 Oct 2023 12:02:34 +0200 Subject: [PATCH 4/6] refactor: best_addr latency is never empty --- iroh-net/src/magicsock/endpoint.rs | 18 ++++++++---------- iroh-net/src/magicsock/endpoint/best_addr.rs | 19 +++++-------------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 5e160e7f02..55c4c27f5d 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -154,7 +154,10 @@ impl Endpoint { best_addr::State::Empty => None, }; let (conn_type, latency) = if let Some(addr_info) = direct { - (ConnectionType::Direct(addr_info.addr), addr_info.latency) + ( + ConnectionType::Direct(addr_info.addr), + Some(addr_info.latency), + ) } else if let Some((region_id, relay_state)) = self.derp_region.as_ref() { let latency = relay_state.recent_pong().map(|pong| pong.latency); (ConnectionType::Relay(*region_id), latency) @@ -256,7 +259,7 @@ impl Endpoint { if let Some(pong) = last_pong { self.best_addr.insert( pong.from.as_socket_addr(), - Some(pong.latency), + pong.latency, best_addr::Source::BestCandidate, pong.pong_at, self.derp_region.is_some(), @@ -281,15 +284,10 @@ impl Endpoint { true } best_addr::State::Valid(addr) => { - if addr - .latency - .map(|l| l > GOOD_ENOUGH_LATENCY) - .unwrap_or(true) - && *now - last_full_ping >= UPGRADE_INTERVAL - { + if addr.latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { debug!( "full ping: full ping interval expired and latency is only {}ms", - addr.latency.map(|l| l.as_millis()).unwrap_or_default() + addr.latency.as_millis() ); true } else { @@ -795,7 +793,7 @@ impl Endpoint { debug_assert!(!is_derp, "missmatching derp & udp"); self.best_addr.insert_if_better_or_reconfirm( to, - Some(latency), + latency, best_addr::Source::ReceivedPong, now, self.derp_region.is_some(), diff --git a/iroh-net/src/magicsock/endpoint/best_addr.rs b/iroh-net/src/magicsock/endpoint/best_addr.rs index 2e002da31e..7345f292be 100644 --- a/iroh-net/src/magicsock/endpoint/best_addr.rs +++ b/iroh-net/src/magicsock/endpoint/best_addr.rs @@ -111,7 +111,7 @@ impl BestAddr { pub fn insert_if_better_or_reconfirm( &mut self, addr: SocketAddr, - latency: Option, + latency: Duration, source: Source, confirmed_at: Instant, has_derp: bool, @@ -133,7 +133,7 @@ impl BestAddr { pub fn insert( &mut self, addr: SocketAddr, - latency: Option, + latency: Duration, source: Source, confirmed_at: Instant, has_derp: bool, @@ -183,7 +183,7 @@ impl BestAddr { #[derive(Debug, Clone)] pub struct AddrLatency { pub addr: SocketAddr, - pub latency: Option, + pub latency: Duration, } impl AddrLatency { @@ -195,17 +195,8 @@ impl AddrLatency { if self.addr.is_ipv6() && other.addr.is_ipv4() { // Prefer IPv6 for being a bit more robust, as long as // the latencies are roughly equivalent. - match (self.latency, other.latency) { - (Some(latency), Some(other_latency)) => { - if latency / 10 * 9 < other_latency { - return true; - } - } - (Some(_), None) => { - // If we have latency and the other doesn't prefer us - return true; - } - _ => {} + if self.latency / 10 * 9 < other.latency { + return true; } } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { return false; From 29455c6383dbb06267f81baa08ac5e2686bc2f70 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 23 Oct 2023 12:11:53 +0200 Subject: [PATCH 5/6] tests: fix after changes --- iroh-net/src/magicsock/endpoint.rs | 8 ++++---- iroh-net/src/magicsock/endpoint/best_addr.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 55c4c27f5d..7c957df6ac 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -1663,7 +1663,7 @@ mod tests { derp_region: new_relay_and_state(Some(0)), best_addr: BestAddr::from_parts( ip_port.into(), - Some(latency), + latency, now, now + Duration::from_secs(100), ), @@ -1759,7 +1759,7 @@ mod tests { derp_region: Some((0, relay_state)), best_addr: BestAddr::from_parts( socket_addr, - Some(Duration::from_millis(80)), + Duration::from_millis(80), now, expired, ), @@ -1815,8 +1815,8 @@ mod tests { last_control: Some((elapsed, ControlMsg::Pong)), last_payload: None, }]), - conn_type: ConnectionType::Relay(0), - latency: Some(latency), + conn_type: ConnectionType::Direct(d_socket_addr), + latency: Some(Duration::from_millis(80)), last_used: Some(elapsed), }, ]); diff --git a/iroh-net/src/magicsock/endpoint/best_addr.rs b/iroh-net/src/magicsock/endpoint/best_addr.rs index 7345f292be..0652c0b225 100644 --- a/iroh-net/src/magicsock/endpoint/best_addr.rs +++ b/iroh-net/src/magicsock/endpoint/best_addr.rs @@ -58,7 +58,7 @@ impl BestAddr { #[cfg(test)] pub fn from_parts( addr: SocketAddr, - latency: Option, + latency: Duration, confirmed_at: Instant, trust_until: Instant, ) -> Self { From 50f4440c8dcdae22febdd33aca05ab05ddbd8ddf Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Mon, 23 Oct 2023 12:59:22 +0200 Subject: [PATCH 6/6] fix: report ConnectionType::Mixed if using both UDP and Derp --- iroh-net/src/magicsock/endpoint.rs | 50 ++++++++++++++++++------------ 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/iroh-net/src/magicsock/endpoint.rs b/iroh-net/src/magicsock/endpoint.rs index 7c957df6ac..a3e076fc4f 100644 --- a/iroh-net/src/magicsock/endpoint.rs +++ b/iroh-net/src/magicsock/endpoint.rs @@ -146,23 +146,24 @@ impl Endpoint { /// Returns info about this endpoint pub fn info(&self, now: Instant) -> EndpointInfo { - // If the best address is valid or outdated, report that the address is in use. - // Actually, if the best_addr is outdated, we are using *both* the direct address and derp - // in parallel. We don't have a way to report that in [`EndpointInfo`] at the moment though. - let direct = match self.best_addr.state(now) { - best_addr::State::Valid(addr) | best_addr::State::Outdated(addr) => Some(addr), - best_addr::State::Empty => None, - }; - let (conn_type, latency) = if let Some(addr_info) = direct { - ( - ConnectionType::Direct(addr_info.addr), - Some(addr_info.latency), - ) - } else if let Some((region_id, relay_state)) = self.derp_region.as_ref() { - let latency = relay_state.recent_pong().map(|pong| pong.latency); - (ConnectionType::Relay(*region_id), latency) - } else { - (ConnectionType::None, None) + use best_addr::State::*; + // Report our active connection. This replicates the logic of [`Endpoint::addr_for_send`] + // without chosing a random candidate address if no best_addr is set. + let (conn_type, latency) = match (self.best_addr.state(now), self.derp_region.as_ref()) { + (Valid(addr), _) | (Outdated(addr), None) => { + (ConnectionType::Direct(addr.addr), Some(addr.latency)) + } + (Outdated(addr), Some((region, relay_state))) => { + let latency = relay_state + .latency() + .map(|l| l.min(addr.latency)) + .unwrap_or(addr.latency); + (ConnectionType::Mixed(addr.addr, *region), Some(latency)) + } + (Empty, Some((region, relay_state))) => { + (ConnectionType::Relay(*region), relay_state.latency()) + } + (Empty, None) => (ConnectionType::None, None), }; let addrs = self .direct_addr_state @@ -1454,6 +1455,12 @@ pub enum ConnectionType { /// Relay connection over DERP #[display("relay")] Relay(u16), + /// Both a UDP and a DERP connection are used. + /// + /// This is the case if we do have a UDP address, but are missing a recent confirmation that + /// the addrss works. + #[display("relay")] + Mixed(SocketAddr, u16), /// We have no verified connection to this PublicKey #[display("none")] None, @@ -1567,6 +1574,11 @@ impl EndpointState { self.recent_pong.as_ref() } + /// Returns the latency from the most recent pong, if available. + fn latency(&self) -> Option { + self.recent_pong.as_ref().map(|p| p.latency) + } + fn needs_ping(&self, now: &Instant) -> bool { match self.last_ping { None => true, @@ -1815,8 +1827,8 @@ mod tests { last_control: Some((elapsed, ControlMsg::Pong)), last_payload: None, }]), - conn_type: ConnectionType::Direct(d_socket_addr), - latency: Some(Duration::from_millis(80)), + conn_type: ConnectionType::Mixed(d_socket_addr, 0), + latency: Some(Duration::from_millis(50)), last_used: Some(elapsed), }, ]);