Fix the dialing priority system #907

Merged · 6 commits · Feb 1, 2019
4 changes: 3 additions & 1 deletion core/Cargo.toml
@@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.3.0"
version = "0.3.1"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@@ -30,9 +30,11 @@ unsigned-varint = "0.2"
void = "1"

[dev-dependencies]
env_logger = "0.6"
libp2p-ping = { version = "0.3.0", path = "../protocols/ping" }
libp2p-tcp = { version = "0.3.0", path = "../transports/tcp" }
libp2p-mplex = { version = "0.3.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.3.0", path = "../protocols/secio" }
rand = "0.6"
tokio = "0.1"
tokio-codec = "0.1"
119 changes: 84 additions & 35 deletions core/src/nodes/raw_swarm.rs
@@ -27,7 +27,6 @@ use crate::{
CollectionNodeAccept,
CollectionReachEvent,
CollectionStream,
PeerMut as CollecPeerMut,
ReachAttemptId
},
handled_node::{
@@ -48,6 +47,7 @@ use std::{
error,
fmt,
hash::Hash,
num::NonZeroUsize,
};

mod tests;
@@ -92,13 +92,15 @@ struct ReachAttempts<TPeerId> {
local_peer_id: TPeerId,

/// Attempts to reach a peer.
/// May contain nodes we are already connected to, because we don't cancel outgoing attempts.
out_reach_attempts: FnvHashMap<TPeerId, OutReachAttempt>,

/// Reach attempts for incoming connections, and outgoing connections for which we don't know
/// the peer ID.
other_reach_attempts: Vec<(ReachAttemptId, ConnectedPoint)>,

/// For each peer ID we're connected to, contains the endpoint we're connected to.
/// Always in sync with `active_nodes`.
connected_points: FnvHashMap<TPeerId, ConnectedPoint>,
}

@@ -201,10 +203,8 @@ where

/// Failed to reach a peer that we were trying to dial.
DialError {
/// Returns the number of multiaddresses that still need to be attempted. If this is
/// non-zero, then there's still a chance we can connect to this node. If this is zero,
/// then we have definitely failed.
remain_addrs_attempt: usize,
/// New state of a peer.
new_state: PeerState,

/// Id of the peer we were trying to dial.
peer_id: TPeerId,
@@ -293,9 +293,9 @@
.field("error", error)
.finish()
}
RawSwarmEvent::DialError { ref remain_addrs_attempt, ref peer_id, ref multiaddr, ref error } => {
RawSwarmEvent::DialError { ref new_state, ref peer_id, ref multiaddr, ref error } => {
f.debug_struct("DialError")
.field("remain_addrs_attempt", remain_addrs_attempt)
.field("new_state", new_state)
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
@@ -364,6 +364,20 @@
}
}

/// State of a peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeerState {
/// We are connected to this peer.
Connected,
/// We are currently trying to reach this peer.
Dialing {
/// Number of addresses we are trying to dial.
num_pending_addresses: NonZeroUsize,
},
/// We are not connected to this peer.
NotConnected,
}
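
To make the semantics of the new `new_state` field concrete, here is a small self-contained sketch of how a consumer of `RawSwarmEvent::DialError` might react to each `PeerState` variant; the enum is mirrored locally so the snippet compiles on its own, and `on_dial_error` is a hypothetical helper rather than anything defined by this PR.

use std::num::NonZeroUsize;

// Local mirror of the `PeerState` enum introduced above, reproduced only so
// this snippet compiles on its own; real code would use the crate's type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PeerState {
    Connected,
    Dialing { num_pending_addresses: NonZeroUsize },
    NotConnected,
}

// Hypothetical reaction to a `DialError` event: one address failed, and
// `new_state` tells us whether the peer can still be reached another way.
fn on_dial_error(new_state: PeerState) -> &'static str {
    match new_state {
        // Another connection to this peer already exists; the failure is harmless.
        PeerState::Connected => "already connected through another endpoint",
        // More addresses are still being attempted; just wait for further events.
        PeerState::Dialing { num_pending_addresses } => {
            let _ = num_pending_addresses.get();
            "still dialing other addresses"
        }
        // Every address failed; the dial attempt is definitively over.
        PeerState::NotConnected => "all addresses failed; re-dial or give up",
    }
}

fn main() {
    assert_eq!(on_dial_error(PeerState::NotConnected), "all addresses failed; re-dial or give up");
}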

/// Error that can happen when trying to reach a node.
#[derive(Debug)]
pub enum RawSwarmReachError<TTransErr, TPeerId> {
@@ -839,19 +853,18 @@
// the borrow checker yells at us.

if self.active_nodes.peer_mut(&peer_id).is_some() {
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
return Peer::Connected(PeerConnected {
peer: self
.active_nodes
.peer_mut(&peer_id)
.expect("we checked for Some just above"),
active_nodes: &mut self.active_nodes,
peer_id,
connected_points: &mut self.reach_attempts.connected_points,
out_reach_attempts: &mut self.reach_attempts.out_reach_attempts,
});
}

// The state of `connected_points` always follows `self.active_nodes`.
debug_assert!(!self.reach_attempts.connected_points.contains_key(&peer_id));

if self.reach_attempts.out_reach_attempts.get_mut(&peer_id).is_some() {
debug_assert!(!self.reach_attempts.connected_points.contains_key(&peer_id));
return Peer::PendingConnect(PeerPendingConnect {
attempt: match self.reach_attempts.out_reach_attempts.entry(peer_id.clone()) {
Entry::Occupied(e) => e,
@@ -861,7 +874,6 @@ where
});
}

debug_assert!(!self.reach_attempts.connected_points.contains_key(&peer_id));
Peer::NotConnected(PeerNotConnected {
nodes: self,
peer_id,
@@ -993,7 +1005,6 @@ where
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeError {
peer_id,
@@ -1008,7 +1019,6 @@ where
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
messages; QED");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
}
@@ -1082,10 +1092,11 @@
.position(|i| i.0 == event.reach_attempt_id())
{
let (_, opened_endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos);
let has_dial_prio = has_dial_prio(&reach_attempts.local_peer_id, event.peer_id());

// If we already have an active connection to this peer, a priority system comes into play.
// If we have a lower peer ID than the incoming one, we drop an incoming connection.
if event.would_replace() && has_dial_prio(&reach_attempts.local_peer_id, event.peer_id()) {
if event.would_replace() && has_dial_prio {
if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) {
if let ConnectedPoint::Listener { listen_addr, send_back_addr } = opened_endpoint {
return (Default::default(), RawSwarmEvent::IncomingConnectionError {
@@ -1100,15 +1111,26 @@
// Set the endpoint for this peer.
let closed_endpoint = reach_attempts.connected_points.insert(event.peer_id().clone(), opened_endpoint.clone());

// Cancel any outgoing attempt to this peer.
let action = if let Some(attempt) = reach_attempts.out_reach_attempts.remove(&event.peer_id()) {
debug_assert_ne!(attempt.id, event.reach_attempt_id());
ActionItem {
interrupt: Some(attempt.id),
.. Default::default()
// If we have dial priority, we keep the current outgoing attempt because it may already
// have succeeded without us knowing. It is possible that the remote has already closed
// its outgoing attempt because it sees our outgoing attempt as a success.
// However, we cancel any further multiaddresses to attempt in all situations.
let action = if has_dial_prio {
if let Some(attempt) = reach_attempts.out_reach_attempts.get_mut(&event.peer_id()) {
debug_assert_ne!(attempt.id, event.reach_attempt_id());
attempt.next_attempts.clear();
}
} else {
ActionItem::default()
} else {
if let Some(attempt) = reach_attempts.out_reach_attempts.remove(&event.peer_id()) {
debug_assert_ne!(attempt.id, event.reach_attempt_id());
ActionItem {
interrupt: Some(attempt.id),
.. Default::default()
}
} else {
ActionItem::default()
}
};

let (outcome, peer_id) = event.accept();
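
`has_dial_prio` itself is defined elsewhere in this file and is not visible in this hunk. The sketch below only illustrates the general idea of the tie-break, with a stand-in function whose exact comparison is an assumption rather than the crate's implementation: because both peers evaluate the same deterministic ordering on peer IDs, they agree on which of the two simultaneous connections survives.

// Stand-in for the dialing-priority tie-break: treat peer IDs as opaque byte
// strings and give priority to the lexicographically smaller one. The side
// with priority keeps its outgoing dial and rejects the redundant incoming
// connection; the other side does the opposite, so the two peers agree.
fn has_dial_prio(local_id: &[u8], remote_id: &[u8]) -> bool {
    local_id < remote_id
}

fn main() {
    let a: &[u8] = b"peer-a";
    let b: &[u8] = b"peer-b";
    // Evaluated on either side, the comparison yields opposite answers,
    // so exactly one of the two simultaneous connections survives.
    assert!(has_dial_prio(a, b));
    assert!(!has_dial_prio(b, a));
}
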
@@ -1217,6 +1239,17 @@
let num_remain = attempt.next_attempts.len();
let failed_addr = attempt.cur_attempted.clone();

let new_state = if reach_attempts.connected_points.contains_key(&peer_id) {
PeerState::Connected
} else if num_remain == 0 {
PeerState::NotConnected
} else {
PeerState::Dialing {
num_pending_addresses: NonZeroUsize::new(num_remain)
.expect("We check that num_remain is not 0 right above; QED"),
}
};

let action = if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
let next_attempt = attempt.next_attempts.remove(0);
@@ -1240,7 +1273,7 @@
};

return (action, RawSwarmEvent::DialError {
remain_addrs_attempt: num_remain,
new_state,
peer_id,
multiaddr: failed_addr,
error,
@@ -1304,7 +1337,7 @@
TTrans: Transport,
{
/// We are connected to this peer.
Connected(PeerConnected<'a, TInEvent, TPeerId>),
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),

/// We are currently attempting to connect to this peer.
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
@@ -1371,7 +1404,7 @@
{
/// If we are connected, returns the `PeerConnected`.
#[inline]
pub fn into_connected(self) -> Option<PeerConnected<'a, TInEvent, TPeerId>> {
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
match self {
Peer::Connected(peer) => Some(peer),
_ => None,
@@ -1440,7 +1473,7 @@
TTrans: Transport
{
/// We are connected to this peer.
Connected(PeerConnected<'a, TInEvent, TPeerId>),
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),

/// We are currently attempting to connect to this peer.
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
@@ -1464,7 +1497,7 @@

/// If we are connected, returns the `PeerConnected`.
#[inline]
pub fn into_connected(self) -> Option<PeerConnected<'a, TInEvent, TPeerId>> {
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
match self {
PeerPotentialConnect::Connected(peer) => Some(peer),
_ => None,
@@ -1482,25 +1515,39 @@
}

/// Access to a peer we are connected to.
pub struct PeerConnected<'a, TInEvent: 'a, TPeerId: 'a> {
peer: CollecPeerMut<'a, TInEvent, TPeerId>,
pub struct PeerConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
where TTrans: Transport,
{
/// Reference to the `active_nodes` of the parent.
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>,
/// Reference to the `connected_points` field of the parent.
connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>,
/// Reference to the `out_reach_attempts` field of the parent.
out_reach_attempts: &'a mut FnvHashMap<TPeerId, OutReachAttempt>,
peer_id: TPeerId,
}

impl<'a, TInEvent, TPeerId> PeerConnected<'a, TInEvent, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
where
TPeerId: Eq + Hash,
TTrans: Transport,
TPeerId: Eq + Hash + Clone,
{
/// Closes the connection to this node.
///
/// No `NodeClosed` message will be generated for this node.
// TODO: consider returning a `PeerNotConnected`; however this makes all the borrows things
// much more annoying to deal with
pub fn close(self) {
if let Some(reach_attempt) = self.out_reach_attempts.remove(&self.peer_id) {
self.active_nodes
.interrupt(reach_attempt.id)
.expect("Elements in out_reach_attempts are in sync with active_nodes; QED");
}

self.connected_points.remove(&self.peer_id);
self.peer.close()
self.active_nodes.peer_mut(&self.peer_id)
.expect("A PeerConnected is always created with a PeerId in active_nodes; QED")
.close();
}

/// Returns the endpoint we're connected to.
@@ -1516,7 +1563,9 @@
/// Sends an event to the node.
#[inline]
pub fn send_event(&mut self, event: TInEvent) {
self.peer.send_event(event)
self.active_nodes.peer_mut(&self.peer_id)
.expect("A PeerConnected is always created with a PeerId in active_nodes; QED")
.send_event(event)
}
}
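
The reworked `PeerConnected` no longer stores a pre-borrowed `CollecPeerMut`; it keeps mutable references to the parent's collections and looks the peer up on demand, which is what lets `close()` also interrupt a still-pending outgoing attempt. The following is a minimal self-contained sketch of that borrow-splitting pattern, using hypothetical, simplified names (`Handle`, `active`, `pending`) rather than the crate's types.

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for `active_nodes` and `out_reach_attempts`.
struct Handle<'a> {
    active: &'a mut HashMap<u64, String>,        // connected peers
    pending: &'a mut HashMap<u64, &'static str>, // outgoing dials still in flight
    peer: u64,
}

impl<'a> Handle<'a> {
    // Look the peer up on demand instead of holding a long-lived entry borrow.
    fn send_event(&mut self, event: &str) {
        self.active
            .get_mut(&self.peer)
            .expect("a Handle is only created for connected peers")
            .push_str(event);
    }

    // Closing drops the live connection *and* cancels any racing outgoing dial,
    // mirroring what the reworked `PeerConnected::close` does with `interrupt`.
    fn close(self) {
        self.pending.remove(&self.peer);
        self.active.remove(&self.peer);
    }
}

fn main() {
    let mut active = HashMap::from([(1u64, String::from("events:"))]);
    let mut pending = HashMap::from([(1u64, "/ip4/10.0.0.1/tcp/30333")]);
    let mut handle = Handle { active: &mut active, pending: &mut pending, peer: 1 };
    handle.send_event(" ping");
    handle.close();
    assert!(active.is_empty() && pending.is_empty());
}

Resolving the peer lazily keeps the two mutable borrows disjoint, so the close path can touch both maps without fighting the borrow checker.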

2 changes: 1 addition & 1 deletion core/src/nodes/raw_swarm/tests.rs
@@ -328,7 +328,7 @@ fn known_peer_that_is_unreachable_yields_dial_error() {
Async::Ready(event) => {
let failed_peer_id = assert_matches!(
event,
RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_peer_id, .. } => failed_peer_id
RawSwarmEvent::DialError { new_state: _, peer_id: failed_peer_id, .. } => failed_peer_id
);
assert_eq!(peer_id, failed_peer_id);
Ok(Async::Ready(false))