From 0f8bee4dbfdbbc6f1a8e2e6cd2ed962993e9fcae Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Mon, 11 Nov 2024 23:09:50 +0530 Subject: [PATCH] fix(network): do not log connection error for established connections --- sn_networking/src/driver.rs | 22 ++++++++++++++++- sn_networking/src/event/swarm.rs | 41 +++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 694a850640..3039ef68cf 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -32,7 +32,6 @@ use futures::future::Either; use futures::StreamExt; #[cfg(feature = "local")] use libp2p::mdns; -use libp2p::Transport as _; use libp2p::{core::muxing::StreamMuxerBox, relay}; use libp2p::{ identity::Keypair, @@ -45,6 +44,7 @@ use libp2p::{ }, Multiaddr, PeerId, }; +use libp2p::{swarm::SwarmEvent, Transport as _}; #[cfg(feature = "open-metrics")] use prometheus_client::metrics::info::Info; use sn_evm::PaymentQuote; @@ -721,6 +721,7 @@ impl NetworkBuilder { network_discovery: NetworkDiscovery::new(&peer_id), bootstrap_peers: Default::default(), live_connected_peers: Default::default(), + latest_connected_peers: Default::default(), handling_statistics: Default::default(), handled_times: 0, hard_disk_write_error: 0, @@ -819,6 +820,10 @@ pub struct SwarmDriver { // Peers that having live connection to. Any peer got contacted during kad network query // will have live connection established. And they may not appear in the RT. pub(crate) live_connected_peers: BTreeMap, + /// The peers that we connected to recently to. + /// This is a limited list used to prevent log spamming. + /// Use `live_connected_peers` for a full list. + pub(crate) latest_connected_peers: HashMap, // Record the handling time of the recent 10 for each handling kind. handling_statistics: BTreeMap>, handled_times: usize, @@ -846,6 +851,8 @@ impl SwarmDriver { let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL); let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL); + // temporarily skip processing IncomingConnectionError swarm event to avoid log spamming + let mut previous_incoming_connection_error_event = None; loop { tokio::select! { // polls futures in order they appear here (as opposed to random) @@ -878,6 +885,19 @@ impl SwarmDriver { }, // next take and react to external swarm events swarm_event = self.swarm.select_next_some() => { + + // Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip + // processing the event for one round. + if let Some(previous_event) = previous_incoming_connection_error_event.take() { + if let Err(err) = self.handle_swarm_events(previous_event) { + warn!("Error while handling swarm event: {err}"); + } + } + if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) { + previous_incoming_connection_error_event = Some(swarm_event); + continue; + } + // logging for handling events happens inside handle_swarm_events // otherwise we're rewriting match statements etc around this anwyay if let Err(err) = self.handle_swarm_events(swarm_event) { diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index f0fd69254e..a159076210 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -364,6 +364,7 @@ impl SwarmDriver { connection_id, (peer_id, Instant::now() + Duration::from_secs(60)), ); + self.insert_latest_connected_peers(endpoint.get_remote_address().clone()); self.record_connection_metrics(); if endpoint.is_dialer() { @@ -380,6 +381,9 @@ impl SwarmDriver { event_string = "ConnectionClosed"; debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint)); let _ = self.live_connected_peers.remove(&connection_id); + let _ = self + .latest_connected_peers + .remove(endpoint.get_remote_address()); self.record_connection_metrics(); } SwarmEvent::OutgoingConnectionError { @@ -509,7 +513,18 @@ impl SwarmDriver { error, } => { event_string = "Incoming ConnErr"; - error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); + // Only log if this for a non-existing connection. + // If a peer contains multiple transports/listen addrs, we might try to open multiple connections, + // and if the first one passes, we would get error on the rest. We don't want to log these. + // + // Also sometimes we get the ConnectionEstablished event immediately after this event. + // So during tokio::select! of the events, we skip processing IncomingConnectionError for one round, + // giving time for ConnectionEstablished to be hopefully processed. + // And since we don't do anything critical with this event, the order and time of processing is + // not critical. + if !self.latest_connected_peers.contains_key(&send_back_addr) { + error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); + } let _ = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); } @@ -659,6 +674,30 @@ impl SwarmDriver { .set(self.swarm.connected_peers().count() as i64); } } + + /// Insert into the latest connected peers list. This list does not contain all the connected peers. + /// Older entries are removed if the list exceeds 50. + fn insert_latest_connected_peers(&mut self, addr: Multiaddr) { + while self.latest_connected_peers.len() >= 50 { + // remove the oldest entry + let Some(oldest) = self + .latest_connected_peers + .iter() + .min_by_key(|(_, time)| *time) + .map(|(addr, _)| addr.clone()) + else { + break; + }; + + self.latest_connected_peers.remove(&oldest); + } + + let old_instant = self + .latest_connected_peers + .entry(addr) + .or_insert_with(Instant::now); + *old_instant = Instant::now(); + } } /// Helper function to print formatted connection role info.