Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

fix(network): wrong connected consensus peer count #451

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ impl PeerManager {
}

let session = ArcSession::new(remote_peer.clone(), Arc::clone(&ctx));
info!("new session from {}", session.connected_addr);
info!("check new session from {}", session.connected_addr);

// Always allow peer in allowlist and consensus peer
if !remote_peer.tags.contains(&PeerTag::AlwaysAllow)
Expand Down Expand Up @@ -829,6 +829,7 @@ impl PeerManager {
return;
}
};
info!("new session from {}", session.connected_addr);

if !session.peer.has_pubkey() {
if let Err(e) = session.peer.set_pubkey(pubkey) {
Expand Down Expand Up @@ -862,54 +863,61 @@ impl PeerManager {
}
}

fn session_closed(&mut self, sid: SessionId) {
debug!("session {} closed", sid);
fn session_closed(&mut self, pid: PeerId, sid: SessionId) {
debug!("peer {:?} session {} closed", pid, sid);

// Unidentified session
if self.unidentified_backlog.take(&sid).is_some() {
return;
}

common_apm::metrics::network::NETWORK_CONNECTED_PEERS.dec();

let session = match self.inner.remove_session(sid) {
Some(s) => s,
None => return, /* Session may be removed by other event or rejected
* due to max connections before insert */
};
// Session may be removed by other event or rejected
let opt_session = self.inner.remove_session(sid);
if let Some(ref session) = opt_session {
common_apm::metrics::network::NETWORK_IP_DISCONNECTED_COUNT_VEC
.with_label_values(&[&session.connected_addr.host])
.inc();

common_apm::metrics::network::NETWORK_IP_DISCONNECTED_COUNT_VEC
.with_label_values(&[&session.connected_addr.host])
.inc();
log::info!("{} session closed", session.connected_addr);
}

info!("session closed {}", session.connected_addr);
session.peer.mark_disconnected();
let remote_peer = {
match opt_session.map_or_else(|| self.inner.peer(&pid), |s| Some(s.peer.to_owned())) {
Some(peer) => peer,
None => {
log::info!("close unsaved peer session, peer {:?}", pid);
return;
}
}
};

if session.peer.tags.contains(&PeerTag::Consensus) {
remote_peer.mark_disconnected();
if remote_peer.tags.contains(&PeerTag::Consensus) {
common_apm::metrics::network::NETWORK_CONNECTED_CONSENSUS_PEERS.dec();
}

match session.peer.trust_metric() {
match remote_peer.trust_metric() {
Some(trust_metric) => trust_metric.pause(),
None => {
warn!("session peer {:?} trust metric not found", session.peer.id);
warn!("session peer {:?} trust metric not found", remote_peer.id);

let trust_metric = TrustMetric::new(Arc::clone(&self.config.peer_trust_config));
session.peer.set_trust_metric(trust_metric);
remote_peer.set_trust_metric(trust_metric);
}
}

if session.peer.alive() < SHORT_ALIVE_SESSION {
if remote_peer.alive() < SHORT_ALIVE_SESSION {
// NOTE: the peer may have been abnormally disconnected from others. When we
// try to reconnect, other peers may treat this as a repeated connection and
// then disconnect, so we have to wait for the timeout.
warn!(
"increase peer {:?} retry due to repeated short live session",
session.peer.id
remote_peer.id
);

while session.peer.retry.eta() < REPEATED_CONNECTION_TIMEOUT {
session.peer.retry.inc();
while remote_peer.retry.eta() < REPEATED_CONNECTION_TIMEOUT {
remote_peer.retry.inc();
}
}
}
Expand Down Expand Up @@ -1344,7 +1352,7 @@ impl PeerManager {
self.repeated_connection(ty, sid, addr)
}
PeerManagerEvent::SessionBlocked { ctx, .. } => self.session_blocked(ctx),
PeerManagerEvent::SessionClosed { sid, .. } => self.session_closed(sid),
PeerManagerEvent::SessionClosed { sid, pid } => self.session_closed(pid, sid),
PeerManagerEvent::SessionFailed { sid, kind } => self.session_failed(sid, kind),
PeerManagerEvent::PeerAlive { pid } => self.update_peer_alive(&pid),
PeerManagerEvent::Misbehave { pid, kind } => self.peer_misbehave(pid, kind),
Expand Down
21 changes: 21 additions & 0 deletions core/network/src/peer_manager/test_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,27 @@ async fn should_increase_retry_for_short_alive_session_on_session_closed() {
);
}

#[tokio::test]
async fn should_properly_update_peer_state_even_if_session_not_found_on_session_closed() {
    // One outbound session is enough to exercise the close path.
    let (mut mgr, _conn_rx) = make_manager(2, 20);
    let peers = make_sessions(&mut mgr, 1, 5000, SessionType::Outbound).await;
    let peer = peers.first().expect("get first peer");

    let core = mgr.core_inner();
    assert_eq!(core.connected(), 1, "should have one session");

    // Drop the session up front so the close event below cannot find it.
    core.remove_session(peer.session_id());
    assert_eq!(core.connected(), 0, "should have no session");

    // Deliver the close event for the session that was already removed.
    mgr.poll_event(PeerManagerEvent::SessionClosed {
        pid: peer.owned_id(),
        sid: peer.session_id(),
    })
    .await;

    // The peer's state must still be updated even though no session was found.
    assert_eq!(peer.connectedness(), Connectedness::CanConnect);
}

#[tokio::test]
async fn should_inc_peer_multiaddr_failure_count_for_io_error_on_connect_failed() {
let (mut mgr, _conn_rx) = make_manager(1, 20);
Expand Down