Skip to content

Commit

Permalink
Merge pull request #2432 from RolandSherwin/skip_ws_connection_error
Browse files Browse the repository at this point in the history
fix(network): do not log connection error for established connections
  • Loading branch information
jacderida authored Nov 11, 2024
2 parents b74c2c3 + 416652d commit 93e516c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
22 changes: 21 additions & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use futures::future::Either;
use futures::StreamExt;
#[cfg(feature = "local")]
use libp2p::mdns;
use libp2p::Transport as _;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
Expand All @@ -45,6 +44,7 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use libp2p::{swarm::SwarmEvent, Transport as _};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
use sn_evm::PaymentQuote;
Expand Down Expand Up @@ -721,6 +721,7 @@ impl NetworkBuilder {
network_discovery: NetworkDiscovery::new(&peer_id),
bootstrap_peers: Default::default(),
live_connected_peers: Default::default(),
latest_connected_peers: Default::default(),
handling_statistics: Default::default(),
handled_times: 0,
hard_disk_write_error: 0,
Expand Down Expand Up @@ -819,6 +820,10 @@ pub struct SwarmDriver {
// Peers that having live connection to. Any peer got contacted during kad network query
// will have live connection established. And they may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
/// The peers that we recently connected to.
/// This is a limited list used to prevent log spamming.
/// Use `live_connected_peers` for a full list.
pub(crate) latest_connected_peers: HashMap<Multiaddr, Instant>,
// Record the handling time of the recent 10 for each handling kind.
handling_statistics: BTreeMap<String, Vec<Duration>>,
handled_times: usize,
Expand Down Expand Up @@ -846,6 +851,8 @@ impl SwarmDriver {
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);

// temporarily skip processing IncomingConnectionError swarm event to avoid log spamming
let mut previous_incoming_connection_error_event = None;
loop {
tokio::select! {
// polls futures in order they appear here (as opposed to random)
Expand Down Expand Up @@ -878,6 +885,19 @@ impl SwarmDriver {
},
// next take and react to external swarm events
swarm_event = self.swarm.select_next_some() => {

// Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip
// processing the event for one round.
if let Some(previous_event) = previous_incoming_connection_error_event.take() {
if let Err(err) = self.handle_swarm_events(previous_event) {
warn!("Error while handling swarm event: {err}");
}
}
if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
previous_incoming_connection_error_event = Some(swarm_event);
continue;
}

// logging for handling events happens inside handle_swarm_events
// otherwise we're rewriting match statements etc around this anwyay
if let Err(err) = self.handle_swarm_events(swarm_event) {
Expand Down
41 changes: 40 additions & 1 deletion sn_networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl SwarmDriver {
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
);
self.insert_latest_connected_peers(endpoint.get_remote_address().clone());
self.record_connection_metrics();

if endpoint.is_dialer() {
Expand All @@ -380,6 +381,9 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);
let _ = self
.latest_connected_peers
.remove(endpoint.get_remote_address());
self.record_connection_metrics();
}
SwarmEvent::OutgoingConnectionError {
Expand Down Expand Up @@ -509,7 +513,18 @@ impl SwarmDriver {
error,
} => {
event_string = "Incoming ConnErr";
error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
// Only log if this for a non-existing connection.
// If a peer contains multiple transports/listen addrs, we might try to open multiple connections,
// and if the first one passes, we would get error on the rest. We don't want to log these.
//
// Also sometimes we get the ConnectionEstablished event immediately after this event.
// So during tokio::select! of the events, we skip processing IncomingConnectionError for one round,
// giving time for ConnectionEstablished to be hopefully processed.
// And since we don't do anything critical with this event, the order and time of processing is
// not critical.
if !self.latest_connected_peers.contains_key(&send_back_addr) {
error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
}
let _ = self.live_connected_peers.remove(&connection_id);
self.record_connection_metrics();
}
Expand Down Expand Up @@ -659,6 +674,30 @@ impl SwarmDriver {
.set(self.swarm.connected_peers().count() as i64);
}
}

/// Insert into the latest connected peers list. This list does not contain all the connected peers.
/// Older entries are removed if the list exceeds 50.
fn insert_latest_connected_peers(&mut self, addr: Multiaddr) {
let old_instant = self
.latest_connected_peers
.entry(addr)
.or_insert_with(Instant::now);
*old_instant = Instant::now();

while self.latest_connected_peers.len() >= 50 {
// remove the oldest entry
let Some(oldest) = self
.latest_connected_peers
.iter()
.min_by_key(|(_, time)| *time)
.map(|(addr, _)| addr.clone())
else {
break;
};

self.latest_connected_peers.remove(&oldest);
}
}
}

/// Helper function to print formatted connection role info.
Expand Down

0 comments on commit 93e516c

Please sign in to comment.