Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(comms): improve simultaneous connection handling #3697

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ fn connection_manager_logger(
println!("'{}' connected to '{}'", node_name, get_name(conn.peer_node_id()),);
},
},
PeerDisconnected(node_id) => {
PeerDisconnected(_, node_id) => {
println!("'{}' disconnected from '{}'", get_name(node_id), node_name);
},
PeerConnectFailed(node_id, err) => {
Expand Down
31 changes: 30 additions & 1 deletion comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl DhtConnectivity {
);
debug!(
target: LOG_TARGET,
"Adding {} peer(s) to connectivity manager: {}",
"Adding {} peer(s) to DHT connectivity manager: {}",
new_neighbours.len(),
new_neighbours
.iter()
Expand All @@ -350,6 +350,33 @@ impl DhtConnectivity {
self.connectivity.request_many_dials(new_neighbours).await?;
}

self.redial_neighbours_as_required().await?;

Ok(())
}

async fn redial_neighbours_as_required(&mut self) -> Result<(), DhtConnectivityError> {
let disconnected = self
.connection_handles
.iter()
.filter(|c| !c.is_connected())
.collect::<Vec<_>>();
let to_redial = self
.neighbours
.iter()
.filter(|n| disconnected.iter().any(|c| c.peer_node_id() == *n))
.cloned()
.collect::<Vec<_>>();

if !to_redial.is_empty() {
debug!(
target: LOG_TARGET,
"Redialling {} disconnected peer(s)",
to_redial.len()
);
self.connectivity.request_many_dials(to_redial).await?;
}

Ok(())
}

Expand Down Expand Up @@ -478,6 +505,7 @@ impl DhtConnectivity {
self.handle_new_peer_connected(conn).await?;
},
PeerConnectFailed(node_id) => {
self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -518,6 +546,7 @@ impl DhtConnectivity {
self.log_status();
},
PeerDisconnected(node_id) => {
self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
debug!(
target: LOG_TARGET,
Expand Down
7 changes: 6 additions & 1 deletion comms/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ async fn peer_to_peer_messaging() {

#[runtime::test]
async fn peer_to_peer_messaging_simultaneous() {
const NUM_MSGS: usize = 10;
const NUM_MSGS: usize = 100;
let shutdown = Shutdown::new();

let (comms_node1, mut inbound_rx1, outbound_tx1, _) = spawn_node(Protocols::new(), shutdown.to_signal()).await;
Expand Down Expand Up @@ -324,6 +324,11 @@ async fn peer_to_peer_messaging_simultaneous() {
.await
.unwrap();

comms_node1
.connectivity()
.dial_peer(comms_node2.node_identity().node_id().clone())
.await
.unwrap();
// Simultaneously send messages between the two nodes
let handle1 = task::spawn(async move {
for i in 0..NUM_MSGS {
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{metrics, ConnectionDirection},
connection_manager::{metrics, ConnectionDirection, ConnectionId},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity, PeerManagerError},
Expand All @@ -61,7 +61,7 @@ const DIALER_REQUEST_CHANNEL_SIZE: usize = 32;
pub enum ConnectionManagerEvent {
// Peer connection
PeerConnected(PeerConnection),
PeerDisconnected(NodeId),
PeerDisconnected(ConnectionId, NodeId),
PeerConnectFailed(NodeId, ConnectionManagerError),
PeerInboundConnectFailed(ConnectionManagerError),

Expand All @@ -74,7 +74,7 @@ impl fmt::Display for ConnectionManagerEvent {
use ConnectionManagerEvent::*;
match self {
PeerConnected(conn) => write!(f, "PeerConnected({})", conn),
PeerDisconnected(node_id) => write!(f, "PeerDisconnected({})", node_id.short_str()),
PeerDisconnected(id, node_id) => write!(f, "PeerDisconnected({}, {})", id, node_id.short_str()),
PeerConnectFailed(node_id, err) => write!(f, "PeerConnectFailed({}, {:?})", node_id.short_str(), err),
PeerInboundConnectFailed(err) => write!(f, "PeerInboundConnectFailed({:?})", err),
NewInboundSubstream(node_id, protocol, _) => write!(
Expand Down
25 changes: 21 additions & 4 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,29 @@ impl PeerConnectionActor {
///
/// silent - true to suppress the PeerDisconnected event, false to publish the event
async fn disconnect(&mut self, silent: bool) -> Result<(), PeerConnectionError> {
if !silent {
self.notify_event(ConnectionManagerEvent::PeerDisconnected(self.peer_node_id.clone()))
.await;
match self.control.close().await {
Err(yamux::ConnectionError::Closed) => {
debug!(
target: LOG_TARGET,
"(Peer = {}) Connection already closed",
self.peer_node_id.short_str()
);

return Ok(());
},
// Only emit closed event once
_ => {
if !silent {
self.notify_event(ConnectionManagerEvent::PeerDisconnected(
self.id,
self.peer_node_id.clone(),
))
.await;
}
},
}

self.control.close().await?;
self.request_rx.close();

debug!(
target: LOG_TARGET,
Expand Down
4 changes: 1 addition & 3 deletions comms/src/connectivity/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ impl ConnectionPool {
}

pub fn get_inactive_connections_mut(&mut self, min_age: Duration) -> Vec<&mut PeerConnection> {
self.filter_connections_mut(|conn| {
conn.age() > min_age && conn.substream_count() == 0 && conn.handle_count() <= 1
})
self.filter_connections_mut(|conn| conn.age() > min_age && conn.handle_count() <= 1)
}

pub(in crate::connectivity) fn filter_drain<P>(&mut self, mut predicate: P) -> Vec<PeerConnectionState>
Expand Down
99 changes: 46 additions & 53 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,73 +490,79 @@ impl ConnectivityManagerActor {
event: &ConnectionManagerEvent,
) -> Result<(), ConnectivityError> {
use ConnectionManagerEvent::*;
#[allow(clippy::single_match)]
debug!(target: LOG_TARGET, "Received event: {}", event);
match event {
PeerConnected(new_conn) => {
// self.connection_manager
// .cancel_dial(new_conn.peer_node_id().clone())
// .await?;

match self.pool.get_connection(new_conn.peer_node_id()) {
match self.pool.get_connection(new_conn.peer_node_id()).cloned() {
Some(existing_conn) if !existing_conn.is_connected() => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection was not connected, resolving tie break by using the new \
connection. (New={}, Existing={})",
new_conn,
existing_conn,
);
},
Some(existing_conn) if existing_conn.age() >= Duration::from_secs(60) => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection is reported as connected however the authenticated peer \
is still attempting to connect to us. Resolving tie break by using the new connection. \
(New={}, Existing={})",
new_conn,
existing_conn,
"Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, \
resolving tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
);
let node_id = existing_conn.peer_node_id().clone();
let direction = existing_conn.direction();
delayed_close(existing_conn.clone(), self.config.connection_tie_break_linger);
self.publish_event(ConnectivityEvent::PeerConnectionWillClose(node_id, direction));
self.pool.remove(existing_conn.peer_node_id());
},
Some(existing_conn) if self.tie_break_existing_connection(existing_conn, new_conn) => {
Some(mut existing_conn) if self.tie_break_existing_connection(&existing_conn, new_conn) => {
debug!(
target: LOG_TARGET,
"Tie break: (Peer = {}) Keep new {} connection, Disconnect existing {} connection",
new_conn.peer_node_id().short_str(),
"Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.direction()
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let node_id = existing_conn.peer_node_id().clone();
let direction = existing_conn.direction();
delayed_close(existing_conn.clone(), self.config.connection_tie_break_linger);
self.publish_event(ConnectivityEvent::PeerConnectionWillClose(node_id, direction));
let _ = existing_conn.disconnect_silent().await;
self.pool.remove(existing_conn.peer_node_id());
},
Some(existing_conn) => {
debug!(
target: LOG_TARGET,
"Tie break: (Peer = {}) Keeping existing {} connection, Disconnecting new {} connection",
existing_conn.peer_node_id().short_str(),
existing_conn.direction(),
"Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting \
new connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

delayed_close(new_conn.clone(), self.config.connection_tie_break_linger);
let _ = new_conn.clone().disconnect_silent().await;
// Ignore this event - state can stay as is
return Ok(());
},

_ => {},
}
},

PeerDisconnected(id, node_id) => {
if let Some(conn) = self.pool.get_connection(node_id) {
if conn.id() != *id {
debug!(
target: LOG_TARGET,
"Ignoring peer disconnected event for stale peer connection (id: {}) for peer '{}'",
id,
node_id
);
return Ok(());
}
}
},
_ => {},
}

let (node_id, mut new_status, connection) = match event {
PeerDisconnected(node_id) => (&*node_id, ConnectionStatus::Disconnected, None),
PeerDisconnected(_, node_id) => (&*node_id, ConnectionStatus::Disconnected, None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),

PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
Expand Down Expand Up @@ -649,8 +655,8 @@ impl ConnectivityManagerActor {
(Inbound, Outbound) => peer_node_id > our_node_id,
// We connected to them at the same time as they connected to us
(Outbound, Inbound) => our_node_id > peer_node_id,
// We connected to them twice for some reason. Drop the newer connection.
(Outbound, Outbound) => false,
// We connected to them twice for some reason. Drop the older connection.
(Outbound, Outbound) => true,
}
}

Expand Down Expand Up @@ -818,16 +824,3 @@ impl ConnectivityManagerActor {
}
}
}

fn delayed_close(conn: PeerConnection, delay: Duration) {
task::spawn(async move {
time::sleep(delay).await;
debug!(
target: LOG_TARGET,
"Closing connection from peer `{}` after delay",
conn.peer_node_id()
);
// Can ignore the error here, the error is already logged by peer connection
let _ = conn.clone().disconnect_silent().await;
});
}
10 changes: 1 addition & 9 deletions comms/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ use super::{
manager::ConnectivityStatus,
ConnectivitySelection,
};
use crate::{
connection_manager::{ConnectionDirection, ConnectionManagerError},
peer_manager::NodeId,
PeerConnection,
};
use crate::{connection_manager::ConnectionManagerError, peer_manager::NodeId, PeerConnection};

const LOG_TARGET: &str = "comms::connectivity::requester";

Expand All @@ -57,7 +53,6 @@ pub enum ConnectivityEvent {
PeerConnectFailed(NodeId),
PeerBanned(NodeId),
PeerOffline(NodeId),
PeerConnectionWillClose(NodeId, ConnectionDirection),

ConnectivityStateInitialized,
ConnectivityStateOnline(usize),
Expand All @@ -74,9 +69,6 @@ impl fmt::Display for ConnectivityEvent {
PeerConnectFailed(node_id) => write!(f, "PeerConnectFailed({})", node_id),
PeerBanned(node_id) => write!(f, "PeerBanned({})", node_id),
PeerOffline(node_id) => write!(f, "PeerOffline({})", node_id),
PeerConnectionWillClose(node_id, direction) => {
write!(f, "PeerConnectionWillClose({}, {})", node_id, direction)
},
ConnectivityStateInitialized => write!(f, "ConnectivityStateInitialized"),
ConnectivityStateOnline(n) => write!(f, "ConnectivityStateOnline({})", n),
ConnectivityStateDegraded(n) => write!(f, "ConnectivityStateDegraded({})", n),
Expand Down
16 changes: 13 additions & 3 deletions comms/src/connectivity/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ async fn online_then_offline() {
));

for conn in connections.iter().skip(1) {
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}

streams::assert_in_broadcast(
Expand All @@ -218,7 +221,10 @@ async fn online_then_offline() {

// Disconnect client connections
for conn in &client_connections {
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}

streams::assert_in_broadcast(
Expand Down Expand Up @@ -389,7 +395,10 @@ async fn pool_management() {
assert_eq!(conn.handle_count(), 2);
// The peer connection mock does not "automatically" publish event to connectivity manager
conn.disconnect().await.unwrap();
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}
}

Expand All @@ -407,6 +416,7 @@ async fn pool_management() {
assert_eq!(conns.len(), 1);
important_connection.disconnect().await.unwrap();
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
important_connection.id(),
important_connection.peer_node_id().clone(),
));
drop(important_connection);
Expand Down
Loading