Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): do not log connection error for established connections #2432

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use futures::future::Either;
use futures::StreamExt;
#[cfg(feature = "local")]
use libp2p::mdns;
use libp2p::Transport as _;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
identity::Keypair,
Expand All @@ -45,6 +44,7 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use libp2p::{swarm::SwarmEvent, Transport as _};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
use sn_evm::PaymentQuote;
Expand Down Expand Up @@ -721,6 +721,7 @@ impl NetworkBuilder {
network_discovery: NetworkDiscovery::new(&peer_id),
bootstrap_peers: Default::default(),
live_connected_peers: Default::default(),
latest_connected_peers: Default::default(),
handling_statistics: Default::default(),
handled_times: 0,
hard_disk_write_error: 0,
Expand Down Expand Up @@ -819,6 +820,10 @@ pub struct SwarmDriver {
// Peers that having live connection to. Any peer got contacted during kad network query
// will have live connection established. And they may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
/// The peers that we recently connected to.
/// This is a limited list used to prevent log spamming.
/// Use `live_connected_peers` for a full list.
pub(crate) latest_connected_peers: HashMap<Multiaddr, Instant>,
// Record the handling time of the recent 10 for each handling kind.
handling_statistics: BTreeMap<String, Vec<Duration>>,
handled_times: usize,
Expand Down Expand Up @@ -846,6 +851,8 @@ impl SwarmDriver {
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);

// temporarily skip processing IncomingConnectionError swarm event to avoid log spamming
let mut previous_incoming_connection_error_event = None;
loop {
tokio::select! {
// polls futures in order they appear here (as opposed to random)
Expand Down Expand Up @@ -878,6 +885,19 @@ impl SwarmDriver {
},
// next take and react to external swarm events
swarm_event = self.swarm.select_next_some() => {

// Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip
// processing the event for one round.
if let Some(previous_event) = previous_incoming_connection_error_event.take() {
if let Err(err) = self.handle_swarm_events(previous_event) {
warn!("Error while handling swarm event: {err}");
}
}
if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
previous_incoming_connection_error_event = Some(swarm_event);
continue;
}

// logging for handling events happens inside handle_swarm_events
// otherwise we're rewriting match statements etc around this anwyay
if let Err(err) = self.handle_swarm_events(swarm_event) {
Expand Down
41 changes: 40 additions & 1 deletion sn_networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl SwarmDriver {
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
);
self.insert_latest_connected_peers(endpoint.get_remote_address().clone());
self.record_connection_metrics();

if endpoint.is_dialer() {
Expand All @@ -380,6 +381,9 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);
let _ = self
.latest_connected_peers
.remove(endpoint.get_remote_address());
self.record_connection_metrics();
}
SwarmEvent::OutgoingConnectionError {
Expand Down Expand Up @@ -509,7 +513,18 @@ impl SwarmDriver {
error,
} => {
event_string = "Incoming ConnErr";
error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
// Only log if this for a non-existing connection.
// If a peer contains multiple transports/listen addrs, we might try to open multiple connections,
// and if the first one passes, we would get error on the rest. We don't want to log these.
//
// Also sometimes we get the ConnectionEstablished event immediately after this event.
// So during tokio::select! of the events, we skip processing IncomingConnectionError for one round,
// giving time for ConnectionEstablished to be hopefully processed.
// And since we don't do anything critical with this event, the order and time of processing is
// not critical.
if !self.latest_connected_peers.contains_key(&send_back_addr) {
error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}");
}
let _ = self.live_connected_peers.remove(&connection_id);
self.record_connection_metrics();
}
Expand Down Expand Up @@ -659,6 +674,30 @@ impl SwarmDriver {
.set(self.swarm.connected_peers().count() as i64);
}
}

/// Insert into the latest connected peers list. This list does not contain all the connected peers.
/// Older entries are removed if the list exceeds 50.
fn insert_latest_connected_peers(&mut self, addr: Multiaddr) {
let old_instant = self
.latest_connected_peers
.entry(addr)
.or_insert_with(Instant::now);
*old_instant = Instant::now();

while self.latest_connected_peers.len() >= 50 {
// remove the oldest entry
let Some(oldest) = self
.latest_connected_peers
.iter()
.min_by_key(|(_, time)| *time)
.map(|(addr, _)| addr.clone())
else {
maqi marked this conversation as resolved.
Show resolved Hide resolved
break;
};

self.latest_connected_peers.remove(&oldest);
}
}
}

/// Helper function to print formatted connection role info.
Expand Down
Loading