Skip to content

Commit

Permalink
fix(comms): improve simultaneous connection handling (tari-project#3697)
Browse files Browse the repository at this point in the history
Description
---

- Handle the case where a connection being disconnected due to a simultaneous dial could cause the new connection to be removed from the connectivity state
- Remove messaging protocol timeouts, messaging protocol will end correctly when disconnected (ref tari-project#3664)
- Remove condition that connection should have 0 substreams for reaping. A connection may only be reaped if its age is >= 20 minutes and there are 0 handles held for the connection
- Detect if remote outbound stream is closed, by reading from it (yamux requires reading to determine if the stream is still alive)
- Ensure disconnect event can only be emitted once per peer connection
- Remove delayed connection close and "will close" event

Motivation and Context
---
Observed a case where two nodes simultaneously dial each other and end up with no connection to each other due to a mistimed peer disconnected event. The nodes would then have to wait for DHT connectivity to eventually redial in order to recover. Inactivity timeouts for messaging were a patch for the outbound messaging substream staying open, but a closed stream is now properly detected and handled.

How Has This Been Tested?
---
Existing tests updated
Manually: two base nodes that previously "lost" their connection due to this bug
  • Loading branch information
sdbondi authored Jan 12, 2022
1 parent 75ad348 commit 99ba6a3
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 293 deletions.
2 changes: 1 addition & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ fn connection_manager_logger(
println!("'{}' connected to '{}'", node_name, get_name(conn.peer_node_id()),);
},
},
PeerDisconnected(node_id) => {
PeerDisconnected(_, node_id) => {
println!("'{}' disconnected from '{}'", get_name(node_id), node_name);
},
PeerConnectFailed(node_id, err) => {
Expand Down
31 changes: 30 additions & 1 deletion comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl DhtConnectivity {
);
debug!(
target: LOG_TARGET,
"Adding {} peer(s) to connectivity manager: {}",
"Adding {} peer(s) to DHT connectivity manager: {}",
new_neighbours.len(),
new_neighbours
.iter()
Expand All @@ -350,6 +350,33 @@ impl DhtConnectivity {
self.connectivity.request_many_dials(new_neighbours).await?;
}

self.redial_neighbours_as_required().await?;

Ok(())
}

/// Requests a redial for every neighbour whose held connection handle reports that it is no
/// longer connected.
///
/// Does nothing (and returns `Ok(())`) when no neighbour matches a disconnected handle.
///
/// # Errors
/// Propagates any error returned by `self.connectivity.request_many_dials`.
async fn redial_neighbours_as_required(&mut self) -> Result<(), DhtConnectivityError> {
// Connection handles we still hold whose `is_connected()` check is false.
let disconnected = self
.connection_handles
.iter()
.filter(|c| !c.is_connected())
.collect::<Vec<_>>();
// Keep only the neighbours whose node id matches one of the disconnected handles; ids are
// cloned so the owned list can be passed to the dial request below.
let to_redial = self
.neighbours
.iter()
.filter(|n| disconnected.iter().any(|c| c.peer_node_id() == *n))
.cloned()
.collect::<Vec<_>>();

// Skip the log line and the dial request entirely when there is nothing to redial.
if !to_redial.is_empty() {
debug!(
target: LOG_TARGET,
"Redialling {} disconnected peer(s)",
to_redial.len()
);
self.connectivity.request_many_dials(to_redial).await?;
}

Ok(())
}

Expand Down Expand Up @@ -478,6 +505,7 @@ impl DhtConnectivity {
self.handle_new_peer_connected(conn).await?;
},
PeerConnectFailed(node_id) => {
self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -518,6 +546,7 @@ impl DhtConnectivity {
self.log_status();
},
PeerDisconnected(node_id) => {
self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
debug!(
target: LOG_TARGET,
Expand Down
7 changes: 6 additions & 1 deletion comms/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ async fn peer_to_peer_messaging() {

#[runtime::test]
async fn peer_to_peer_messaging_simultaneous() {
const NUM_MSGS: usize = 10;
const NUM_MSGS: usize = 100;
let shutdown = Shutdown::new();

let (comms_node1, mut inbound_rx1, outbound_tx1, _) = spawn_node(Protocols::new(), shutdown.to_signal()).await;
Expand Down Expand Up @@ -324,6 +324,11 @@ async fn peer_to_peer_messaging_simultaneous() {
.await
.unwrap();

comms_node1
.connectivity()
.dial_peer(comms_node2.node_identity().node_id().clone())
.await
.unwrap();
// Simultaneously send messages between the two nodes
let handle1 = task::spawn(async move {
for i in 0..NUM_MSGS {
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{metrics, ConnectionDirection},
connection_manager::{metrics, ConnectionDirection, ConnectionId},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity, PeerManagerError},
Expand All @@ -61,7 +61,7 @@ const DIALER_REQUEST_CHANNEL_SIZE: usize = 32;
pub enum ConnectionManagerEvent {
// Peer connection
PeerConnected(PeerConnection),
PeerDisconnected(NodeId),
PeerDisconnected(ConnectionId, NodeId),
PeerConnectFailed(NodeId, ConnectionManagerError),
PeerInboundConnectFailed(ConnectionManagerError),

Expand All @@ -74,7 +74,7 @@ impl fmt::Display for ConnectionManagerEvent {
use ConnectionManagerEvent::*;
match self {
PeerConnected(conn) => write!(f, "PeerConnected({})", conn),
PeerDisconnected(node_id) => write!(f, "PeerDisconnected({})", node_id.short_str()),
PeerDisconnected(id, node_id) => write!(f, "PeerDisconnected({}, {})", id, node_id.short_str()),
PeerConnectFailed(node_id, err) => write!(f, "PeerConnectFailed({}, {:?})", node_id.short_str(), err),
PeerInboundConnectFailed(err) => write!(f, "PeerInboundConnectFailed({:?})", err),
NewInboundSubstream(node_id, protocol, _) => write!(
Expand Down
25 changes: 21 additions & 4 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,29 @@ impl PeerConnectionActor {
///
/// silent - true to suppress the PeerDisconnected event, false to publish the event
async fn disconnect(&mut self, silent: bool) -> Result<(), PeerConnectionError> {
if !silent {
self.notify_event(ConnectionManagerEvent::PeerDisconnected(self.peer_node_id.clone()))
.await;
match self.control.close().await {
Err(yamux::ConnectionError::Closed) => {
debug!(
target: LOG_TARGET,
"(Peer = {}) Connection already closed",
self.peer_node_id.short_str()
);

return Ok(());
},
// Only emit closed event once
_ => {
if !silent {
self.notify_event(ConnectionManagerEvent::PeerDisconnected(
self.id,
self.peer_node_id.clone(),
))
.await;
}
},
}

self.control.close().await?;
self.request_rx.close();

debug!(
target: LOG_TARGET,
Expand Down
4 changes: 1 addition & 3 deletions comms/src/connectivity/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ impl ConnectionPool {
}

pub fn get_inactive_connections_mut(&mut self, min_age: Duration) -> Vec<&mut PeerConnection> {
self.filter_connections_mut(|conn| {
conn.age() > min_age && conn.substream_count() == 0 && conn.handle_count() <= 1
})
self.filter_connections_mut(|conn| conn.age() > min_age && conn.handle_count() <= 1)
}

pub(in crate::connectivity) fn filter_drain<P>(&mut self, mut predicate: P) -> Vec<PeerConnectionState>
Expand Down
99 changes: 46 additions & 53 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,73 +490,79 @@ impl ConnectivityManagerActor {
event: &ConnectionManagerEvent,
) -> Result<(), ConnectivityError> {
use ConnectionManagerEvent::*;
#[allow(clippy::single_match)]
debug!(target: LOG_TARGET, "Received event: {}", event);
match event {
PeerConnected(new_conn) => {
// self.connection_manager
// .cancel_dial(new_conn.peer_node_id().clone())
// .await?;

match self.pool.get_connection(new_conn.peer_node_id()) {
match self.pool.get_connection(new_conn.peer_node_id()).cloned() {
Some(existing_conn) if !existing_conn.is_connected() => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection was not connected, resolving tie break by using the new \
connection. (New={}, Existing={})",
new_conn,
existing_conn,
);
},
Some(existing_conn) if existing_conn.age() >= Duration::from_secs(60) => {
debug!(
target: LOG_TARGET,
"Tie break: Existing connection is reported as connected however the authenticated peer \
is still attempting to connect to us. Resolving tie break by using the new connection. \
(New={}, Existing={})",
new_conn,
existing_conn,
"Tie break: Existing connection (id: {}, peer: {}, direction: {}) was not connected, \
resolving tie break by using the new connection. (New: id: {}, peer: {}, direction: {})",
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
);
let node_id = existing_conn.peer_node_id().clone();
let direction = existing_conn.direction();
delayed_close(existing_conn.clone(), self.config.connection_tie_break_linger);
self.publish_event(ConnectivityEvent::PeerConnectionWillClose(node_id, direction));
self.pool.remove(existing_conn.peer_node_id());
},
Some(existing_conn) if self.tie_break_existing_connection(existing_conn, new_conn) => {
Some(mut existing_conn) if self.tie_break_existing_connection(&existing_conn, new_conn) => {
debug!(
target: LOG_TARGET,
"Tie break: (Peer = {}) Keep new {} connection, Disconnect existing {} connection",
new_conn.peer_node_id().short_str(),
"Tie break: Keep new connection (id: {}, peer: {}, direction: {}). Disconnect existing \
connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.direction()
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

let node_id = existing_conn.peer_node_id().clone();
let direction = existing_conn.direction();
delayed_close(existing_conn.clone(), self.config.connection_tie_break_linger);
self.publish_event(ConnectivityEvent::PeerConnectionWillClose(node_id, direction));
let _ = existing_conn.disconnect_silent().await;
self.pool.remove(existing_conn.peer_node_id());
},
Some(existing_conn) => {
debug!(
target: LOG_TARGET,
"Tie break: (Peer = {}) Keeping existing {} connection, Disconnecting new {} connection",
existing_conn.peer_node_id().short_str(),
existing_conn.direction(),
"Tie break: Keeping existing connection (id: {}, peer: {}, direction: {}). Disconnecting \
new connection (id: {}, peer: {}, direction: {})",
new_conn.id(),
new_conn.peer_node_id(),
new_conn.direction(),
existing_conn.id(),
existing_conn.peer_node_id(),
existing_conn.direction(),
);

delayed_close(new_conn.clone(), self.config.connection_tie_break_linger);
let _ = new_conn.clone().disconnect_silent().await;
// Ignore this event - state can stay as is
return Ok(());
},

_ => {},
}
},

PeerDisconnected(id, node_id) => {
if let Some(conn) = self.pool.get_connection(node_id) {
if conn.id() != *id {
debug!(
target: LOG_TARGET,
"Ignoring peer disconnected event for stale peer connection (id: {}) for peer '{}'",
id,
node_id
);
return Ok(());
}
}
},
_ => {},
}

let (node_id, mut new_status, connection) = match event {
PeerDisconnected(node_id) => (&*node_id, ConnectionStatus::Disconnected, None),
PeerDisconnected(_, node_id) => (&*node_id, ConnectionStatus::Disconnected, None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),

PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => {
Expand Down Expand Up @@ -649,8 +655,8 @@ impl ConnectivityManagerActor {
(Inbound, Outbound) => peer_node_id > our_node_id,
// We connected to them at the same time as they connected to us
(Outbound, Inbound) => our_node_id > peer_node_id,
// We connected to them twice for some reason. Drop the newer connection.
(Outbound, Outbound) => false,
// We connected to them twice for some reason. Drop the older connection.
(Outbound, Outbound) => true,
}
}

Expand Down Expand Up @@ -818,16 +824,3 @@ impl ConnectivityManagerActor {
}
}
}

fn delayed_close(conn: PeerConnection, delay: Duration) {
task::spawn(async move {
time::sleep(delay).await;
debug!(
target: LOG_TARGET,
"Closing connection from peer `{}` after delay",
conn.peer_node_id()
);
// Can ignore the error here, the error is already logged by peer connection
let _ = conn.clone().disconnect_silent().await;
});
}
10 changes: 1 addition & 9 deletions comms/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ use super::{
manager::ConnectivityStatus,
ConnectivitySelection,
};
use crate::{
connection_manager::{ConnectionDirection, ConnectionManagerError},
peer_manager::NodeId,
PeerConnection,
};
use crate::{connection_manager::ConnectionManagerError, peer_manager::NodeId, PeerConnection};

const LOG_TARGET: &str = "comms::connectivity::requester";

Expand All @@ -57,7 +53,6 @@ pub enum ConnectivityEvent {
PeerConnectFailed(NodeId),
PeerBanned(NodeId),
PeerOffline(NodeId),
PeerConnectionWillClose(NodeId, ConnectionDirection),

ConnectivityStateInitialized,
ConnectivityStateOnline(usize),
Expand All @@ -74,9 +69,6 @@ impl fmt::Display for ConnectivityEvent {
PeerConnectFailed(node_id) => write!(f, "PeerConnectFailed({})", node_id),
PeerBanned(node_id) => write!(f, "PeerBanned({})", node_id),
PeerOffline(node_id) => write!(f, "PeerOffline({})", node_id),
PeerConnectionWillClose(node_id, direction) => {
write!(f, "PeerConnectionWillClose({}, {})", node_id, direction)
},
ConnectivityStateInitialized => write!(f, "ConnectivityStateInitialized"),
ConnectivityStateOnline(n) => write!(f, "ConnectivityStateOnline({})", n),
ConnectivityStateDegraded(n) => write!(f, "ConnectivityStateDegraded({})", n),
Expand Down
16 changes: 13 additions & 3 deletions comms/src/connectivity/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ async fn online_then_offline() {
));

for conn in connections.iter().skip(1) {
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}

streams::assert_in_broadcast(
Expand All @@ -218,7 +221,10 @@ async fn online_then_offline() {

// Disconnect client connections
for conn in &client_connections {
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}

streams::assert_in_broadcast(
Expand Down Expand Up @@ -389,7 +395,10 @@ async fn pool_management() {
assert_eq!(conn.handle_count(), 2);
// The peer connection mock does not "automatically" publish event to connectivity manager
conn.disconnect().await.unwrap();
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(conn.peer_node_id().clone()));
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
conn.id(),
conn.peer_node_id().clone(),
));
}
}

Expand All @@ -407,6 +416,7 @@ async fn pool_management() {
assert_eq!(conns.len(), 1);
important_connection.disconnect().await.unwrap();
cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected(
important_connection.id(),
important_connection.peer_node_id().clone(),
));
drop(important_connection);
Expand Down
Loading

0 comments on commit 99ba6a3

Please sign in to comment.