From ada6946458fea658e9e9e9cc012a973bd0700529 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Sun, 1 Mar 2020 18:29:14 +0100 Subject: [PATCH] Use a different approach. --- .../src/protocol/generic_proto/behaviour.rs | 450 ++++++++---------- .../protocol/generic_proto/handler/group.rs | 30 +- .../protocol/generic_proto/handler/legacy.rs | 24 +- 3 files changed, 225 insertions(+), 279 deletions(-) diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 27f1b73bf2b77..f72ab4d98b864 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -74,21 +74,25 @@ use wasm_timer::Instant; /// manager. In other words, the peerset manager doesn't differentiate whether we are dialing a /// node or connected to it. /// -/// For each peer, one connection is designated as the "primary connection". If there are -/// multiple connections to a peer, which may currently practically be at most two and only -/// as a result of "simultaneous" dialing, the enabled/disabled status of all secondary connections -/// follows the status of the primary connection and hence maintain a unified status that -/// is in sync with the peerset manager. Furtheremore, if a secondary connection fails -/// due to an error, it has no impact on the current state of the peer on the external -/// API. Secondary connections can thus serve traffic but do not affect the status of -/// a peer w.r.t. this behaviour and the associated peerset manager. +/// There may be multiple connections to a peer. However, the status of a peer on +/// the API of this behaviour and towards the peerset manager is aggregated in +/// the following way: /// -/// Additionally, there also exists a "banning" system. If we fail to dial a node, we "ban" it for -/// a few seconds. If the PSM requests a node that is in the "banned" state, then we delay the -/// actual dialing attempt until after the ban expires, but the PSM will still consider the link -/// to be established. Note that this "banning" system is not an actual ban. If a "banned" node -/// tries to connect to us, we accept the connection. The "banning" system is only about delaying -/// dialing attempts. +/// 1. The enabled/disabled status is the same across all connections, as +/// decided by the peerset manager. +/// 2. The behaviour reports `GenericProtoOut::CustomProtocolOpen` when the +/// first connection reports `NotifsHandlerOut::Open`. +/// 3. The behaviour reports `GenericProtoOut::CustomProtocolClosed` when the +/// last connection reports `NotifsHandlerOut::Closed`. +/// +/// In this way, the number of actual established connections to the peer is +/// an implementation detail of this behaviour. +/// +/// Additionally, there also exists a "banning" system. If we fail to dial a peer, we "ban" it for +/// a few seconds. If the PSM requests connecting to a peer that is currently "banned", the next +/// dialing attempt is delayed until after the ban expires. However, the PSM will still consider +/// the peer to be connected. This "ban" is thus not a ban in a strict sense: If a "banned" peer +/// tries to connect, the connection is accepted. A ban only delays dialing attempts. /// pub struct GenericProto { /// Legacy protocol to open with peers. Never modified. @@ -145,14 +149,8 @@ enum PeerState { /// /// We may still have ongoing requests to that peer, but it should cease shortly. Disabled { - /// The designated primary (usually the only) connection. - connection: Option, - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we still have a custom protocol open with it. It will likely get closed in - /// a short amount of time, but we need to keep the information in order to not have a - /// state mismatch. - open: bool, + /// The connections that are currently open for custom protocol traffic. + open: FnvHashMap, /// If `Some`, the node is banned until the given `Instant`. banned_until: Option, }, @@ -161,14 +159,8 @@ enum PeerState { /// will be enabled when `timer` fires. This peer can still perform Kademlia queries and such, /// but should get disconnected in a few seconds. DisabledPendingEnable { - /// The designated primary (usually the only) connection. - connection: Option, - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we still have a custom protocol open with it. It will likely get closed in - /// a short amount of time, but we need to keep the information in order to not have a - /// state mismatch. - open: bool, + /// The connections that are currently open for custom protocol traffic. + open: FnvHashMap, /// When to enable this remote. timer: futures_timer::Delay, /// When the `timer` will trigger. @@ -178,22 +170,14 @@ enum PeerState { /// We are connected to this peer and the peerset has accepted it. The handler is in the /// enabled state. Enabled { - /// The designated primary (usually the only) connection. - connection: Option, - /// How we are connected to this peer. - connected_point: ConnectedPoint, - /// If true, we have a custom protocol open with this peer. - open: bool, + /// The connections that are currently open for custom protocol traffic. + open: FnvHashMap, }, /// We are connected to this peer, and we sent an incoming message to the peerset. The handler /// is in initialization mode. We are waiting for the Accept or Reject from the peerset. There /// is a corresponding entry in `incoming`. Incoming { - /// The designated primary (usually the only) connection. - connection: Option, - /// How we are connected to this peer. - connected_point: ConnectedPoint, }, } @@ -205,24 +189,12 @@ impl PeerState { PeerState::Banned { .. } => false, PeerState::PendingRequest { .. } => false, PeerState::Requested => false, - PeerState::Disabled { open, .. } => *open, - PeerState::DisabledPendingEnable { open, .. } => *open, - PeerState::Enabled { open, .. } => *open, + PeerState::Disabled { open, .. } => !open.is_empty(), + PeerState::DisabledPendingEnable { open, .. } => !open.is_empty(), + PeerState::Enabled { open, .. } => !open.is_empty(), PeerState::Incoming { .. } => false, } } - - /// The ID designated primary connection, if one is assigned - /// w.r.t. the current state of the peer. - fn connection(&self) -> &Option { - match self { - PeerState::Incoming { ref connection, .. } | - PeerState::Enabled { ref connection, .. } | - PeerState::Disabled { ref connection, .. } | - PeerState::DisabledPendingEnable { ref connection, .. } => connection, - _ => &None - } - } } /// State of an "incoming" message sent to the peer set manager. @@ -244,8 +216,6 @@ pub enum GenericProtoOut { CustomProtocolOpen { /// Id of the node we have opened a connection with. peer_id: PeerId, - /// Endpoint used for this custom protocol. - endpoint: ConnectedPoint, }, /// Closed a custom protocol with the remote. @@ -344,9 +314,7 @@ impl GenericProto { // DisabledPendingEnable => Disabled. PeerState::DisabledPendingEnable { open, - connected_point, timer_deadline, - connection, timer: _ } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); @@ -358,14 +326,12 @@ impl GenericProto { }); *entry.into_mut() = PeerState::Disabled { open, - connection, - connected_point, banned_until } }, // Enabled => Disabled. - PeerState::Enabled { open, connected_point, connection } => { + PeerState::Enabled { open } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); @@ -377,14 +343,12 @@ impl GenericProto { let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { open, - connection, - connected_point, banned_until } }, // Incoming => Disabled. - PeerState::Incoming { connected_point, connection } => { + PeerState::Incoming { } => { let inc = if let Some(inc) = self.incoming.iter_mut() .find(|i| i.peer_id == *entry.key() && i.alive) { inc @@ -403,9 +367,7 @@ impl GenericProto { }); let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { - open: false, - connection, - connected_point, + open: FnvHashMap::default(), banned_until } }, @@ -531,36 +493,32 @@ impl GenericProto { PeerState::Disabled { open, - connection, - ref connected_point, banned_until: Some(ref banned) } if *banned > now => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Has idle connection through \ - {:?} but node is banned until {:?}", occ_entry.key(), connected_point, banned); + debug!(target: "sub-libp2p", "PSM => Connect({:?}): But node is banned until {:?}", + occ_entry.key(), banned); *occ_entry.into_mut() = PeerState::DisabledPendingEnable { - connected_point: connected_point.clone(), - connection, open, timer: futures_timer::Delay::new(banned.clone() - now), timer_deadline: banned.clone(), }; }, - PeerState::Disabled { open, connection, connected_point, banned_until: _ } => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling previously-idle \ - connection through {:?}", occ_entry.key(), connected_point); + PeerState::Disabled { open, banned_until: _ } => { + debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.", + occ_entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *occ_entry.into_mut() = PeerState::Enabled { connection, connected_point, open }; + *occ_entry.into_mut() = PeerState::Enabled { open }; }, - PeerState::Incoming { connected_point, connection } => { - debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling incoming \ - connection through {:?}", occ_entry.key(), connected_point); + PeerState::Incoming { } => { + debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.", + occ_entry.key()); if let Some(inc) = self.incoming.iter_mut() .find(|i| i.peer_id == *occ_entry.key() && i.alive) { inc.alive = false; @@ -575,25 +533,26 @@ impl GenericProto { event: NotifsHandlerIn::Enable, }); *occ_entry.into_mut() = PeerState::Enabled { - connection, - connected_point, - open: false + open: FnvHashMap::default() }; }, st @ PeerState::Enabled { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Already connected to this \ - peer", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Already connected.", + occ_entry.key()); *occ_entry.into_mut() = st; }, st @ PeerState::DisabledPendingEnable { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Already have an idle \ - connection to this peer and waiting to enable it", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Already pending enabling.", + occ_entry.key()); *occ_entry.into_mut() = st; }, st @ PeerState::Requested { .. } | st @ PeerState::PendingRequest { .. } => { - warn!(target: "sub-libp2p", "PSM => Connect({:?}): Received a previous \ - request for that peer", occ_entry.key()); + warn!(target: "sub-libp2p", + "PSM => Connect({:?}): Duplicate request.", + occ_entry.key()); *occ_entry.into_mut() = st; }, @@ -607,36 +566,33 @@ impl GenericProto { let mut entry = match self.peers.entry(peer_id) { Entry::Occupied(entry) => entry, Entry::Vacant(entry) => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Node already disabled", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Already disabled.", entry.key()); return } }; match mem::replace(entry.get_mut(), PeerState::Poisoned) { st @ PeerState::Disabled { .. } | st @ PeerState::Banned { .. } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Node already disabled", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Already disabled.", entry.key()); *entry.into_mut() = st; }, PeerState::DisabledPendingEnable { open, - connection, - connected_point, timer_deadline, timer: _ } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Interrupting pending \ - enable", entry.key()); + debug!(target: "sub-libp2p", + "PSM => Drop({:?}): Interrupting pending enabling.", + entry.key()); *entry.into_mut() = PeerState::Disabled { open, - connection, - connected_point, banned_until: Some(timer_deadline), }; }, - PeerState::Enabled { open, connection, connected_point } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connection", entry.key()); + PeerState::Enabled { open } => { + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connections.", entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", entry.key()); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: entry.key().clone(), @@ -645,13 +601,11 @@ impl GenericProto { }); *entry.into_mut() = PeerState::Disabled { open, - connection, - connected_point, banned_until: None } }, - st @ PeerState::Incoming { .. } => { - error!(target: "sub-libp2p", "PSM => Drop({:?}): Was in incoming mode", + st @ PeerState::Incoming { } => { + error!(target: "sub-libp2p", "PSM => Drop({:?}): Not enabled (Incoming).", entry.key()); *entry.into_mut() = st; }, @@ -659,11 +613,11 @@ impl GenericProto { // We don't cancel dialing. Libp2p doesn't expose that on purpose, as other // sub-systems (such as the discovery mechanism) may require dialing this node as // well at the same time. - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Not yet connected.", entry.key()); entry.remove(); }, PeerState::PendingRequest { timer_deadline, .. } => { - debug!(target: "sub-libp2p", "PSM => Drop({:?}): Was not yet connected", entry.key()); + debug!(target: "sub-libp2p", "PSM => Drop({:?}): Not yet connected", entry.key()); *entry.into_mut() = PeerState::Banned { until: timer_deadline } }, @@ -697,17 +651,16 @@ impl GenericProto { return }; - let (connection, connected_point) = - if let PeerState::Incoming { connection, connected_point } = state { - (*connection, connected_point.clone()) - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ - to an alive incoming is not in incoming state"); - return - }; + if let PeerState::Incoming { } = state { + // ... + } else { + error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ + to an alive incoming is not in incoming state"); + return + }; - debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connection \ - through {:?}", index, incoming.peer_id, connected_point); + debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connections.", + index, incoming.peer_id); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, @@ -715,7 +668,7 @@ impl GenericProto { event: NotifsHandlerIn::Enable, }); - *state = PeerState::Enabled { open: false, connection, connected_point }; + *state = PeerState::Enabled { open: FnvHashMap::default() }; } /// Function that is called when the peerset wants us to reject an incoming node. @@ -741,17 +694,16 @@ impl GenericProto { return }; - let (connection, connected_point) = - if let PeerState::Incoming { connected_point, connection } = state { - (*connection, connected_point.clone()) - } else { - error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ - to an alive incoming is not in incoming state"); - return - }; + if let PeerState::Incoming { } = state { + // ... + } else { + error!(target: "sub-libp2p", "State mismatch in libp2p: entry in peers corresponding \ + to an alive incoming is not in incoming state"); + return + }; - debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connection through \ - {:?}", index, incoming.peer_id, connected_point); + debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connections.", + index, incoming.peer_id); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, @@ -759,9 +711,7 @@ impl GenericProto { event: NotifsHandlerIn::Disable, }); *state = PeerState::Disabled { - open: false, - connection, - connected_point, + open: FnvHashMap::default(), banned_until: None }; } @@ -792,8 +742,9 @@ impl NetworkBehaviour for GenericProto { match (self.peers.entry(peer_id.clone()).or_insert(PeerState::Poisoned), connected_point) { (st @ &mut PeerState::Requested, connected_point) | (st @ &mut PeerState::PendingRequest { .. }, connected_point) => { - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Connection \ - requested by PSM (through {:?})", peer_id, connected_point + debug!(target: "sub-libp2p", + "Libp2p => Connected({:?}, {:?}): Connection was requested by PSM.", + peer_id, connected_point ); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", peer_id); self.events.push(NetworkBehaviourAction::NotifyHandler { @@ -801,7 +752,7 @@ impl NetworkBehaviour for GenericProto { handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *st = PeerState::Enabled { open: false, connected_point, connection: None }; + *st = PeerState::Enabled { open: FnvHashMap::default() }; } // Note: it may seem weird that "Banned" nodes get treated as if they were absent. @@ -817,17 +768,17 @@ impl NetworkBehaviour for GenericProto { return } }; - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Incoming connection", - peer_id); - debug!(target: "sub-libp2p", "PSM <= Incoming({:?}, {:?}): Through {:?}", - incoming_id, peer_id, connected_point); + debug!(target: "sub-libp2p", "Libp2p => Connected({:?}, {:?}): Incoming connection", + peer_id, connected_point); + debug!(target: "sub-libp2p", "PSM <= Incoming({:?}, {:?}).", + incoming_id, peer_id); self.peerset.incoming(peer_id.clone(), incoming_id); self.incoming.push(IncomingPeer { peer_id: peer_id.clone(), alive: true, incoming_id, }); - *st = PeerState::Incoming { connected_point, connection: None }; + *st = PeerState::Incoming { }; } (st @ &mut PeerState::Poisoned, connected_point) | @@ -837,8 +788,9 @@ impl NetworkBehaviour for GenericProto { } else { None }; - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}): Requested by something \ - else than PSM, disabling", peer_id); + debug!(target: "sub-libp2p", + "Libp2p => Connected({:?},{:?}): Not requested by PSM, disabling.", + peer_id, connected_point); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), @@ -846,9 +798,7 @@ impl NetworkBehaviour for GenericProto { event: NotifsHandlerIn::Disable, }); *st = PeerState::Disabled { - open: false, - connection: None, - connected_point, + open: FnvHashMap::default(), banned_until }; } @@ -868,15 +818,16 @@ impl NetworkBehaviour for GenericProto { Some(PeerState::Banned { .. }) => // This is a serious bug either in this state machine or in libp2p. error!(target: "sub-libp2p", "Received inject_disconnected for non-connected \ - node {:?}", peer_id), + peer {:?}", peer_id), - Some(PeerState::Disabled { open, banned_until, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ - (through {:?})", peer_id, endpoint); + Some(PeerState::Disabled { open, banned_until }) => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?}): Was disabled.", + peer_id, endpoint); if let Some(until) = banned_until { self.peers.insert(peer_id.clone(), PeerState::Banned { until }); } - if open { + + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -888,12 +839,13 @@ impl NetworkBehaviour for GenericProto { } Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ - (through {:?}) but pending enable", peer_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?}, {:?}): Was disabled but pending enable.", + peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); - if open { + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -905,8 +857,9 @@ impl NetworkBehaviour for GenericProto { } Some(PeerState::Enabled { open, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was enabled \ - (through {:?})", peer_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?}, {:?}): Was enabled.", + peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); @@ -915,7 +868,7 @@ impl NetworkBehaviour for GenericProto { until: Instant::now() + Duration::from_secs(ban_dur) }); - if open { + if !open.is_empty() { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), @@ -928,10 +881,11 @@ impl NetworkBehaviour for GenericProto { // In the incoming state, we don't report "Dropped". Instead we will just ignore the // corresponding Accept/Reject. - Some(PeerState::Incoming { .. }) => { + Some(PeerState::Incoming { }) => { if let Some(state) = self.incoming.iter_mut().find(|i| i.peer_id == *peer_id) { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was in incoming \ - mode (id {:?}, through {:?})", peer_id, state.incoming_id, endpoint); + debug!(target: "sub-libp2p", + "Libp2p => Disconnected({:?},{:?}): Was in incoming mode with id {:?}.", + peer_id, endpoint, state.incoming_id); state.alive = false; } else { error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in incoming \ @@ -992,36 +946,28 @@ impl NetworkBehaviour for GenericProto { event: NotifsHandlerOut, ) { match event { - NotifsHandlerOut::Init { endpoint } => { - debug!(target: "sub-libp2p", "Handler({:?}) => Init: {:?}", source, endpoint); + NotifsHandlerOut::Init { } => { + debug!(target: "sub-libp2p", "Handler({:?}) => Init", source); - let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { + let entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { entry } else { error!(target: "sub-libp2p", "Init: State mismatch in the custom protos handler"); return }; - match entry.get_mut() { - PeerState::Enabled { connected_point, connection: primary, .. } | - PeerState::Disabled { connected_point, connection: primary, .. } | - PeerState::DisabledPendingEnable { connected_point, connection: primary, .. } | - PeerState::Incoming { connected_point, connection: primary, .. } - if connected_point == &endpoint => - { - // We take that handler as the one of our primary connection - // for this peer tracked in the `PeerState`. - *primary = primary.or(Some(connection)); - } - _ => {} - } - let event = match entry.get() { // Waiting for a decision from the PSM. Let the handler stay // in initialisation state. PeerState::Incoming { .. } => None, - PeerState::Enabled { .. } => Some(NotifsHandlerIn::Enable), - _ => Some(NotifsHandlerIn::Disable), + PeerState::Enabled { .. } => { + debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", source); + Some(NotifsHandlerIn::Enable) + } + _ => { + debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); + Some(NotifsHandlerIn::Disable) + } }; if let Some(event) = event { @@ -1032,8 +978,10 @@ impl NetworkBehaviour for GenericProto { }); } } - NotifsHandlerOut::Closed { reason } => { - debug!(target: "sub-libp2p", "Handler({:?}) => Closed: {}", source, reason); + NotifsHandlerOut::Closed { endpoint, reason } => { + debug!(target: "sub-libp2p", + "Handler({:?}) => Endpoint {:?} closed for custom protocols: {}", + source, endpoint, reason); let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { entry @@ -1042,95 +990,93 @@ impl NetworkBehaviour for GenericProto { return }; - // Return early if this is not the primary connection w.r.t. the - // `PeerState` and PSM. - if entry.get().connection() != &Some(connection) { - debug!(target: "sub-libp2p", "Secondary connection closed by {:?}", reason); - return - } - - debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); - let event = GenericProtoOut::CustomProtocolClosed { - reason, - peer_id: source.clone(), - }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); - - match mem::replace(entry.get_mut(), PeerState::Poisoned) { - PeerState::Enabled { open, connection, connected_point } => { - debug_assert!(open); - - debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source); - self.peerset.dropped(source.clone()); + let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) { + PeerState::Enabled { mut open, .. } => { + debug_assert!(open.contains_key(&connection)); + open.remove(&connection); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: source.clone(), - handler: NotifyHandler::All, + handler: NotifyHandler::One(connection), event: NotifsHandlerIn::Disable, }); - *entry.into_mut() = PeerState::Disabled { - open: false, - connection, - connected_point, - banned_until: None - }; + let last = open.is_empty(); + + if last { + debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source); + self.peerset.dropped(source.clone()); + + *entry.into_mut() = PeerState::Disabled { + open, + banned_until: None + }; + } + + last }, - PeerState::Disabled { open, connection, connected_point, banned_until } => { - debug_assert!(open); + PeerState::Disabled { mut open, banned_until } => { + debug_assert!(open.contains_key(&connection)); + open.remove(&connection); + let last = open.is_empty(); *entry.into_mut() = PeerState::Disabled { - open: false, - connection, - connected_point, + open, banned_until }; + last }, PeerState::DisabledPendingEnable { - open, - connection, - connected_point, + mut open, timer, timer_deadline } => { - debug_assert!(open); + debug_assert!(open.contains_key(&connection)); + open.remove(&connection); + let last = open.is_empty(); *entry.into_mut() = PeerState::DisabledPendingEnable { - open: false, - connection, - connected_point, + open, timer, timer_deadline }; + last }, - state => error!(target: "sub-libp2p", - "Unexpected state in the custom protos handler: {:?}", state), + state => { + error!(target: "sub-libp2p", + "Unexpected state in the custom protos handler: {:?}", + state); + return + } + }; + + if last { + debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); + let event = GenericProtoOut::CustomProtocolClosed { + reason, + peer_id: source.clone(), + }; + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } } - NotifsHandlerOut::Open => { - debug!(target: "sub-libp2p", "Handler({:?}) => Open", source); - let endpoint = match self.peers.get_mut(&source) { + NotifsHandlerOut::Open { endpoint } => { + debug!(target: "sub-libp2p", + "Handler({:?}) => Endpoint {:?} open for custom protocols.", + source, endpoint); + + let first = match self.peers.get_mut(&source) { Some(PeerState::Enabled { - ref mut open, connected_point, connection: primary + ref mut open, .. }) | Some(PeerState::DisabledPendingEnable { - ref mut open, connected_point, connection: primary, .. + ref mut open, .. }) | Some(PeerState::Disabled { - ref mut open, connected_point, connection: primary, .. + ref mut open, .. }) => { - if primary != &Some(connection) { - debug!(target: "sub-libp2p", - "Secondary connection opened custom protocol."); - return - } - if *open { - error!(target: "sub-libp2p", - "Open: State mismatch in the custom protos handler"); - return - } - *open = true; - connected_point.clone() + let first = open.is_empty(); + open.insert(connection, endpoint); + first } state => { error!(target: "sub-libp2p", @@ -1140,13 +1086,16 @@ impl NetworkBehaviour for GenericProto { } }; - debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = GenericProtoOut::CustomProtocolOpen { - peer_id: source, - endpoint, - }; + if first { + debug!(target: "sub-libp2p", "External API <= Open({:?})", source); + let event = GenericProtoOut::CustomProtocolOpen { + peer_id: source, + }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + } else { + debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); + } } NotifsHandlerOut::CustomMessage { message } => { @@ -1205,13 +1154,6 @@ impl NetworkBehaviour for GenericProto { } NotifsHandlerOut::ProtocolError { error, .. } => { - if self.peers.get(&source).and_then(|p| *p.connection()) != Some(connection) { - debug!(target: "sub-libp2p", - "Handler({:?}) => Secondary connection severe protocol error: {:?}", - source, error); - return - } - debug!(target: "sub-libp2p", "Handler({:?}) => Severe protocol error: {:?}", source, error); @@ -1277,29 +1219,25 @@ impl NetworkBehaviour for GenericProto { PeerState::DisabledPendingEnable { mut timer, - connected_point, - connection, open, timer_deadline } => { if let Poll::Pending = Pin::new(&mut timer).poll(cx) { *peer_state = PeerState::DisabledPendingEnable { timer, - connection, - connected_point, open, timer_deadline }; continue; } - debug!(target: "sub-libp2p", "Handler({:?}) <= Enable now that ban has expired", peer_id); + debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); - *peer_state = PeerState::Enabled { connection, connected_point, open }; + *peer_state = PeerState::Enabled { open }; } st @ _ => *peer_state = st, diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 7656011f5ee6e..24753f69e6069 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -186,21 +186,25 @@ pub enum NotifsHandlerIn { /// Event that can be emitted by a `NotifsHandler`. #[derive(Debug)] pub enum NotifsHandlerOut { - /// The handler is requesting initialisation. + /// The connection handler is requesting initialisation, i.e. + /// to be either enabled or disabled. /// /// This is always the first event emitted by a handler and it is only /// emitted once. - Init { + Init { }, + + /// The connection is open for custom protocols. + Open { + /// The endpoint of the connection that is open for custom protocols. endpoint: ConnectedPoint, }, - /// Opened the substreams with the remote. - Open, - - /// Closed the substreams with the remote. + /// The connection is closed for custom protocols. Closed { - /// Reason why the substream closed, for diagnostic purposes. + /// The reason for closing, for diagnostic purposes. reason: Cow<'static, str>, + /// The endpoint of the connection that closed for custom protocols. + endpoint: ConnectedPoint, }, /// Received a non-gossiping message on the legacy substream. @@ -501,17 +505,17 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init { endpoint }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init { }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Init { endpoint } + NotifsHandlerOut::Init { } )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open + NotifsHandlerOut::Open { endpoint } )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Closed { reason } + NotifsHandlerOut::Closed { endpoint, reason } )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 8c97571466829..0743837f469d3 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -111,7 +111,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { let mut handler = LegacyProtoHandler { protocol: self.protocol, - endpoint: connected_point.to_endpoint(), + endpoint: connected_point.clone(), remote_peer_id: remote_peer_id.clone(), state: ProtocolState::Init { substreams: SmallVec::new(), @@ -119,9 +119,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { }, events_queue: SmallVec::new(), }; - handler.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init { - endpoint: connected_point.clone() - })); + handler.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init {})); handler } } @@ -140,7 +138,7 @@ pub struct LegacyProtoHandler { /// Whether we are the connection dialer or listener. Used to determine who, between the local /// node and the remote node, has priority. - endpoint: Endpoint, + endpoint: ConnectedPoint, /// Queue of events to send to the outside. /// @@ -223,19 +221,22 @@ pub enum LegacyProtoHandlerOut { /// This is always the first event emitted by a handler, and it is only /// emitted once. Init { - endpoint: ConnectedPoint, }, /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, + /// TODO + endpoint: ConnectedPoint, }, /// Closed a custom protocol with the remote. CustomProtocolClosed { /// Reason why the substream closed, for diagnostic purposes. reason: Cow<'static, str>, + /// TODO + endpoint: ConnectedPoint, }, /// Receives a message on a custom protocol substream. @@ -284,7 +285,7 @@ impl LegacyProtoHandler { ProtocolState::Init { substreams: incoming, .. } => { if incoming.is_empty() { - if let Endpoint::Dialer = self.endpoint { + if let ConnectedPoint::Dialer { .. } = self.endpoint { self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol.clone()), info: (), @@ -293,10 +294,10 @@ impl LegacyProtoHandler { ProtocolState::Opening { deadline: Delay::new(Duration::from_secs(60)) } - } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: incoming[0].protocol_version() + version: incoming[0].protocol_version(), + endpoint: self.endpoint.clone() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { @@ -416,6 +417,7 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: "All substreams have been closed by the remote".into(), + endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -428,6 +430,7 @@ impl LegacyProtoHandler { if substreams.is_empty() { let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: format!("Error on the last substream: {:?}", err).into(), + endpoint: self.endpoint.clone() }; self.state = ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), @@ -491,7 +494,8 @@ impl LegacyProtoHandler { ProtocolState::Opening { .. } => { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: substream.protocol_version() + version: substream.protocol_version(), + endpoint: self.endpoint.clone() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal {